В реальном бизнесе есть несколько сценариев, когда необходимо отправлять сообщения с некоторой задержкой или в определенное время, например:

  1. В доме стоит умный водонагреватель, который нужно активировать через 30 минут.
  2. Неоплаченный заказ закрывается через 15 минут.
  3. Отправка SMS, почты, push-уведомлений о продаже в 14:00.

Но поскольку протокол AMQP не имеет встроенной функции отложенной очереди, если вы выполните поиск по запросу «как задерживать / планировать сообщения в RabbitMQ», вы, скорее всего, столкнетесь с двумя возможными решениями для этого.

  1. Одно из решений - использовать комбинацию функции TTL сообщения и функции мертвых букв, чтобы имитировать это.
  2. Второе решение - использовать официальный плагин для обмена отложенными сообщениями RabbitMQ.

Оба решения действительны, но второе решение относительно просто по сравнению с решениями, основанными на обмене недоставленными сообщениями и TTL сообщений.

Плагины отложенных сообщений RabbitMQ добавляют новый тип обмена в RabbitMQ, который будет хранить сообщения внутри, используя Mnesia, до тех пор, пока они не будут запланированы для доставки. Это защищает в случае отказа сервера.

Итак, давайте начнем с реализации второго решения, сначала установив плагин, но перед этим посмотрим на его предварительные условия:

  1. RabbitMQ 3.5.8 и более поздние версии.
  2. Erlang / OTP 18.0 и более поздние версии

Установка плагина

Чтобы установить плагин, перейдите на страницу плагинов сообщества, найдите rabbitmq_delayed_message_exchange и загрузите соответствующие файлы .ez (которые представляют собой zip-файлы с метаданными) для вашей установки RabbitMQ. Скопируйте плагин в папку плагинов RabbitMQ.

Расположение каталога подключаемых модулей определяется переменной среды RABBITMQ_PLUGINS_DIR. Его расположение по умолчанию зависит от того, как был установлен RabbitMQ. Вот некоторые общие ценности:

После того, как файлы были скопированы в правильный каталог, включите его, выполнив следующую команду:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Объявление обмена

Чтобы отложить сообщения, объявите обмен с типом x-delayed-message

channel.assertExchange(exchange, "x-delayed-message", {
    autoDelete: false,
    durable: true,
    passive: true,
    arguments: {
        'x-delayed-type': "direct"
    }
})

Когда мы объявили обмен выше, мы предоставили аргумент x-delayed-type со значением «direct», поэтому наш обмен будет вести себя как прямой обмен, но мы могли бы передать тему, разветвление или пользовательский тип обмена, предоставленные другим плагином.

Публикация сообщений

Чтобы отложить сообщение, опубликуйте сообщения с настраиваемым заголовком x-delay, в котором выражается время задержки сообщения в миллисекундах. Сообщение будет доставлено в соответствующие очереди через x-delay миллисекунд, до тех пор, пока оно не будет сохранено в Mnesia таблица.

Если заголовок x-delay отсутствует, плагин без задержки продолжит маршрутизацию сообщения.

channel.publish(exchange, queue, new Buffer.from(params), {
    headers: {
        "x-delay": delayInMilliSeconds
    }
})

Проверка того, было ли сообщение задержано

Чтобы проверить, было ли сообщение задержано или нет, вы можете проверить заголовок x-delay полученного сообщения, он будет равен - (минус) задержке.

Например, если вы опубликовали сообщение с задержкой в ​​10000 миллисекунд, потребитель, получивший указанное сообщение, обнаружит, что заголовок x-delay установлен на -10000.

Полный код:



Использованная литература: