Я написал пару примеров воркеров While True, которые извлекают данные из очереди. В принципе производители должны вести себя примерно так, как показано ниже, но на самом деле это данные из нескольких фидов сокетов ssl, поэтому производители здесь предназначены для целей моделирования.
В моем случае важны доли миллисекунд, и я изо всех сил пытаюсь определить, какая из приведенных ниже альтернатив (или какая-либо другая альтернатива) является лучшей. Я понял, что разделение времени может повлиять на скорость отклика. Когда я пытаюсь запустить приведенный ниже код, все примеры работают хорошо, когда дело доходит до загрузки ЦП, за исключением альтернативы 5, которая потребляет 100% моего ЦП.
На данный момент мой вывод заключается в том, что блокирующий .get() потребляет меньше ресурсов ЦП, чем неблокирующий .get(), учитывая, что неблокирующий цикл .get() настроен на очень короткий сон.
Моя цель — реализовать код, который будет потреблять мало процессорного времени, а также реагировать на новые обновления. т. е. я хотел бы, чтобы один из рабочих выполнял работу в течение 0,1 миллисекунды с момента поступления нового элемента в очередь.
Пример кода, написанного на Python 2.7 для Windows 7 (обратите внимание, что я использую 24-ядерную машину, поэтому количество рабочих может быть уменьшено):
import multiprocessing
import os
import time
import Queue
class MyClass:
def __init__(self):
self.the_queue = multiprocessing.Queue()
self.printerq = multiprocessing.Queue()
def producer(self):
i=0
while True:
self.the_queue.put(["hello",i],False)
i=i+1
time.sleep(1)
'alternative 1'
'''
def worker_main(self):
while True:
try:
item = self.the_queue.get(timeout=0.1)
except Queue.Empty:
time.sleep(0.0001)
else:
self.printerq.put([item,os.getpid()],False)
'''
'alternative 2'
'''
def worker_main(self):
while True:
if not self.the_queue.empty():
item = self.the_queue.get()
#print os.getpid(), "got", item
self.printerq.put([item,os.getpid()],False)
time.sleep(0.0001)
'''
'alternative 3'
def worker_main(self):
while True:
item = self.the_queue.get()
self.printerq.put([item,os.getpid()],False)
'alternative 4'
'''
def worker_main(self):
while True:
item = self.the_queue.get()
self.printerq.put([item,os.getpid()],False)
time.sleep(0.0001)
'''
'alternative 5 eats CPU 100%'
'''
def worker_main(self):
while True:
try:
item = self.the_queue.get(False)
except Queue.Empty:
time.sleep(0.0001)
else:
self.printerq.put([item,os.getpid()],False)
time.sleep(0.0001)
'''
def printer(self):
while True:
stuff=self.printerq.get()
print stuff
if __name__=='__main__':
mc=MyClass()
process_printer = multiprocessing.Process(target=mc.printer, args=())
process_printer.start()
for i in range(100):
process_window = multiprocessing.Process(target=mc.worker_main, args=())
process_window.start()
time.sleep(0.1)
for i in range(100):
process_producer = multiprocessing.Process(target=mc.producer, args=())
process_producer.start()
time.sleep(0.1)
mulitprocessing.cpu_count
может дать вам количество работающих логических ядер, и, таким образом, вы можете назначить соответствующее количество процессов, чтобы у вас не было рабочих/производителей, ожидающих получения процесса. 02.11.2016