Nano Hash - криптовалюты, майнинг, программирование

Многопроцессорность с блоками контекста fork в Linux/Intel Xeon с Python 3.6.1?

Описание проблемы
Я изменил код из этого ответьте немного (см. ниже). Однако при запуске этого скрипта в Linux (поэтому командная строка: python script_name.py) он будет печатать jobs running: x для всех заданий, но после этого кажется, что он просто зависает. Однако, когда я использую метод spawn (mp.set_start_method('spawn')), он работает нормально и сразу начинает печатать значение переменной counter (см. метод listener).

Вопрос

  • Почему это работает только при спауне процессов?
  • Как настроить код, чтобы он работал с fork? (потому что это, вероятно, быстрее)

Код

import io
import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

  # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    #data processing
    #ftp connection to retrieve data
    #etc.
    q.put(complete_data)
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while 1:
        m = q.get()
        counter +=1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close()

if __name__ == "__main__":
   file_searcher('path_to_some_tab_del_file.txt')

Информация о процессоре

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                20
On-line CPU(s) list:   0-19
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             20
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz
Stepping:              2
CPU MHz:               2596.501
BogoMIPS:              5193.98
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0-19

Версия ядра Linux

3.10.0-514.26.2.el7.x86_64

Версия Python

Python 3.6.1 :: Continuum Analytics, Inc.

ЖУРНАЛ
Я добавил код, предложенный @yacc, это даст следующий журнал:

