Nano Hash - криптовалюты, майнинг, программирование

порт udp заблокирован во всех процессах (python - pi)

У меня есть сеть контроллеров Raspberry PI в доморощенной системе безопасности. Каждый из них запускает несколько независимых процессов для разных функций, а MQTT-брокер связывает все воедино.

Каждый PI/процесс жестко запрограммирован с адресом брокера MQTT, и вместо этого я хотел бы, чтобы каждый из них запрашивал центральный сервер для определения местоположения брокера.

У меня есть каждый процесс, открывающий набор сокетов UDP для широковещательной передачи, и каждый из них привязывается к другому порту прослушивания. При запуске он отправляет запрос (включая порт прослушивания) на центральный сервер, который отправляет ответ на правильный порт прослушивания.

Это отлично работает только для одного процесса, остальные (в том же PI) все таймауты без получения ответа. Wireshark показывает, что только один из трех на самом деле отправляет запрос в сеть, и любопытно то, что ПОСЛЕДНИЙ процесс для открытия сокета — это тот, который работает. Фрагмент из журнала отладки:

mpi@RPI02:~ $ cat /var/log/vlab.debug.log
2020-02-28 17:08:55,242 garagectlr.31: startup
2020-02-28 17:08:55,243 root.2:
2020-02-28 17:08:55,244 root.2: startup
2020-02-28 17:08:55,250 root: looking for broker on port 10417...
2020-02-28 17:08:55,255 garagectlr: looking for broker on port 10416...
2020-02-28 17:08:57,117 sms-receiver.18:
2020-02-28 17:08:57,118 sms-receiver.18: startup
2020-02-28 17:08:57,121 sms-receiver: looking for broker on port 10419...
2020-02-28 17:08:57,124 sms-receiver: rx [mqtt-broker=192.168.99.99]
2020-02-28 17:08:57,225 sms-receiver: located broker at 192.168.99.99
2020-02-28 17:08:57,255 root: FATAL: unable to locate MQTT broker
2020-02-28 17:08:57,260 garagectlr: FATAL: unable to locate MQTT broker
2020-02-28 17:08:57,270  * Running on http://192.168.99.162:6060/ (Press CTRL+C to quit)

Вот соответствующий код Python для каждого PI, немного обрезанный:

    #
    # start looking for broker
    #

    # get unique listening port
    listen_port = os.getpid() + 10000
    logging.debug("%s: looking for broker on port %d..."%(self.process,listen_port))
    try:
        sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.bind(('',listen_port))
    except Exception:
        logging.exception("%s: "%(self.process))

    tries = 0
    try:
        while self.broker == None and tries < 2:
            query = "query mqtt-broker:%s:%d"%(self.process,listen_port)
            sock.sendto(query,("192.168.99.255",1414))
            sock.settimeout(2)
            data,addr = sock.recvfrom(64)
            logging.debug("%s: rx [%s]"%(self.process,data))
            if data[0:5]=='mqtt-':
                self.broker = data[12:]
                time.sleep(.1)
            else:
                time.sleep(1)
                tries = tries + 1
    except Exception:
        print('timeout')
    sock.close()        
    if self.broker == None:
        logging.debug("%s: FATAL: unable to locate MQTT broker"%(self.process))
    else:    
        logging.debug("%s: located broker at %s"%(self.process,self.broker))
        self.sendBoot()            
        self.pubTest("mqtt@%s.%s startup"%(self.process,self.version))            
        m_thread = threading.Thread(target = self.mqtt_thread)
        m_thread.start()

Я думал, что за счет привязки к уникальным портам для RX у меня не будет помех между сокетами, но работает явно только один. Почему двое других даже не отправляют запрос?


Ответы:


1

Я заметил, что у меня есть цикл WHILE внутри блока TRY-CATCH, так что в первый раз, когда время ожидания получателя истекло, цикл while был прерван.

Я помещаю TRY внутрь WHILE следующим образом:

    tries = 0
    query = "query mqtt-broker:%s:%d"%(self.process,listen_port)

    while self.broker == None and tries < 4:
        try:
            sock.sendto(query,("192.168.99.255",1414))
            sock.settimeout(2)
            data,addr = sock.recvfrom(64)
            if data[0:5]=='mqtt-':
                self.broker = data[12:]
                time.sleep(.1)
            else:
                time.sleep(1)
                tries = tries + 1
        except Exception:
            pass    ## timeout

    try:
        sock.close()        
    except Exception:
        logging.exception("%s: "%(self.process))

думая, что у каждого процесса будет возможность повторить попытку, если запрос не удался. Я не уверен, что было не так в первую очередь, но теперь это работает довольно надежно:

mpi@RPI02:~ $ cat /var/log/vlab.debug.log
2020-02-29 17:07:47,981 root.2:
2020-02-29 17:07:47,982 root.2: startup
2020-02-29 17:07:47,986 garagectlr.31: startup
2020-02-29 17:07:47,992 garagectlr: looking for broker on port 10421...
2020-02-29 17:07:48,010 root: looking for broker on port 10424...
2020-02-29 17:07:49,895 sms-receiver.18:
2020-02-29 17:07:49,896 sms-receiver.18: startup
2020-02-29 17:07:49,901 sms-receiver: looking for broker on port 10423...
2020-02-29 17:07:50,004 sms-receiver: located broker at 192.168.99.99
2020-02-29 17:07:50,048 sms-receiver:  * Running on http://192.168.99.162:6060
2020-02-29 17:07:50,098 garagectlr: located broker at 192.168.99.99
2020-02-29 17:07:50,140 root: located broker at 192.168.99.99
mpi@RPI02:~ $

Мне кажется странным, что запросы складываются, а затем выполняются в обратном порядке. Но эй, это работает.

29.02.2020
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..