Nano Hash - криптовалюты, майнинг, программирование

Сообщения не помещаются в очередь при выброшенном исключении

Я использую

Интеграция Spring 4.1.2.RELEASE

Весна AMQP 1.4.3.РЕЛИЗ

Когда возникает исключение, я ожидаю, что сообщение будет повторно поставлено в очередь. Я считаю, что у меня есть все условия для этого, как я понял из этого post

В посте я узнал, что есть три условия для обеспечения повторной очереди: 1) acknowledge-mode должен быть AUTO. 2) requeue-rejected должно быть ИСТИНА. 3) Выбросить любое исключение, НО AmqpRejectAndDontRequeueException.

Я считаю, что выполняю эти условия в каком-то тестовом коде, который я написал:

Конфигурация

Активатор службы - это то место, где выдается исключение, которое не наследуется от AmqpRejectAndDontRequeueException

@Autowired
public void setSpringIntegrationConfigHelper (SpringIntegrationHelper springIntegrationConfig) {
    this.springIntegrationConfigHelper = springIntegrationConfig;   
}

@Bean
public String priorityPOCQueueName() {
    return "poc.priority";
}

@Bean
public Queue priorityPOCQueue(RabbitAdmin rabbitAdmin) {
    Queue queue = new Queue(priorityPOCQueueName(), true);
    rabbitAdmin.declareQueue(queue);
    return queue;
}

@Bean
public Binding priorityPOCQueueBinding(RabbitAdmin rabbitAdmin) {
    Binding binding = new Binding(priorityPOCQueueName(),
                                  DestinationType.QUEUE,
                                  "amq.direct",
                                  priorityPOCQueue(rabbitAdmin).getName(),
                                  null);
    rabbitAdmin.declareBinding(binding);
    return binding;
}

@Bean
public AmqpTemplate priorityPOCMessageTemplate(ConnectionFactory amqpConnectionFactory,
                                                @Qualifier("priorityPOCQueueName") String queueName,
                                                @Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
    RabbitTemplate template = new RabbitTemplate(amqpConnectionFactory);
    template.setChannelTransacted(false);
    template.setExchange("amq.direct");
    template.setQueue(queueName);
    template.setRoutingKey(queueName);
    template.setMessageConverter(messageConverter);
    return template;
}


@Autowired
@Qualifier("priorityPOCQueue")
public void setPriorityPOCQueue(Queue priorityPOCQueue) {
    this.priorityPOCQueue = priorityPOCQueue;
}

@Bean(name="exec.priorityPOC")
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
    e.setCorePoolSize(1);
    e.setQueueCapacity(1);
    return e;
}

@Bean(name="poc.priorityChannel")
public MessageChannel pocPriorityChannel() {
    PriorityChannel c = new PriorityChannel(new PriorityComparator());
    c.setComponentName("poc.priorityChannel");
    c.setBeanName("poc.priorityChannel");
    return c;
}

@Bean(name="poc.inboundChannelAdapter") //make this a unique name
public AmqpInboundChannelAdapter amqpInboundChannelAdapter(@Qualifier("exec.priorityPOC") TaskExecutor taskExecutor
        , @Qualifier("poc.errorChannel") MessageChannel pocErrorChannel) {

    int concurrentConsumers = 1;
    AmqpInboundChannelAdapter a =  mimediaSpringIntegrationConfigHelper.createInboundChannelAdapter(taskExecutor
            , pocPriorityChannel(), new Queue[]{priorityPOCQueue},  concurrentConsumers);
    a.setErrorChannel(pocErrorChannel);
    return a;

}

@ServiceActivator(inputChannel="poc.priorityChannel")
public void processUserFileCollectionAudit(@Header(SimulateErrorHeaderPostProcessor.ERROR_SIMULATE_HEADER_KEY) Boolean simulateError, PriorityMessage priorityMessage) throws InterruptedException {
    if (isFirstMessageReceived == false) {
        Thread.sleep(15000); //Cause a bit of a backup so we can see prioritizing in action.
        isFirstMessageReceived = true;
    }
    logger.debug("Received message with priority: " + priorityMessage.getPriority() + ", simulateError: " + simulateError +  ", Current retry count is "
        + priorityMessage.getRetryCount());
    if (simulateError && priorityMessage.getRetryCount() < PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
        logger.debug(" Simulating an error and re-queue'ng. Current retry count is " + priorityMessage.getRetryCount());
        priorityMessage.setRetryCount(priorityMessage.getRetryCount() + 1);
        throw new NonAdequateResourceException();
    } else if (simulateError && priorityMessage.getRetryCount() >= PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
        logger.debug(" Max retry count exceeded");
    }
}

Помощник SpringIntegration

Здесь настраиваются автоматическое подтверждение и отклонение повторной очереди.

protected ConnectionFactory connectionFactory;
protected MessageChannel errorChannel;
protected MessageConverter messageConverter;

@Autowired
public void setConnectionFactory (ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
}

@Autowired
public void setErrorChannel(MessageChannel errorChannel) {
    this.errorChannel = errorChannel;
}

@Autowired
public void setMessageConverter(@Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
    this.messageConverter = messageConverter;
}

public AmqpInboundChannelAdapter createInboundChannelAdapter(TaskExecutor taskExecutor
        ,  MessageChannel outputChannel, Queue[] queues, int concurrentConsumers) {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);
    //AUTO is default, but setting it anyhow.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    listenerContainer.setAutoStartup(true);
    listenerContainer.setConcurrentConsumers(concurrentConsumers);
    listenerContainer.setMessageConverter(messageConverter);
    listenerContainer.setQueues(queues);
    //listenerContainer.setChannelTransacted(false);
    listenerContainer.setPrefetchCount(100);
    listenerContainer.setTaskExecutor(taskExecutor);
    listenerContainer.setDefaultRequeueRejected(true);



    AmqpInboundChannelAdapter a = new AmqpInboundChannelAdapter(listenerContainer);
    a.setMessageConverter(messageConverter);
    a.setAutoStartup(true);
    //TODO This was stopping my custom error handler. Fix. a.setErrorChannel(errorChannel);
    a.setHeaderMapper(MimediaAmqpHeaderMapper.createPassAllHeaders());
    a.setOutputChannel(outputChannel);
    return a;
}

Почему мои сообщения не помещаются в очередь повторно?

16.09.2015

Ответы:


1

4) Исключение должно быть выбрано в потоке слушателя.

Это будет работать только в том случае, если исключение будет вызвано потоком контейнера прослушивателя, который получил сообщение.

Поскольку вы используете PriorityChannel в качестве выходного канала адаптера, вы немедленно передаете сообщение другому потоку, поэтому в потоке прослушивателя нет исключений, и сообщение всегда подтверждается.

16.09.2015
  • Спасибо, очевидно, я полностью пропустил это; прекрасно это объясняет. У меня есть следующий вопрос, но его, вероятно, следует опубликовать как отдельный вопрос, а не вести беседу в этом разделе комментариев. Я составлю еще один пост. 16.09.2015
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..