RabbitMQ

从基础到高级到分布式,涵盖安装、核心概念、交换机模式、持久化、高可用、集群等核心内容。以 RabbitMQ 3.12.x 为基准。


目录

  1. 一、基础篇

  2. 二、进阶篇

  3. 三、性能优化篇

  4. 四、高可用篇

  5. 五、分布式篇

  6. 六、常见业务场景

  7. 七、运维与监控

  8. 参考资源


一、基础篇

1.1 安装与初始化

# Ubuntu/Debian
sudo apt install rabbitmq-server

# 启动 & 开机自启
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

# 开启 Web 管理界面(默认端口 15672)
sudo rabbitmq-plugins enable rabbitmq_management

# 创建管理员用户(默认 guest 只能本地访问)
rabbitmqctl add_user admin yourpassword
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# 删除默认 guest 用户(生产建议)
rabbitmqctl delete_user guest

端口说明

端口

说明

5672

AMQP 协议(客户端连接)

15672

HTTP 管理界面

25672

集群节点间通信(Erlang)

5671

AMQPS(TLS 加密)

1883

MQTT 插件

61613

STOMP 插件


1.2 核心概念

Producer(生产者)
    ↓ 发布消息
Exchange(交换机)── 根据 Routing Key + Binding → Queue(队列)
                                                        ↓
                                               Consumer(消费者)

概念

说明

Producer

消息生产者,发消息到 Exchange

Consumer

消息消费者,从 Queue 读取消息

Exchange

交换机,接收消息并按规则路由到队列

Queue

队列,存储消息

Binding

Exchange 与 Queue 的绑定关系,含路由规则

Routing Key

消息的路由键,Exchange 根据它决定路由到哪个队列

Virtual Host(vhost)

虚拟主机,逻辑隔离,类似数据库的 schema

Connection

TCP 连接

Channel

信道,Connection 内的虚拟连接,复用 TCP


1.3 六种消息模式

① 简单模式(Simple)

一个生产者,一个队列,一个消费者。

import pika

# 生产者
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()

# 消费者
def callback(ch, method, properties, body):
    print(f"收到: {body.decode()}")

channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

② 工作队列模式(Work Queue)

一个队列,多个消费者,轮询分发(默认)或公平分发

# 公平分发:消费者处理完一条再接收下一条
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

③ 发布订阅模式(Fanout)

消息广播到所有绑定的队列,忽略 Routing Key。

# 声明 fanout 交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 生产者:发布到交换机(不指定队列)
channel.basic_publish(exchange='logs', routing_key='', body='log message')

# 消费者:声明临时队列并绑定
result = channel.queue_declare(queue='', exclusive=True)  # 随机临时队列
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)

④ 路由模式(Direct)

按 Routing Key 精确匹配,路由到特定队列。

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 绑定:error 级别的日志到 error 队列
channel.queue_bind(exchange='direct_logs', queue='error_queue', routing_key='error')
channel.queue_bind(exchange='direct_logs', queue='all_queue',   routing_key='info')
channel.queue_bind(exchange='direct_logs', queue='all_queue',   routing_key='error')

# 发送 error 级别消息
channel.basic_publish(exchange='direct_logs', routing_key='error', body='error log')

⑤ 主题模式(Topic)

按 Routing Key 通配符匹配,最灵活。

*  匹配一个单词
#  匹配零个或多个单词

Routing Key 示例:
  order.create.success  →  匹配 order.#  /  order.*.success
  user.login            →  匹配 user.*   /  #.login
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_bind(exchange='topic_logs', queue='order_queue', routing_key='order.#')
channel.queue_bind(exchange='topic_logs', queue='error_queue', routing_key='#.error')

channel.basic_publish(exchange='topic_logs', routing_key='order.create.error', body='msg')

⑥ Headers 模式

根据消息头(headers)属性匹配,不用 Routing Key(少用)。

channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')
channel.queue_bind(
    exchange='headers_exchange',
    queue='my_queue',
    arguments={'x-match': 'all', 'format': 'pdf', 'type': 'report'}  # all=全匹配,any=任一匹配
)
channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',
    body='report content',
    properties=pika.BasicProperties(headers={'format': 'pdf', 'type': 'report'})
)

