Nano Hash - криптовалюты, майнинг, программирование

Многопоточный клиент JMS ActiveMQ

Я использую приведенный ниже код для создания нескольких сеансов JMS, чтобы несколько потребителей могли получать сообщения. Моя проблема в том, что код работает в однопоточном режиме. Даже если сообщения присутствуют в очереди, второй поток не может ничего получить и просто продолжает опрос. Тем временем первый поток завершает обработку первого пакета, возвращается и потребляет оставшиеся сообщения. Что-то не так с использованием здесь?

static {
    try {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616");
        connection = connectionFactory.createConnection();
        connection.start();
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
    }

}

public JMSClientReader(boolean isQueue, String name) throws QueueException {

    init(isQueue,name);
}

@Override
public void init(boolean isQueue, String name) throws QueueException
{

    // Create a Connection
    try {
        // Create a Session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        if (isQueue) {
            destination = new ActiveMQQueue(name);// session.createQueue("queue");
        } else {
            destination = new ActiveMQTopic(name);// session.createTopic("topic");
        }
        consumer = session.createConsumer(destination);
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
        throw new QueueException(e);
    }
}

public String readQueue() throws QueueException {

    // connection.setExceptionListener(this);
    // Wait for a message
    String text = null;
    Message message;
    try {
        message = consumer.receive(1000);
        if(message==null)
            return "done";
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            LOGGER.info("Received: " + text);
        } else {
            throw new JMSException("Invalid message found");
        }
    } catch (JMSException e) {
        LOGGER.error("Unable to read message from Queue", e);
        throw new QueueException(e);
    }


    LOGGER.info("Message read is " + text);
    return text;

}

  • Если вы хотите, чтобы сообщения получали несколько потребителей, используйте тему вместо очереди. activemq.apache.org/how-does- a-queue-compare-to-a-topic.html 26.02.2017
  • В моем случае я просто пытаюсь увеличить количество слушателей, слушающих очередь. Не хочу использовать темы в настоящее время. 26.02.2017

Ответы:


1

ваша проблема в prefetchPolicy.

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

все сообщения были отправлены первому подключенному потребителю, и когда другой подключается, он не получает сообщения, поэтому, чтобы изменить это поведение, если у вас есть параллельный потребитель для очереди, вам нужно установить prefetchPolicy на более низкое значение, чем по умолчанию. например, добавьте этот jms.prefetchPolicy.queuePrefetch=1 в конфигурацию uri в activemq.xml или установите его в URL-адресе клиента, как это

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1");

Большие значения предварительной выборки рекомендуются для высокой производительности при больших объемах сообщений. Однако для небольших объемов сообщений, когда обработка каждого сообщения занимает много времени, для предварительной выборки следует установить значение 1. Это гарантирует, что потребитель обрабатывает только одно сообщение за раз. Однако указание нулевого предела предварительной выборки приведет к тому, что потребитель будет опрашивать сообщения по одному, вместо того, чтобы сообщение было отправлено потребителю.

Взгляните на http://activemq.apache.org/what-is-the-prefetch-limit-for.html

И

http://activemq.apache.org/destination-options.html

26.02.2017
  • Это сработало. Спасибо за отличный ответ. Это сводило меня с ума. 26.02.2017
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..