[server scripts]$ python main_v3.py
[INFO/SyncManager-1] child process calling self.run()
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw'
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0'
[DEBUG/MainProcess] INCREF '7f0842da56a0'
[DEBUG/MainProcess] created semlock with handle 139673691570176
[DEBUG/MainProcess] created semlock with handle 139673691566080
[DEBUG/MainProcess] created semlock with handle 139673691561984
[DEBUG/MainProcess] created semlock with handle 139673691557888
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-6] child process calling self.run()
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-7] child process calling self.run()
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-17] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-18] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-19] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-20] child process calling self.run()
jobs running: 1
jobs running: 2
jobs running: 3
jobs running: 4
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-21] child process calling self.run()
jobs running: 5
jobs running: 6
jobs running: 7
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
jobs running: 8
written to file
jobs running: 9
jobs running: 10
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection
[DEBUG/ForkPoolWorker-2] making connection to manager
jobs running: 11
jobs running: 12
jobs running: 13
jobs running: 14
jobs running: 15
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2'
jobs running: 16
jobs running: 17
jobs running: 18
jobs running: 19
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'

  • Не могли бы вы предоставить подробную информацию о версиях Linux, Python, пакета MP и оборудовании/процессоре? 05.09.2017
  • Я добавил информацию, которую вы просили (см. редактирование) @yacc. Я не смог понять, как получить версию пакета MP. Я надеюсь, что вы можете найти проблему 05.09.2017
  • многопроцессорность является частью основной библиотеки, так что это та же версия, что и остальные. Для меня (Python 3.4.3) код работает нормально (единственное, что я изменил, это удалить csvreader и вместо этого прочитать обычный файл). Вы пытались воспроизвести его в другом месте? 05.09.2017
  • @CodeNoob Хорошо, похоже, что информация о версии mp была удалена с p3. Не могли бы вы затем добавить вывод журнала? import logging и под основными logger = mp.log_to_stderr() и logger.setLevel(mp.SUBDEBUG). Запустите его с контекстом fork. 06.09.2017
  • Это будет печатать что-то только в начале программы, но не в тот момент, когда она застряла (см. Мое редактирование) @yacc 06.09.2017
  • @CodeNoob Отсутствует много отладочной информации, например [INFO/SyncManager-1] child process calling self.run() и т. д. Должна была быть напечатана стандартная ошибка. Есть ли шанс получить это? 06.09.2017
  • Я создал меньше процессов, поэтому теперь виден полный журнал (см. редактирование) @yacc 06.09.2017
  • Хорошо. Отличие от моего журнала в том, что вы получили только один [DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection [DEBUG/ForkPoolWorker-2] making connection to manager. Я вижу это для всех рабочих процессов, а затем большое количество журналов выполнения/завершения. Похоже, менеджер прекратил обслуживание и заблокировал пул, или вы что-то пропустили? У меня есть Python 3.5.3 на Ubuntu 17.04, возможно, ваша проблема сильно зависит от версии, поэтому я не могу вам помочь. 06.09.2017
  • Спасибо, по крайней мере, моя проблема теперь более конкретна, зная разницу с вашим журналом @yacc 06.09.2017
  • Я обновился до 3.6.1, и он все еще работает, как и ожидалось. Хотя размер бассейна 3. Не могли бы вы повторить попытку с mp.Pool(3)? 11.09.2017
  • Я пробовал это, но это приведет к той же проблеме :( Я думаю, что @yacc имеет какое-то отношение к установке Linux 12.09.2017
  • Это немного смущает, но, по крайней мере, у вас есть обходной путь с нерестом. Я предлагаю перейти на 3.6.2, и если это все еще происходит, откройте тикет на Python. Это может быть важно исправить для будущих выпусков. 12.09.2017
  • Я не знал теперь, когда он был отключен? На основании чего ты это увидел? @jxh 22.09.2017
  • Thread(s) per core: 1, с гиперпоточностью я ожидаю увидеть здесь 2. 22.09.2017
  • Но ведь это не влияет на возможности многопроцессорной обработки? @jxh 22.09.2017
  • Это означает, что что-то лжет вашей ОС. Это точно виртуальная машина? Ах, я вижу, что это так. 22.09.2017
  • Учитывая, что ваша проблема с производительностью связана с виртуальной машиной (примечание: Hypervisor vendor: VMware), я подозреваю, что на уровне аппаратной абстракции гипервизора есть блокирующий примитив, который мешает вам добиться максимальной производительности. Пожалуйста, посмотрите, есть ли у вас такая же проблема при работе на голом железе. 22.09.2017
  • Нет, я просто подключаюсь к серверу @jxh 22.09.2017
  • Ой, подождите, может быть, вы правы, я работаю над виртуальной машиной, с которой я подключаюсь к серверу @jxh 22.09.2017
  • Пожалуйста, объясните, что означает поставщик гипервизора: VMware и как это влияет на производительность (вы буквально первый, кто действительно нашел что-то в журналах, что может быть причиной) 22.09.2017
  • Это означает, что ваша машина на самом деле виртуальная, а не голое железо. Это означает, что существует уровень аппаратной абстракции, который притворяется машиной для Linux, в то время как на самом деле это процесс, работающий под ОС хоста. VMware — компания-разработчик программного обеспечения, которая создает гипервизоры — программное обеспечение, работающее на хост-ОС и предоставляющее виртуальным машинам их виртуальное оборудование. 22.09.2017
  • Спасибо, вы узнали что-то сегодня ;) Но мне все еще не ясно, как это влияет на mp, потому что при использовании mp я использую только 1 ядро ​​​​на процессор, верно? @jxh 22.09.2017
  • Поскольку fork должен делиться ресурсами с родителем, Linux, вероятно, накладывает блокировку. На реальном оборудовании эта блокировка будет дешевой, но на виртуальной машине неясно, насколько дешевой будет виртуальная версия этой блокировки. 22.09.2017
  • Аааа, это объясняет, почему spwan работает нормально, потому что тогда совместное использование не требуется. Ну ты гений, что понял это ;) Спасибо! @jxh 22.09.2017
  • Последний вопрос, ха-ха, повлияет ли это на многопоточность? @jxh 22.09.2017
  • Многопоточность фактически использует настоящее совместное использование всех ресурсов, в то время как разветвленные процессы будут выполнять копирование при записи. 22.09.2017
  • Я заметил, что вы не закрываете файл, который вы передаете в csv.DictReader. 21.05.2018
  • Может быть, это связано с фондом ошибок Hyper-Threadin в процессорах Skylake и Kabylake? theregister.co.uk/2017/06/25/ 31.07.2018
  • Скорее всего, метод fork каким-то образом оставляет внутреннее состояние несогласованным и вызывает взаимоблокировку io.open, делая listener бесполезным. Возможно, добавить операторы печати непосредственно перед и после строки f = io.open(...)? 19.09.2018

Ответы:


1

Как намекнул @jxh, различия между fork и spawn очень важны. документация по многопроцессорной обработке указывает в разделе 17.2.1.2, что разница заключается в следующем: сохраняет среду и такие вещи, как stdin/out, тогда как spawn просто создает новый новый процесс. Я думаю, что у вас, возможно, есть что-то в вашей среде, что вызывает проблемы для рабочей функции, вероятно, в коде, стоящем за вашими комментариями о другой обработке. Нерест дает вам чистый лист, и в этих условиях все работает нормально.

Чтобы определить, что происходит, я бы попросил каждого рабочего процесса распечатать диагностические сообщения, вероятно, записанные в файл, уникальный для каждого рабочего процесса. открывайте/закрывайте этот файл каждый раз, когда вы хотите написать сообщение, чтобы содержимое обновлялось/удалялось.

Вилка не должна быть быстрее, чем порождение, потому что вилке необходимо скопировать информацию об окружении в новый процесс. В любом случае, это только начальные затраты, которые, как я предполагаю, минимальны, потому что рабочему необходимо выполнить некоторую вычислительную работу или работу, связанную с вводом-выводом, которую вы хотите распараллелить.

18.10.2018
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..