1.4 消息确认机制

消费者确认(Consumer Ack)

# 手动确认(推荐)
def callback(ch, method, properties, body):
    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)       # 确认
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # 拒绝并重新入队

channel.basic_consume(queue='task', on_message_callback=callback, auto_ack=False)

方法

说明

basic_ack

确认消息,从队列删除

basic_nack

拒绝消息,可选重新入队

basic_reject

拒绝单条消息

生产者确认(Publisher Confirm)

# 开启 Confirm 模式
channel.confirm_delivery()

try:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body='message',
        properties=pika.BasicProperties(delivery_mode=2)  # 持久化消息
    )
    print("消息已确认投递")
except pika.exceptions.UnroutableError:
    print("消息无法路由")

1.5 持久化

# 1. 声明持久化队列
channel.queue_declare(queue='durable_queue', durable=True)

# 2. 发送持久化消息
channel.basic_publish(
    exchange='',
    routing_key='durable_queue',
    body='persistent message',
    properties=pika.BasicProperties(
        delivery_mode=2  # 2=持久化,1=非持久化
    )
)

三者缺一不可:队列持久化 + 消息持久化 + Publisher Confirm,才能保证消息不丢失。


二、进阶篇

2.1 死信队列(DLX)

消息变为死信的三种情况:消息被拒绝且不重新入队、消息 TTL 过期、队列达到最大长度。

# 声明死信交换机和死信队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dlx_queue')
channel.queue_bind(queue='dlx_queue', exchange='dlx_exchange', routing_key='dead')

# 声明业务队列,绑定死信交换机
channel.queue_declare(
    queue='business_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',   # 死信交换机
        'x-dead-letter-routing-key': 'dead',         # 死信路由键
        'x-message-ttl': 30000,                      # 消息TTL(ms)
        'x-max-length': 1000                         # 队列最大长度
    }
)

死信队列应用场景

  • 消息处理失败后转入死信队列,人工排查

  • 实现延迟消息(TTL + 死信队列)

  • 防止消息无限重试


2.2 延迟队列

方案一:TTL + 死信队列(原生实现)

生产者 → 延迟队列(设置TTL,无消费者)→ 消息过期 → 死信交换机 → 实际处理队列 → 消费者
# 延迟10秒处理
channel.queue_declare(
    queue='delay_10s',
    arguments={
        'x-message-ttl': 10000,
        'x-dead-letter-exchange': 'work_exchange',
        'x-dead-letter-routing-key': 'work'
    }
)
channel.basic_publish(exchange='', routing_key='delay_10s', body='delayed task')

方案二:rabbitmq-delayed-message-exchange 插件(推荐)

# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 声明延迟交换机
channel.exchange_declare(
    exchange='delayed_exchange',
    exchange_type='x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

# 发送延迟消息
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='task',
    body='delayed message',
    properties=pika.BasicProperties(
        headers={'x-delay': 10000}  # 延迟10秒(ms)
    )
)

2.3 优先级队列

# 声明优先级队列(最大优先级10)
channel.queue_declare(
    queue='priority_queue',
    arguments={'x-max-priority': 10}
)

# 发送高优先级消息
channel.basic_publish(
    exchange='',
    routing_key='priority_queue',
    body='urgent task',
    properties=pika.BasicProperties(priority=9)   # 优先级 0-10
)

2.4 TTL

# 队列级别 TTL(所有消息统一过期时间)
channel.queue_declare(
    queue='ttl_queue',
    arguments={'x-message-ttl': 60000}   # 60秒
)

# 消息级别 TTL(单条消息单独设置)
channel.basic_publish(
    exchange='',
    routing_key='my_queue',
    body='expiring message',
    properties=pika.BasicProperties(expiration='30000')  # 30秒(字符串)
)

两者同时设置时,以较小值为准。


2.5 消息幂等性

消费者需自行保证幂等,常用方案:

def callback(ch, method, properties, body):
    msg_id = properties.message_id     # 生产者设置唯一ID

    # 检查是否已处理(Redis 或 DB)
    if redis.exists(f"msg:{msg_id}"):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    try:
        process(body)
        redis.setex(f"msg:{msg_id}", 86400, 1)  # 记录已处理
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# 生产者设置唯一消息ID
channel.basic_publish(
    exchange='',
    routing_key='task',
    body='message',
    properties=pika.BasicProperties(message_id=str(uuid.uuid4()))
)

2.6 事务与 Publisher Confirm

事务模式(性能差,一般不推荐)

try:
    channel.tx_select()                              # 开启事务
    channel.basic_publish(exchange='', routing_key='q', body='msg')
    channel.tx_commit()                              # 提交
except Exception:
    channel.tx_rollback()                            # 回滚

Publisher Confirm 模式(推荐,性能好)

channel.confirm_delivery()

for i in range(100):
    channel.basic_publish(
        exchange='',
        routing_key='task',
        body=f'msg {i}',
        mandatory=True,
    )

pika.BlockingConnection 中,confirm_delivery() 开启后,basic_publish() 会等待 Broker 确认;如果同时设置 mandatory=True 且消息不可路由,会抛出 pika.exceptions.UnroutableError


2.7 Python 客户端接口实战

Python 同步程序通常用 pika.BlockingConnection;异步服务(FastAPI、aiohttp、后台常驻服务)更推荐 aio-pikaconnect_robust()

连接参数

import pika

credentials = pika.PlainCredentials("appuser", "strongpassword")

params = pika.ConnectionParameters(
    host="localhost",
    port=5672,
    virtual_host="/prod",
    credentials=credentials,
    heartbeat=60,
    blocked_connection_timeout=30,
    connection_attempts=3,
    retry_delay=2,
)

connection = pika.BlockingConnection(params)
channel = connection.channel()

常用参数:

参数

说明

host / port

RabbitMQ 地址和 AMQP 端口

virtual_host

vhost,生产建议按业务隔离

credentials

用户名密码

heartbeat

心跳,避免半开连接长期挂住

blocked_connection_timeout

Broker 内存/磁盘告警阻塞连接时的超时

connection_attempts / retry_delay

初始连接重试

可靠发布模板

import json
import uuid
import pika

def publish_order_created(channel, event: dict) -> None:
    body = json.dumps(event, ensure_ascii=False).encode("utf-8")

    channel.exchange_declare(
        exchange="order.events",
        exchange_type="topic",
        durable=True,
    )

    channel.confirm_delivery()

    channel.basic_publish(
        exchange="order.events",
        routing_key="order.created",
        body=body,
        mandatory=True,
        properties=pika.BasicProperties(
            content_type="application/json",
            content_encoding="utf-8",
            delivery_mode=pika.DeliveryMode.Persistent,
            message_id=str(uuid.uuid4()),
            correlation_id=str(event["order_id"]),
            type="order.created",
            app_id="order-service",
        ),
    )

发布端要点:

  • exchange_declare(..., durable=True):交换机元数据持久化。

  • queue_declare(..., durable=True):队列元数据持久化。

  • delivery_mode=pika.DeliveryMode.Persistent:消息持久化。

  • confirm_delivery():确认 Broker 已接收发布。

  • mandatory=True:不可路由时让客户端立刻感知。

可靠消费模板

import json
import pika

