Nano Hash - криптовалюты, майнинг, программирование

Многопроцессорность: скорость отклика и загрузка ЦП при использовании нескольких рабочих процессов while true queue.get()

Я написал пару примеров воркеров While True, которые извлекают данные из очереди. В принципе производители должны вести себя примерно так, как показано ниже, но на самом деле это данные из нескольких фидов сокетов ssl, поэтому производители здесь предназначены для целей моделирования.

В моем случае важны доли миллисекунд, и я изо всех сил пытаюсь определить, какая из приведенных ниже альтернатив (или какая-либо другая альтернатива) является лучшей. Я понял, что разделение времени может повлиять на скорость отклика. Когда я пытаюсь запустить приведенный ниже код, все примеры работают хорошо, когда дело доходит до загрузки ЦП, за исключением альтернативы 5, которая потребляет 100% моего ЦП.

На данный момент мой вывод заключается в том, что блокирующий .get() потребляет меньше ресурсов ЦП, чем неблокирующий .get(), учитывая, что неблокирующий цикл .get() настроен на очень короткий сон.

Моя цель — реализовать код, который будет потреблять мало процессорного времени, а также реагировать на новые обновления. т. е. я хотел бы, чтобы один из рабочих выполнял работу в течение 0,1 миллисекунды с момента поступления нового элемента в очередь.

Пример кода, написанного на Python 2.7 для Windows 7 (обратите внимание, что я использую 24-ядерную машину, поэтому количество рабочих может быть уменьшено):

import multiprocessing
import os
import time
import Queue


class MyClass:
    def __init__(self):
        self.the_queue = multiprocessing.Queue()
        self.printerq = multiprocessing.Queue()

    def producer(self):
        i=0
        while True:
            self.the_queue.put(["hello",i],False)
            i=i+1
            time.sleep(1)

    'alternative 1'
    '''
    def worker_main(self):
        while True:
            try:
                item = self.the_queue.get(timeout=0.1)
            except Queue.Empty:
                time.sleep(0.0001) 
            else:
                self.printerq.put([item,os.getpid()],False)
    '''

    'alternative 2'
    '''
    def worker_main(self):
        while True:
            if not self.the_queue.empty():
                item = self.the_queue.get()
                #print os.getpid(), "got", item
                self.printerq.put([item,os.getpid()],False)
            time.sleep(0.0001)
    '''

    'alternative 3'

    def worker_main(self):
        while True:
            item = self.the_queue.get()
            self.printerq.put([item,os.getpid()],False)

    'alternative 4'
    '''
    def worker_main(self):
        while True:
            item = self.the_queue.get()
            self.printerq.put([item,os.getpid()],False)
            time.sleep(0.0001)
    '''

    'alternative 5 eats CPU 100%'
    '''
    def worker_main(self):
        while True:
            try:
                item = self.the_queue.get(False)
            except Queue.Empty:
                time.sleep(0.0001)
            else:
                self.printerq.put([item,os.getpid()],False)
                time.sleep(0.0001)
    '''

    def printer(self):
        while True:
            stuff=self.printerq.get()
            print stuff


if __name__=='__main__':
    mc=MyClass()

    process_printer = multiprocessing.Process(target=mc.printer, args=())
    process_printer.start() 

    for i in range(100):
        process_window = multiprocessing.Process(target=mc.worker_main, args=())
        process_window.start()
        time.sleep(0.1)

    for i in range(100):
        process_producer = multiprocessing.Process(target=mc.producer, args=())
        process_producer.start() 
        time.sleep(0.1)

  • В моем случае важны доли миллисекунд — вы уверены, что Python — правильный инструмент для этой работы? 02.11.2016
  • Я надеюсь, что это будет работать по назначению... 02.11.2016
  • Я надеюсь, что это сработает для этой цели ... Я рассчитал время расчета примерно до 0,07 мс. Когда я начал программировать, на каждое обновление уходило около 6-7 секунд. Если я смогу убедиться, что приложение достаточно отзывчиво и может отправлять https-запросы от рабочих в течение 0,1 мс, тогда я буду знать, что общая скорость отклика моей системы будет около 0,2 мс, что нормально. 02.11.2016

Ответы:


1

Скорее всего, вы добьетесь наилучшей производительности, удалив все вызовы time.sleep(). Я бы рекомендовал использовать queue.get(), который выполняет активное ожидание и использует флаги уничтожения для завершения ваших процессов. Ваш воркер должен выглядеть примерно так:

def worker_main(self):
    while True:
        item = self.the_queue.get()
        # Checks for kill flag
        if item == None:
            break 
        self.printerq.put([item,os.getpid()],False)

Этот метод требует, чтобы ваш продюсер поместил None в self.the_queue. Тем не менее, поскольку у вас есть несколько производителей, вы можете захотеть изменить рабочих так, чтобы они получали флаг уничтожения от каждого производителя. Что-то типа:

def worker_main(self):
    kill_flags = 0        
    # Make sure to pass in num_producers
    while kill_flags < num_producers:
        item = self.the_queue.get()
        # Checks for kill flag
        if item == None:
            kill_flag += 1
            continue 
        self.printerq.put([item,os.getpid()],False)

Что касается ваших продюсеров, аналогичный подход будет работать, когда они закончат производство:

def producer(self):
    i=0
    while (some_condition_to_stop_producing):
        self.the_queue.put(["hello",i],False)
        i=i+1
        time.sleep(1)
    # Pass in the number of workers
    for i in range(num_workers):
        self.the_queue.put(None)

Не забывайте, что вам также нужно будет изменить принтер, чтобы он заканчивался, когда вы закончите помещать вещи в printerq.

02.11.2016
  • Спасибо Навидад20. Как для ответа, так и для редактирования. Это было решение, к которому я тоже склонялся. Единственное, что мне интересно, это то, как этот тип решения влияет на реакцию в других процессах/потоках, как процессор понимает, что он должен переключаться между ними, и время ожидания системы, прежде чем произойдет переключение при использовании Windows 7. Многопроцессорность — это немного. сложно, когда дело доходит до времени работы алгоритма... 02.11.2016
  • Рассматривали ли вы время каждой из ваших функций, чтобы увидеть, какие из них занимают больше всего времени? Функция mulitprocessing.cpu_count может дать вам количество работающих логических ядер, и, таким образом, вы можете назначить соответствующее количество процессов, чтобы у вас не было рабочих/производителей, ожидающих получения процесса. 02.11.2016
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..