Nano Hash - криптовалюты, майнинг, программирование

Многопроцессорная обработка Python Pool.apply_async с общими переменными (значение)

Для моего проекта в колледже я пытаюсь разработать генератор трафика на основе Python. Я создал 2 машины CentOS на vmware, и я использую 1 в качестве моего клиента и 1 в качестве моей серверной машины. Я использовал псевдоним IP метод увеличения количества клиентов и серверов с использованием только одной клиент-серверной машины. До сих пор я создал 50 псевдонимов IP на моей клиентской машине и 10 псевдонимов IP на моей серверной машине. Я также использую многопроцессорный модуль для одновременной генерации трафика со всех 50 клиентов на все 10 серверов. Я также разработал несколько профилей (1 КБ, 10 КБ, 50 КБ, 100 КБ, 500 КБ, 1 МБ) на своем сервере (в каталоге /var/www/html, так как я использую Apache Server), и я использую urllib2 для отправки запроса на эти профили из моя клиентская машина. Я использую https://stackoverflow.com/questions/26903520/how-to-send-http-request-using-virtual-ip-address-in-linux сначала привязаться к любому из IP-адресов исходного псевдонима, а затем отправить запрос с этого IP-адреса, используя urllib2. Здесь, чтобы увеличить мой число TCP-соединений, я пытаюсь использовать модуль multiprocessing.Pool.apply_async. Но я получаю эту ошибку «RuntimeError: синхронизированные объекты должны быть разделены между процессами только через наследование» при запуске моих сценариев. После небольшой отладки я обнаружил, что эта ошибка вызвана использованием multiprocessing.Value. Но я хочу разделить некоторые переменные между своими процессами, а также увеличить количество TCP-соединений. Какой другой модуль (кроме multiprocessing.Value) можно использовать здесь для совместного использования некоторых общих переменных? Или еще есть какое-то другое решение для этого запроса?

'''
Traffic Generator Script:

 Here I have used IP Aliasing to create multiple clients on single vm machine. 
 Same I have done on server side to create multiple servers. I have around 50 clients and 10 servers
'''
import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script
m=multiprocessing.Manager()
response_time=m.list()    #some shared variables
error_count=multiprocessing.Value('i',0)
def send_request3():    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
def send_request4():    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
#50 such functions are defined here for 50 clients
def func():
    pool=multiprocessing.Pool(processes=750)
    for i in range(5):
        pool.apply_async(send_request3)
        pool.apply_async(send_request4)
        pool.apply_async(send_request5)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return
start=float(time.time())
func()
end=float(time.time())-start
print end

  • response_time , это ваша общая переменная, не так ли? 03.04.2015
  • Используйте multiprocessing.Queue вместо одной глобальной переменной IMO. 03.04.2015
  • Pool.apply_async отлично работает, если я не использую общие переменные response_time и error_count. 03.04.2015
  • @pnv yes response_time и error_count — мои общие переменные. 03.04.2015
  • @pnv Я никогда не работал с multiprocessing.Queue. не могли бы вы подсказать, как мне использовать multiprocessing.Queue здесь? 03.04.2015
  • Я не совсем уверен, что это решение, но вы можете попробовать. pymotw.com/2/multiprocessing/communication.html 03.04.2015
  • Ну, я забыл, что вы использовали m.list, он не должен выдавать никаких ошибок. Можете поделиться трассировкой стека? 03.04.2015
  • да, я думаю, это из-за использования модуля Value для error_count (и еще нескольких переменных, которые я здесь не упомянул). 03.04.2015
  • да m.list не выдает никаких ошибок 03.04.2015
  • Давайте продолжим обсуждение в чате. 03.04.2015

Ответы:


1

Как указано в сообщениях об ошибках, вы не можете передать multiprocessing.Value через рассол. Однако вы можете использовать multiprocessing.Manager().Value:

import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script

def send_request3(response_time, error_count):    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_count.get_lock():
            error_count.value += 1

def send_request4(response_time, error_count):    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_count.get_lock():
            error_count.value += 1

#50 such functions are defined here for 50 clients

def func(response_time, error_count):
    pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count())
    args = (response_time, error_count)
    for i in range(5):
        pool.apply_async(send_request3, args=args)
        pool.apply_async(send_request4, args=args)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return

if __name__ == "__main__":
    m=multiprocessing.Manager()
    response_time=m.list()    #some shared variables
    error_count=m.Value('i',0)

    start=float(time.time())
    func(response_time, error_count)
    end=float(time.time())-start
    print end