def consume_orders(channel):
    channel.exchange_declare("order.events", exchange_type="topic", durable=True)
    channel.queue_declare(
        queue="inventory.order_created",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "order.dlx",
            "x-dead-letter-routing-key": "order.created.failed",
        },
    )
    channel.queue_bind(
        queue="inventory.order_created",
        exchange="order.events",
        routing_key="order.created",
    )
    channel.basic_qos(prefetch_count=10)

    def callback(ch, method, properties, body):
        try:
            event = json.loads(body.decode("utf-8"))
            handle_order_created(event, message_id=properties.message_id)
        except json.JSONDecodeError:
            # 坏消息直接进死信,避免无限重试。
            ch.basic_nack(method.delivery_tag, requeue=False)
        except TemporaryError:
            # 临时错误可以重新入队,但要避免无限循环,生产建议用重试队列。
            ch.basic_nack(method.delivery_tag, requeue=True)
        except Exception:
            ch.basic_nack(method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(method.delivery_tag)

    channel.basic_consume(
        queue="inventory.order_created",
        on_message_callback=callback,
        auto_ack=False,
    )
    channel.start_consuming()

消费端要点:

  • 业务处理成功后再 basic_ack()

  • 不可恢复错误用 basic_nack(..., requeue=False) 进入死信。

  • 可恢复错误谨慎 requeue=True,否则可能形成热循环。

  • 使用 basic_qos(prefetch_count=N) 控制单消费者未确认消息数。

重试队列模式

# 业务队列失败后投递到 retry exchange,重试队列 TTL 到期后再回到业务 exchange。
channel.exchange_declare("order.events", "topic", durable=True)
channel.exchange_declare("order.retry", "direct", durable=True)
channel.exchange_declare("order.dlx", "direct", durable=True)

channel.queue_declare(
    "inventory.order_created",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "order.retry",
        "x-dead-letter-routing-key": "inventory.order_created.retry",
    },
)

channel.queue_declare(
    "inventory.order_created.retry.30s",
    durable=True,
    arguments={
        "x-message-ttl": 30000,
        "x-dead-letter-exchange": "order.events",
        "x-dead-letter-routing-key": "order.created",
    },
)

channel.queue_bind(
    "inventory.order_created.retry.30s",
    "order.retry",
    "inventory.order_created.retry",
)

生产实践中通常配合消息头记录重试次数,例如 x-retry-count。超过上限后投递到最终死信队列,避免无限重试。

同步 pika 封装示例

class RabbitPublisher:
    def __init__(self, params: pika.ConnectionParameters):
        self._params = params
        self._connection = None
        self._channel = None

    def connect(self):
        self._connection = pika.BlockingConnection(self._params)
        self._channel = self._connection.channel()
        self._channel.confirm_delivery()

    def publish_json(self, exchange: str, routing_key: str, payload: dict):
        if self._channel is None or self._channel.is_closed:
            self.connect()

        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self._channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=body,
            mandatory=True,
            properties=pika.BasicProperties(
                content_type="application/json",
                delivery_mode=pika.DeliveryMode.Persistent,
                message_id=str(uuid.uuid4()),
            ),
        )

    def close(self):
        if self._connection and self._connection.is_open:
            self._connection.close()

pikaBlockingConnectionChannel 不适合跨线程共享;多线程程序建议每个线程独立 Channel,或由单独发布线程接收内部队列后统一发布。

aio-pika 异步接口

import json
import aio_pika

async def publish_order_created(event: dict):
    connection = await aio_pika.connect_robust(
        "amqp://appuser:strongpassword@localhost:5672/prod"
    )

    async with connection:
        channel = await connection.channel(publisher_confirms=True)
        exchange = await channel.declare_exchange(
            "order.events",
            aio_pika.ExchangeType.TOPIC,
            durable=True,
        )

        message = aio_pika.Message(
            json.dumps(event, ensure_ascii=False).encode("utf-8"),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            message_id=str(event["event_id"]),
        )

        await exchange.publish(message, routing_key="order.created", mandatory=True)
async def consume_order_created():
    connection = await aio_pika.connect_robust(
        "amqp://appuser:strongpassword@localhost:5672/prod"
    )

    channel = await connection.channel()
    await channel.set_qos(prefetch_count=10)

    queue = await channel.declare_queue("inventory.order_created", durable=True)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process(requeue=False):
                event = json.loads(message.body.decode("utf-8"))
                await handle_order_created(event)

aio_pika.connect_robust() 会在连接断开后自动恢复连接、Channel、声明和绑定,更适合长期运行的异步服务。


三、性能优化篇

3.1 预取数量(QoS)

控制消费者每次预取的消息数量,避免消费者堆积大量未处理消息。

# 每次只预取1条(公平分发)
channel.basic_qos(prefetch_count=1)

