Nano Hash - криптовалюты, майнинг, программирование

Многопроцессорная общая память Python; одна запись, несколько чтений

СИСТЕМА

  • Linux (Манжаро KDE)
  • Питон 3.8.3

ПРОГРАММА:
У меня есть входящие строковые данные на порт UDP. Основной цикл запускает процессы перед использованием селекторов для мониторинга порта UDP. Я хочу, чтобы данные UDP, которые постоянно обновляются, были доступны для каждого процесса.

ПЫТАЛИСЬ:

  • Многопроцессорные очереди с maxsize = 1 стали головной болью и быстро сломались.
  • Многопроцессорные массивы (вот где я сейчас)

Я проверил, и массив в каждом месте, на которое я смотрю, имеет один и тот же адрес памяти (я думаю). По какой-то причине, когда я пытаюсь получить доступ к содержимому массива в дочернем процессе, процесс зависает.

НЕ ПРОБЫВАЕТСЯ

  • Трубы. У меня есть ощущение, что это может быть путь. Но я уже глубоко на неизведанной территории; Я никогда не использовал их раньше.

ЧЕГО Я ХОЧУ
Я хотел бы получить доступ к данным UDP из дочерних процессов — это метод camera_view.

Фиктивная строка UDP

import socket
import random
import datetime
import time

conn = ('127.0.0.1', 6666)

def rand_value(f_val, t_val):
    result = round(random.uniform(f_val, t_val), 2)  
    result = random.uniform(f_val, t_val)
    return result

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

while True:

    time.sleep(6)
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    overlay = timestamp

    for i in range(9):
        val = rand_value(i*10, i*10+10)
        if i == 8: val = 'TASK: Im the real Batman'
        overlay = overlay + "," + str(val)
    
    print(overlay)
    sock.sendto(overlay.encode(), conn)

Моя программа

import datetime
import selectors
import socket
import time
from multiprocessing import Lock, Process, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_char_p


REQUIRED_CAMERAS = 1
CAMERA_CONN = {'name':['Colour Camera'], 'ip':['127.0.0.1'], 'port':[9000]}
OVERLAY_CONN = ('0.0.0.0', 6666)
CONTROL_CONN = ('0.0.0.0', 6667)
NUMBER_OF_ITEMS_IN_OVERLAY = 10

class Camera():
    def __init__(self, cam_name, cam_ip, cam_port):
        self.ip = cam_ip
        self.port = cam_port
        self.capture = cv2.VideoCapture(0)
        self.frame_width = int(self.capture.get(3))
        self.frame_height = int(self.capture.get(4))
        self.name = cam_name


def get_overlay(data_packet):
        data = data_packet.decode()
        data = data.split(',')
        field0 = data[0]
        field1 = 'KP: ' + str(round(float(data[1]), 3))
        field2 = 'DCC: ' + str(round(float(data[2]), 2)) + 'm'
        field3 = 'E: ' + str(round(float(data[3]), 2)) + 'm'
        field4 = 'N: ' + str(round(float(data[4]), 2)) + 'm'
        field5 = 'D: ' + str(round(float(data[5]), 2)) + 'm'
        field6 = 'H: ' + str(round(float(data[6]), 2)) # + '°'
        field7 = 'R: ' + str(round(float(data[7]), 2)) # + '°'
        field8 = 'P: ' + str(round(float(data[8]), 2)) # + '°' 
        field9 = data[9]

        x = []
        for i in range(NUMBER_OF_ITEMS_IN_OVERLAY):
            x.append(eval('field' + str(i)).encode())
            # if i == 0:
            #     print(x[i])
                
        return x

def socket_reader(sock, mask, q, REQUIRED_CAMERAS, overlay):
    data_packet, sensor_ip = sock.recvfrom(1024)
    sensor_port = sock.getsockname()[1]
    print(f'SENSOR PORT {sensor_port} and SENSOR_IP {sensor_ip}')

    if sensor_port == OVERLAY_CONN[1]:
        x = get_overlay(data_packet)
        for i in range(len(x)):
            overlay[i] = x[i]
            print(f'Socket Reader {overlay}')

def camera_view(CAMERA_CONN, cam_name, camera, overlay_q, control_q, overlay):
    while True:
        print(f'PROCESS {camera} RUNNING FOR: {cam_name}')
        try:
            print(f'Camera View {overlay}')
            for i in range(len(overlay)):
                print(overlay[i])
        except:
            pass
        time.sleep(1)
        