Несколько других заметок здесь:

  1. Использование Pool с 750 процессами - не лучшая идея. Если вы не используете сервер с сотнями ядер ЦП, это перегрузит вашу машину. Было бы быстрее и меньше нагружать вашу машину, если бы вы использовали значительно меньше процессов. Что-то более похожее на 2 * multiprocessing.cpu_count().
  2. Рекомендуется явно передавать все общие аргументы, которые необходимо использовать, дочерним процессам, а не использовать глобальные переменные. Это увеличивает шансы, что код будет работать в Windows.
  3. Похоже, что все ваши send_request* функции делают почти одно и то же. Почему бы просто не сделать одну функцию и использовать переменную, чтобы решить, какой socbindtry.BindableHTTPHandler использовать? Сделав это, вы избежите тонны дублирования кода.
  4. Способ, которым вы увеличиваете error_count, не является безопасным для процесса/потока и подвержен условиям гонки. Вам нужно защитить приращение блокировкой (как я сделал в приведенном выше примере кода).
06.04.2015
  • @dano, вам нужна блокировка значения, управляемого менеджером? Разве менеджер не заботится о доступе к Value? См. также bugs.python.org/issue35786. 05.06.2019
  • @ Авраам Да, я верю, что да. Значение, управляемое менеджером, является просто прокси для обычного значения, поэтому я не думаю, что он каким-либо образом мог бы сделать операцию += атомарной, поскольку это привело бы к двум отдельным вызовам прокси (один раз для получения текущего значения, а затем еще один, чтобы установить его на новый val). Хороший улов по вопросу get_lock(). Я думаю, вам придется использовать управляемый менеджером Lock() вместо этого. 05.06.2019
  • @dano, FWIW, в коде, который я использую, передача значения менеджера в пул, называемый asynchronoulsy, для 15 файлов, использование простых управляемых значений отлично работает без использования блокировки менеджера, но не работает, если перед ним стоит блокировка with. Я исследовал, будет ли значение быстрее, чем очередь, которую я использовал в качестве счетчика, и, как вы сказали, ответ, кажется, нет, поскольку я использую пул, все становится маринованным, а менеджер обрабатывает доступ к очереди и ценить оба. Спасибо! 06.06.2019

  • 2

    Возможно, потому что Python Multiprocess diff между Windows и Linux (я серьезно, не знаю, как многопроцессорность работает в виртуальных машинах, как здесь.)

    Это может сработать;

    import multiprocessing
    import random
    import myurllist    #list of all destination urls for all 10 servers
    import time
    
    def send_request3(response_time, error_count):    #function to send requests from alias client ip 1
        opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
        try:
            tstart=time.time()
            for i in range(myurllist.url):
                x=random.choice(myurllist.url[i])
                opener.open(x).read()
                print "file downloaded:",x
                response_time.append(time.time()-tstart)
        except urllib2.URLError, e:
            error_count.value=error_count.value+1
    def send_request4(response_time, error_count):    #function to send requests from alias client ip 2
        opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
        try:
            tstart=time.time()
            for i in range(myurllist.url):
                x=random.choice(myurllist.url[i])
                opener.open(x).read()
                print "file downloaded:",x
                response_time.append(time.time()-tstart)
        except urllib2.URLError, e:
            error_count.value=error_count.value+1
    #50 such functions are defined here for 50 clients
    def func():
        m=multiprocessing.Manager()
        response_time=m.list()    #some shared variables
        error_count=multiprocessing.Value('i',0)
    
        pool=multiprocessing.Pool(processes=750)
        for i in range(5):
            pool.apply_async(send_request3, [response_time, error_count])
            pool.apply_async(send_request4, [response_time, error_count])
            # pool.apply_async(send_request5)
    #append 50 functions here
        pool.close()
        pool.join()
        print"All work Done..!!"
        return
    
    
    start=float(time.time())
    func()
    end=float(time.time())-start
    print end
    
    03.04.2015
  • Нет, это не сработало для меня .. в любом случае спасибо за помощь 06.04.2015
  • Неважно, находится ли он в виртуальной машине, все дело в ОС, работающей внутри виртуальной машины. В этом весь смысл виртуальных машин — ни одно из программ, работающих внутри них, не должно знать, что это не настоящее оборудование. Виртуальные машины работают под управлением CentOS, поэтому применимы все нормальные поведения Linux multiprocessing. 06.04.2015
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..