# 高吞吐量场景适当增大
channel.basic_qos(prefetch_count=50)

# 全局设置(影响整个 Connection)
channel.basic_qos(prefetch_count=100, global_qos=True)

选择依据

场景

推荐值

任务处理时间长、不均匀

1~5

任务处理快、吞吐优先

50~200

批量消费

与批量大小相同


3.2 批量操作

# 批量确认(减少网络往返)
messages = []
def callback(ch, method, properties, body):
    messages.append((method.delivery_tag, body))
    if len(messages) >= 100:
        process_batch(messages)
        # 批量确认:确认最后一条,multiple=True 确认之前所有
        ch.basic_ack(delivery_tag=messages[-1][0], multiple=True)
        messages.clear()

3.3 连接与信道复用

# ✅ 正确:一个连接,多个信道(不同线程用不同信道)
connection = pika.BlockingConnection(params)
channel1 = connection.channel()   # 生产者
channel2 = connection.channel()   # 消费者

# ❌ 错误:每次操作都新建连接(开销极大)
for msg in messages:
    conn = pika.BlockingConnection(params)
    conn.channel().basic_publish(...)
    conn.close()

连接池(使用 pika pool 或手动管理)

# 推荐使用 aio-pika(异步)或连接池库
import aio_pika

async def publish():
    connection = await aio_pika.connect_robust("amqp://user:pass@localhost/")
    async with connection:
        channel = await connection.channel()
        await channel.default_exchange.publish(
            aio_pika.Message(b"Hello"),
            routing_key="task_queue"
        )

3.4 配置调优

# rabbitmq.conf

# 内存水位(超过则阻止生产者)
vm_memory_high_watermark.relative = 0.6     # 可用内存的 60%
vm_memory_high_watermark.absolute = 4GB     # 或绝对值

# 磁盘水位(低于此值则阻止生产者)
disk_free_limit.relative = 2.0             # 内存的2倍
disk_free_limit.absolute = 5GB

# 文件描述符
total_memory_available_override_value = 8GB

# 心跳
heartbeat = 60

# 帧大小
frame_max = 131072

# 预取
channel_max = 2047
# 系统级配置(Linux)
ulimit -n 65536                            # 文件描述符
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf

3.5 监控慢操作

# 查看队列积压
rabbitmqctl list_queues name messages consumers

# 查看连接
rabbitmqctl list_connections

# 查看信道
rabbitmqctl list_channels

# 查看绑定关系
rabbitmqctl list_bindings

四、高可用篇

4.1 普通集群

多节点共享元数据(交换机、队列定义),但队列内容只存在一个节点

# 节点1(主)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

# 节点2 加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# 查看集群状态
rabbitmqctl cluster_status

缺点:队列节点宕机后,该队列不可用。需配合镜像队列或 Quorum Queue。


4.2 镜像队列(Classic Mirroring)

队列数据同步到多个节点,节点宕机后其他节点可接替。

# 设置镜像策略(所有队列同步到所有节点)
rabbitmqctl set_policy ha-all "^" \
    '{"ha-mode":"all"}' \
    --priority 0 --apply-to queues

# 同步到2个节点(推荐)
rabbitmqctl set_policy ha-two "^ha\." \
    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' \
    --priority 1 --apply-to queues

注意:镜像队列在 RabbitMQ 3.13 中已被废弃,推荐使用 Quorum Queue


4.3 Quorum Queue(推荐)

基于 Raft 协议的强一致性队列,是镜像队列的替代方案(3.8+)。

# 声明 Quorum Queue
channel.queue_declare(
    queue='quorum_queue',
    durable=True,
    arguments={'x-queue-type': 'quorum'}
)
# 设置策略
rabbitmqctl set_policy quorum-queues "^" \
    '{"x-queue-type":"quorum"}' \
    --apply-to queues

Quorum Queue vs 镜像队列

镜像队列

Quorum Queue

一致性协议

自研

Raft(强一致)

脑裂处理

较弱

性能

较高

略低

推荐状态

已废弃

✅ 推荐


4.4 备份与恢复