def controller(REQUIRED_CAMERAS, CAMERA_CONN, OVERLAY_CONN, CONTROL_CONN):
    
    if REQUIRED_CAMERAS > len(CAMERA_CONN['name']):
        print(f'REQURIED_CAMERAS: {REQUIRED_CAMERAS} - more than connections in CAMERA_CONN ')
    else:
        # Set up a UDP connection for the overlay string and the control commands
        sock_overlay = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock_control = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock_overlay.bind(OVERLAY_CONN)
        sock_control.bind(CONTROL_CONN)
        
        # Set up the selector to watch over the socket
        # and trigger when data is ready for reading
        sel = selectors.DefaultSelector()
        sel.register(fileobj=sock_overlay, events=selectors.EVENT_READ, data=socket_reader)
        sel.register(fileobj=sock_control, events=selectors.EVENT_READ, data=socket_reader)
        
        # create shared memory
        overlay_q = Queue(maxsize=1)
        control_q = Queue(maxsize=1)     
        overlay = Array(c_char_p, range(NUMBER_OF_ITEMS_IN_OVERLAY))
        print(f'Init Overlay {overlay}')
        
        # Generate the processes; one per camera
        processes = []
        
        for camera in range(REQUIRED_CAMERAS):
            processes.append(Process(target=camera_view, args=(CAMERA_CONN, CAMERA_CONN['name'][camera], camera, overlay_q, control_q, overlay)))

        for process in processes:
            process.daemon = True
            process.start()
            
        # Spin over the selector
        while True:

            # Only have one connnection registered, so to stop
            # the loop spinning up the CPU, I have made it blocking 
            # with the timeout = 1 (sec) instead of =0.
            events = sel.select(timeout=None)

            for key, mask in events:
                # the selector callback is the data= from the register above
                callback = key.data
                # the callback gets the sock, mask and the sensor queues
                if key.fileobj == sock_overlay:
                    callback(key.fileobj, mask, overlay_q, REQUIRED_CAMERAS, overlay)
                else:
                    callback(key.fileobj, mask, control_q, REQUIRED_CAMERAS, overlay)


if __name__ == "__main__":
    
    controller(REQUIRED_CAMERAS, CAMERA_CONN, OVERLAY_CONN, CONTROL_CONN)

EDIT1:

from multiprocessing import Process, Array
from ctypes import c_char_p
import time

def worker(arr):
    count = 0
    while True:
        count += 1
        val = 'val' + str(count)
        arr[0] = val
        print(arr[:])
        time.sleep(2)

def main():
    arr = Array(c_char_p, 1)
    p = Process(target=worker, args=(arr,))
    p.daemon = True
    p.start()
    
    while True:
        print(arr[:])
        try:
            print(arr[:].decode('utf-8'))
        except :
            pass
        # try:
        #     val = arr[:]
        #     val = val.decode('utf-8')
        #     print(f'main {val}')
        # except:
        #     pass
        time.sleep(1)

if __name__ == "__main__":
    main()
    
    
'''
from multiprocessing import Process, Array
from ctypes import c_char_p
import time

def worker(arr):
    count = 0
    while True:
        count += 1
        val = 'val' + str(count)
        arr[0] = bytes(val, 'utf-8')
        print(arr[:])
        time.sleep(2)

def main():
    arr = Array(c_char_p, 1)
    p = Process(target=worker, args=(arr,))
    p.daemon = True
    p.start()
    
    while True:
        print(arr[:])
        try:
            print(arr[:].decode('utf-8'))
        except :
            pass

        time.sleep(1)

if __name__ == "__main__":
    main()

if __name__ == "__main__":
    main()
'''

EDIT2:
Благодаря @RolandSmith я упорно работал с очередями и думаю, что у меня есть шаблон того, как я могу двигаться вперед. См. код ниже. Если я не смогу заставить это работать в программе, я вернусь сюда.

from multiprocessing import Process, Queue
import time
import datetime

def worker(camera, q):
    val = ''
    while True:    
        if q.full() == True:
            val = q.get()
        else:
            val = val
        print(f'WORKER{camera} {val}')
        time.sleep(0.2)

