Nano Hash - криптовалюты, майнинг, программирование

Azure ServiceBus с использованием async/await в Python, похоже, не работает

Я пытаюсь читать сообщения из тем Azure ServiceBus с помощью async/await, а затем пересылать содержимое другому приложению через HTTP. Мой код прост:

import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio.async_client import ServiceBusService

bus_service = ServiceBusService(service_namespace=..., shared_access_key_name=..., shared_access_key_value=...)

async def watch(topic_name, subscription_name):
    print('{} started'.format(topic_name))

    message = bus_service.receive_subscription_message(topic_name, subscription_name, peek_lock=False, timeout=1)

    if message.body is not None:
        async with ClientSession() as session:
            await session.post('ip:port/endpoint',
                               headers={'Content-type': 'application/x-www-form-urlencoded'},
                               data={'data': message.body.decode()})


async def do():
    while True:
        for topic in ['topic1', 'topic2', 'topic3']:
            await watch(topic, 'watcher')


if __name__ == "__main__":
    asyncio.run(do())

Я хочу искать сообщения (навсегда) из разных тем и, когда приходит сообщение, отправлять POST. Я импортирую пакет aio из azure, который должен работать асинхронно. После многих попыток единственное решение, которое я получил, это while True и установка timeout=1. Это не то, что я хотел, я делаю это последовательно.

azure-servicebus версия 0.50.3.

Это мой первый раз с async/await, возможно, я что-то упускаю...
Есть какие-нибудь решения/предложения?


  • docs Я не знаком с библиотекой, но вы вызываете метод для получения сообщения. Это блокирующий вызов. Вы должны подписаться, а затем обрабатывать полученные события. Это где-то здесь 29.07.2020
  • Используйте azure-servicebus 7.0.0 для использования asyncio pypi.org/project/azure-servicebus 18.12.2020

Ответы:


1

Вот как это можно сделать с помощью последней основной версии v7 сервисной шины. Ознакомьтесь с примерами асинхронного режима для отправки и получения сообщений о подписке https://github.com/Azure/azure-sdk-for-python/blob/04290863fa8963ec525a0b2f4079595287e15d93/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py

import os
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio import ServiceBusClient
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']

async def watch(topic_name, subscription_name):
    async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
        )
    async with subscription_receiver:
         message = await subscription_receiver.receive_messages(max_wait_time=1)

    if message.body is not None:
        async with ClientSession() as session:
            await session.post('ip:port/endpoint',
                               headers={'Content-type': 'application/x-www-form-urlencoded'},
                               data={'data': message.body.decode()})

async def do():
    while True:
        for topic in ['topic1', 'topic2', 'topic3']:
            await watch(topic, 'watcher')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do())
10.01.2021

2

Вам нужно будет использовать пакет: azure.servicebus.aio

У них есть следующие модули для асинхронности: введите здесь описание изображения

Нам придется использовать класс обработчика Receive — он может быть создан с помощью метода get_receiver(). С помощью этого объекта вы сможете асинхронно перебирать сообщение. Запустите пример скрипта, который поможет вам оптимизировать его:

from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()

        
Receiving = True

#Topic 1 receiver : 
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver =  SubsClient.get_receiver()

#Topic 2 receiver : 
conn_str2= "<>"
name2="Allmessages2"
SubsClient2 = SubscriptionClient.from_connection_string(conn_str2, name2)
receiver2 =  SubsClient2.get_receiver()
    
#obj= SubscriptionClient("svijayservicebus","mytopic1", shared_access_key_name="RootManageSharedAccessKey", shared_access_key_value="ySr+maBCmIRDK4I1aGgkoBl5sNNxJt4HTwINo0FQ/tc=")
async def receive_message_from1():
    await receiver.open()
    print("Opening the Receiver for Topic1")
    async with receiver:
      while(Receiving):
        msgs =  await receiver.fetch_next()
        for m in msgs:
            print("Received the message from topic 1.....")
            print(str(m))
            await m.complete()
       
async def receive_message_from2():
    await receiver2.open()
    print("Opening the Receiver for Topic2")
    async with receiver2:
      while(Receiving):
        msgs =  await receiver2.fetch_next()
        for m in msgs:
            print("Received the message from topic 2.....")
            print(str(m))
            await m.complete()
               



loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
topic2receiver = loop.create_task(receive_message_from2())

Я создал две задачи для облегчения параллелизма. Вы можете обратиться к этому сообщение, чтобы получить больше ясности по ним.

Вывод: введите здесь описание изображения

01.08.2020
  • Я уже пробовал что-то подобное, используя azure-servicebus == 7.0.0b4, но я получил ConnectionClose('ErrorCodes.UnknownError: Connection in an unexpected error state.'), что является той же ошибкой, которую я получаю, используя ваше решение. Я также скопировал и вставил один пример из официального Github Azure и получил ту же ошибку. 03.08.2020
  • Вы получаете указанную выше ошибку через какое-то время или в начале выполнения? 03.08.2020
  • Начало. Строка подключения связана с темой, верно? 03.08.2020
  • Да все верно. Мне понадобится еще несколько деталей, таких как журналы отладки. Я также могу просмотреть его в автономном режиме для более подробного изучения и предоставить быструю и специализированную помощь. Отправьте электронное письмо с темой «Attn:Sathya» в AzCommunity[at]Microsoft[dot]com со ссылкой на этот поток вместе с образцом кода, который вы повторное использование и отладка файла журнала. Ссылка: docs.microsoft.com/en-us/ лазурь/разработчик/питон/ 03.08.2020
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..