# 导出定义(交换机、队列、绑定、策略等)
rabbitmqadmin export definitions.json

# 导入定义
rabbitmqadmin import definitions.json

# 通过管理 API
curl -u admin:password http://localhost:15672/api/definitions > definitions.json
curl -u admin:password -X POST http://localhost:15672/api/definitions \
     -H "Content-Type: application/json" -d @definitions.json

# 消息数据备份(需停机)
cp -r /var/lib/rabbitmq/mnesia /backup/

# 恢复
sudo systemctl stop rabbitmq-server
cp -r /backup/mnesia /var/lib/rabbitmq/
sudo systemctl start rabbitmq-server

五、分布式篇

5.1 Federation(联邦)

跨集群、跨数据中心的消息传递,允许不同 RabbitMQ 实例之间同步消息。

# 安装插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

# 配置上游
rabbitmqctl set_parameter federation-upstream my-upstream \
    '{"uri":"amqp://user:pass@remote-host","prefetch-count":1000}'

# 设置联邦策略
rabbitmqctl set_policy federated-ex "^federated\." \
    '{"federation-upstream-set":"all"}' \
    --apply-to exchanges

5.2 Shovel(铲子)

将消息从一个队列转移到另一个队列(可跨实例),适合数据迁移和跨机房转发。

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

# 配置 Shovel
rabbitmqctl set_parameter shovel my-shovel \
    '{"src-protocol":"amqp091",
      "src-uri":"amqp://source-host",
      "src-queue":"source_queue",
      "dest-protocol":"amqp091",
      "dest-uri":"amqp://dest-host",
      "dest-queue":"dest_queue"}'

5.3 多数据中心方案

数据中心 A                    数据中心 B
RabbitMQ Cluster A  ←→  Federation/Shovel  ←→  RabbitMQ Cluster B
      ↕                                               ↕
  本地消费者                                       本地消费者

架构选择

场景

方案

跨机房消息同步

Federation

消息迁移、数据搬运

Shovel

同机房高可用

Quorum Queue

超大规模消息队列

考虑 Kafka


5.4 常见方案对比

方案

高可用

水平扩展

一致性

适用场景

单节点

-

开发测试

普通集群

部分

元数据HA,内容不HA

镜像队列

有限

中(已废弃)

旧版兼容

Quorum Queue

强(Raft)

生产推荐

Federation

最终一致

跨机房


六、常见业务场景

6.1 异步解耦

用户下单 → 订单服务 → [MQ] → 库存服务
                            ↘ 通知服务
                            ↘ 积分服务
# 订单服务:发消息
channel.basic_publish(
    exchange='order_exchange',
    routing_key='order.created',
    body=json.dumps({'order_id': 123, 'user_id': 1, 'amount': 99.9}),
    properties=pika.BasicProperties(
        delivery_mode=2,
        content_type='application/json',
        message_id=str(uuid.uuid4())
    )
)

6.2 流量削峰

突发请求(1万QPS)→ [MQ队列] → 消费者按能力消费(500QPS)
# 限制消费速度
channel.basic_qos(prefetch_count=10)   # 每次最多处理10条

def callback(ch, method, properties, body):
    process_order(body)                 # 处理业务
    time.sleep(0.01)                    # 控制处理速率
    ch.basic_ack(delivery_tag=method.delivery_tag)

6.3 延迟任务

# 订单30分钟未支付自动取消
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='order.timeout',
    body=json.dumps({'order_id': 123}),
    properties=pika.BasicProperties(
        headers={'x-delay': 1800000}    # 30分钟(ms)
    )
)

6.4 可靠消息投递(至少一次)

生产者 → 本地事务 + 消息落库 → 发送MQ → 消费者确认
                ↓
         定时扫描未确认消息 → 重发
# 1. 业务操作和消息入库放在同一事务
with db.transaction():
    create_order(order_data)
    db.insert('mq_messages', {'id': msg_id, 'status': 'pending', 'body': msg_body})

# 2. 发送消息(开启 Publisher Confirm)
channel.confirm_delivery()
channel.basic_publish(...)

