Что такое RabbitMQ и как организовать очередь сообщений

9 минут чтения
Средний рейтинг статьи — 4.6

RabbitMQ — это брокер сообщений, который позволяет разным сервисам обмениваться данными через очереди. Он обеспечивает надёжную доставку сообщений, балансировку нагрузки, повторную обработку задач и является одним из самых популярных решений для асинхронного взаимодействия между сервисами.

В современной микросервисной архитектуре RabbitMQ используется для разгрузки API, выполнения фоновых операций, распределения задач между воркерами и построения событийных систем.

Зачем нужен RabbitMQ

Микросервисы часто выполняют долгие или тяжёлые операции, которые не стоит держать в рамках HTTP-запроса. Например:

  • обработка изображений и видео
  • отправка e-mail
  • генерация отчётов
  • интеграции с внешними API
  • массовые уведомления

Если выполнять всё синхронно, API начнет «тормозить» или отдавать 502. Вместо этого задача помещается в очередь, а воркеры забирают её и обрабатывают асинхронно.

RabbitMQ решает такие проблемы, как:

  • декуплинг сервисов
  • автоматическая повторная доставка сообщений
  • балансировка нагрузки
  • гарантия, что сообщение не потеряется
  • возможность создания очередей под разные типы задач

Архитектура RabbitMQ: базовые понятия

Чтобы понимать, как работает RabbitMQ, нужно знать несколько ключевых сущностей.

Producer

Приложение, которое отправляет сообщение в RabbitMQ.

Queue

Очередь, в которую попадают сообщения. Работает по принципу FIFO.

Consumer

Сервис или воркер, который получает сообщения из очереди и обрабатывает их.

Exchange

Компонент, который принимает сообщение от producer и решает, в какие очереди его направить.

Binding

Правило маршрутизации между exchange и очередью.

Типы exchange в RabbitMQ

RabbitMQ поддерживает четыре основных типа exchange.

direct

Сообщения направляются в очередь по точному совпадению routing key.

topic

Передача сообщений по шаблонам, например:
user.* или order.#

fanout

Рассылка сообщения во все привязанные очереди.

headers

Маршрутизация по значениям HTTP-похожих заголовков.

Пример: простая очередь задач

Ниже — классическая схема: один сервис публикует задачу, второй обрабатывает.

Producer (Node.js пример)

const amqp = require('amqplib');
 
async function sendTask() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
 
  const queue = 'tasks';
  await channel.assertQueue(queue);
 
  const msg = JSON.stringify({ userId: 42, action: 'sendEmail' });
  channel.sendToQueue(queue, Buffer.from(msg));
 
  console.log('Task sent');
  await channel.close();
  await connection.close();
}
 
sendTask();

Consumer (обработчик)

const amqp = require('amqplib');
 
async function startWorker() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
 
  const queue = 'tasks';
  await channel.assertQueue(queue);
 
  channel.consume(queue, async (msg) => {
    const data = JSON.parse(msg.content.toString());
    console.log('Processing:', data);
 
    // эмуляция нагрузки
    await new Promise((r) => setTimeout(r, 2000));
 
    channel.ack(msg); // подтверждаем обработку
  });
}
 
startWorker();

Подтверждения сообщений (acknowledgements)

По умолчанию RabbitMQ может доставлять сообщение повторно, если воркер упал.
Для этого используется channel.ack() — после успешной обработки.

Если воркер не отправил ack, сообщение возвращается в очередь.

Также есть channel.nack(msg) и channel.reject(msg).

Durable очереди и persistent сообщения

Чтобы сообщения не потерялись при перезапуске брокера, включают два параметра.

Очередь

channel.assertQueue('tasks', { durable: true });

Сообщение

channel.sendToQueue('tasks', Buffer.from(msg), { persistent: true });

Это гарантирует, что сообщения будут сохранены на диск.

Prefetch: контроль нагрузки на воркеры

Если есть много сообщений и много воркеров, можно ограничить количество задач, которые получает каждый воркер.

channel.prefetch(1);

Это гарантирует, что воркер получает новое сообщение только после обработки предыдущего.

Автоматическое масштабирование воркеров

RabbitMQ не масштабирует воркеры сам, но даёт возможность:

  • поднять 1, 10 или 100 обработчиков для одной очереди
  • равномерно распределить сообщения
  • добавить воркеров под нагрузку и убрать, когда нагрузка упала

Это особенно удобно в Kubernetes:
каждый pod — это воркер, который подключается к очереди.

Dead Letter Queue (DLQ)

Если сообщение «битое» или обработка каждый раз падает, чтобы не создавать бесконечный цикл повторов, используют DLQ.

Настройка:

  1. основная очередь
  2. DLX (dead-letter exchange)
  3. очередь для «проблемных» сообщений

Так можно анализировать ошибочные задачи или пересылать их вручную.

Где RabbitMQ используется чаще всего

  • email-рассылки
  • пуш-уведомления
  • очереди задач бекенда
  • микросервисы с событиями
  • системы биллинга
  • обработка медиа-файлов
  • очереди для IoT-устройств

Итоги

RabbitMQ — надёжный и гибкий брокер сообщений, который позволяет микросервисам обмениваться данными безопасно и асинхронно. Он помогает разгружать API, выполнять фоновые задачи, повышать отказоустойчивость системы и строить архитектуру, где каждый сервис работает независимо.

9 минут чтения
Средний рейтинг статьи — 4.6

Настроить мониторинг за 30 секунд

Надежные оповещения о даунтаймах. Без ложных срабатываний