def main():
    
    cameras = 2
    
    processes = []
    queues = []
    
    for camera in range(cameras):
        queues.append(Queue(maxsize=1))
        processes.append(Process(target=worker, args=(camera, queues[camera])))
        
    for process in processes:                     
        process.daemon = True
        process.start()

    while True:
        for q in queues:
            if not q.empty():
                try:
                    _ = q.get()
                except:
                    pass
            else:
                q.put(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        time.sleep(.5)

if __name__ == "__main__":
    main()

  • Для начала реквизиты для отправки информации о системе. Многие забывают это сделать. Теперь к моим комментариям. В представленном виде ваша программа находится на полпути между реальной программой и минимально воспроизводимым примером, что сбивает с толку. Можете ли вы немного очистить его и удалить неиспользуемые части? Теперь о проблеме с Queues. Что ты имеешь в виду под тем, что стала болеть голова и быстро сломалась? 07.08.2020
  • Глядя на get_overlay, почему бы не сделать field напрямую массивом вместо использования eval? 07.08.2020
  • @RolandSmith, спасибо за ответ. Я добавил правку. Закомментированный блок работает, но он просто выводит тест, введенный в сценарий. Я получаю None (пусто), когда пытаюсь запрограммировать строку. 07.08.2020
  • @РоландСмит. Проблема, с которой я столкнулся с очередями, заключалась в том, что они правильно передавались вместе с замками. Потребитель будет читать намного быстрее, чем производитель, и я хотел, чтобы потребитель продолжал использовать старые данные, пока не станут доступны новые данные. 07.08.2020
  • Всякий раз, когда вы get что-то из Queue, это уходит из Queue. Так что в воркере вам придется использовать Queue.empty(). Если это возвращает True, повторно используйте ранее полученные данные. 07.08.2020
  • Глядя на преобразование в c_char_p; будет продолжать поиск по этому пути. Я чувствую, что это неправильно. Подойдет ли Queue вместо Array? Или есть какие-то другие способы сделать то, что я хочу сделать? 07.08.2020
  • Давайте продолжим обсуждение в чате. 07.08.2020

Ответы:


1

На мой взгляд, использование Queue менее подвержено ошибкам, чем использование Array.

Вот ваш второй пример, преобразованный в использование Queue:

from multiprocessing import Process, Queue
import time


def worker(q):
    count = 0 
    while True:
        count += 1
        val = 'val' + str(count)
        q.put(val)
        print('worker:', val)
        time.sleep(2)


def main():
    q = Queue()

    p = Process(target=worker, args=(q, ))
    p.daemon = True
    p.start()

    while True:
        if not q.empty():
            print('main:', q.get())
        time.sleep(1)


if __name__ == "__main__":
    main()

Это дает:

> python3 test3.py
worker: val1
main: val1
worker: val2
main: val2
worker: val3
main: val3
worker: val4
main: val4
worker: val5

Вот тот же пример с использованием Pipe:

from multiprocessing import Process, Pipe
import time


def worker(p):
    count = 0 
    while True:
        count += 1
        val = 'val' + str(count)
        p.send(val)
        print('worker:', val)
        time.sleep(2)


def main():
    child, parent = Pipe()

    p = Process(target=worker, args=(child, ))
    p.daemon = True
    p.start()

    while True:
        if parent.poll():
            print('main:', parent.recv())
        time.sleep(1)


if __name__ == "__main__":
    main()

Это дает тот же результат, что и в предыдущем примере.

Кроме того, по умолчанию канал является двунаправленным. Таким образом, вы также можете отправлять данные от рабочих к родительскому.

07.08.2020
  • Хороший человек. Я начал изучать этот массив и обнаружил, что мне нужно a = c_wchar_p, поскольку я, должно быть, нашел примеры в Py2. Благодаря вашему вкладу я снова пойду по пути Queue. Причина, по которой я этого не сделал, заключалась в том, что у меня было несколько рабочих, которым я хотел использовать одни и те же данные. Это будет означать, что queue.put будет основным, а q.get будет рабочим. Если бы у меня было 4x воркера, сработало бы простое повторение q.put 4x или был бы шанс, что один воркер может замедлиться и пропустить? Думаю, мне придется создать q для каждого работника. 07.08.2020
  • Вам действительно придется создать очередь (или Pipe) для каждого работника, чтобы надежно получать данные для каждого работника. Имейте в виду, что Queue — это просто Pipe с дополнительными блокировками и семафорами. Таким образом, вы также можете использовать Pipe, см. мой второй пример. А поскольку канал по умолчанию является двунаправленным, вы также можете использовать его для отправки сообщений обратно от рабочих процессов к родительскому. 07.08.2020
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..