# 3. 确认后更新消息状态
db.update('mq_messages', {'status': 'sent'}, where={'id': msg_id})

# 4. 定时任务补偿:扫描 pending 超过5分钟的消息重发

6.5 RPC 模式

# RPC 服务端
def on_request(ch, method, props, body):
    result = process(body)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,           # 回复到临时队列
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(result)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

# RPC 客户端
corr_id = str(uuid.uuid4())
reply_queue = channel.queue_declare(queue='', exclusive=True).method.queue
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(reply_to=reply_queue, correlation_id=corr_id),
    body='request data'
)
# 等待响应...

七、运维与监控

7.1 常用 CLI 命令

# 节点状态
rabbitmqctl status
rabbitmqctl cluster_status
rabbitmqctl node_health_check

# 用户管理
rabbitmqctl list_users
rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

# 队列管理
rabbitmqctl list_queues name messages consumers memory
rabbitmqctl purge_queue queue_name           # 清空队列消息
rabbitmqctl delete_queue queue_name

# 策略管理
rabbitmqctl list_policies
rabbitmqctl set_policy name pattern definition

# 连接/信道
rabbitmqctl list_connections
rabbitmqctl list_channels
rabbitmqctl close_connection conn_name "reason"

# vhost
rabbitmqctl add_vhost myvhost
rabbitmqctl list_vhosts

7.2 管理 API

# 查看所有队列
curl -u admin:password http://localhost:15672/api/queues

# 查看指定队列
curl -u admin:password http://localhost:15672/api/queues/%2F/my_queue

# 查看节点
curl -u admin:password http://localhost:15672/api/nodes

# 发送消息(测试用)
curl -u admin:password -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
     -H "Content-Type: application/json" \
     -d '{"properties":{},"routing_key":"my_queue","payload":"test","payload_encoding":"string"}'

# 获取消息(测试用)
curl -u admin:password -X POST http://localhost:15672/api/queues/%2F/my_queue/get \
     -H "Content-Type: application/json" \
     -d '{"count":1,"ackmode":"ack_requeue_true","encoding":"auto"}'

7.3 监控指标

指标

说明

告警阈值

messages_ready

等待消费的消息数

持续增长告警

messages_unacknowledged

未确认消息数

超过预期告警

consumers

消费者数量

为0告警

memory

节点内存占用

> 80% 告警

disk_free

磁盘剩余空间

< 水位线告警

fd_used

文件描述符使用

> 80% 告警

deliver_rate

消息投递速率

骤降告警

publish_rate

消息发布速率

监控趋势


7.4 监控工具

工具

说明

RabbitMQ Management UI

官方 Web 界面(15672端口),最直观

Prometheus + rabbitmq_prometheus 插件

指标采集,配合 Grafana

Datadog

云监控,有官方 RabbitMQ 集成

Zabbix

企业级监控

# 开启 Prometheus 插件
rabbitmq-plugins enable rabbitmq_prometheus
# 访问 http://localhost:15692/metrics

7.5 安全加固

# 1. 删除默认 guest 用户
rabbitmqctl delete_user guest

# 2. 创建专用用户并限制权限
rabbitmqctl add_user appuser strongpassword
rabbitmqctl set_permissions -p /prod appuser "^app\." "^app\." "^app\."

# 3. 开启 TLS
# rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca.pem
ssl_options.certfile   = /path/to/cert.pem
ssl_options.keyfile    = /path/to/key.pem
ssl_options.verify     = verify_peer

# 4. 限制管理界面访问
management.listener.port = 15672
management.listener.ip   = 127.0.0.1   # 只允许本地访问

7.6 日常维护清单

任务

频率

方式

检查队列积压

实时

Management UI / 告警

检查消费者在线

实时

list_queues consumers

检查节点内存/磁盘

实时

rabbitmqctl status

检查死信队列消息

每天

Management UI

导出定义备份

每天

rabbitmqadmin export

清理无用队列

每周

Management UI

检查未确认消息

每天

messages_unacknowledged

证书有效期检查

每月

TLS 证书到期提醒


参考资源