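RabbitMQ

From basics to advanced topics to distributed deployment: installation, core concepts, exchange types, persistence, high availability, clustering and more. Based on RabbitMQ 3.12.x.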
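I. Basics

1.1 Installation and Initial Setup

```bash
# Ubuntu/Debian (distribution packages can lag behind 3.12.x; RabbitMQ's own apt repository ships current releases)
sudo apt install rabbitmq-server

# Start the service and enable it at boot
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

# Enable the web management UI (default port 15672)
sudo rabbitmq-plugins enable rabbitmq_management

# Create an administrator (the default guest user can only connect from localhost)
sudo rabbitmqctl add_user admin yourpassword
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# Delete the default guest user (recommended in production)
sudo rabbitmqctl delete_user guest
```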
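Port reference

| Port | Description |
|---|---|
| 5672 | AMQP (client connections) |
| 15672 | HTTP management UI and API |
| 25672 | Inter-node and CLI tool communication (Erlang distribution) |
| 5671 | AMQPS (TLS-encrypted AMQP) |
| 1883 | MQTT plugin |
| 61613 | STOMP plugin |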
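1.2 Core Concepts

```text
Producer
   ↓ publishes a message
Exchange ── routes by Routing Key + Binding → Queue
                                               ↓
                                            Consumer
```

| Concept | Description |
|---|---|
| Producer | Publishes messages to an Exchange |
| Consumer | Reads messages from a Queue |
| Exchange | Receives messages and routes them to queues according to rules |
| Queue | Stores messages |
| Binding | Link between an Exchange and a Queue, including the routing rule |
| Routing Key | Key carried by a message; the Exchange uses it to decide which queues receive the message |
| Virtual Host (vhost) | Unit of logical isolation, similar to a schema in a database |
| Connection | A TCP connection |
| Channel | Lightweight virtual connection multiplexed over one TCP Connection |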
1.3 Six Messaging Patterns

① Simple

One producer, one queue, one consumer.

```python
import pika

# Producer
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
connection.close()

# Consumer (normally a separate process with its own connection)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

def callback(ch, method, properties, body):
    print(f"Received: {body.decode()}")

channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
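② Work Queue

One queue, multiple consumers. Messages are dispatched round-robin by default. Limiting the prefetch count switches to fair dispatch.

```python
# Fair dispatch: with prefetch_count=1 a consumer receives its next message only after acking the current one
channel.basic_qos(prefetch_count=1)
# auto_ack defaults to False, so the callback must call ch.basic_ack(...)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
```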
③ Publish/Subscribe (Fanout)

Messages are broadcast to every bound queue. The routing key is ignored.

```python
# Declare a fanout exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Producer: publish to the exchange (no queue is named)
channel.basic_publish(exchange='logs', routing_key='', body='log message')
# Consumer: declare a temporary queue and bind it
result = channel.queue_declare(queue='', exclusive=True)  # server-named, exclusive temporary queue
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
```
④ Routing (Direct)

Messages are routed to specific queues by exact routing-key match.

```python
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_declare(queue='error_queue')
channel.queue_declare(queue='all_queue')
# Bindings: error logs go to error_queue; all_queue receives both info and error
channel.queue_bind(exchange='direct_logs', queue='error_queue', routing_key='error')
channel.queue_bind(exchange='direct_logs', queue='all_queue', routing_key='info')
channel.queue_bind(exchange='direct_logs', queue='all_queue', routing_key='error')
# Publish an error-level message
channel.basic_publish(exchange='direct_logs', routing_key='error', body='error log')
```
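⑤ Topic

Routing keys are matched against binding patterns that contain wildcards. This is the most flexible mode. The words in a routing key are separated by dots.

- `*` matches exactly one word
- `#` matches zero or more words

Routing key examples:

```text
order.create.success → matches order.# / order.*.success
user.login           → matches user.* / #.login
```

```python
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_bind(exchange='topic_logs', queue='order_queue', routing_key='order.#')
channel.queue_bind(exchange='topic_logs', queue='error_queue', routing_key='#.error')
# 'order.create.error' matches both 'order.#' and '#.error', so both queues receive a copy
channel.basic_publish(exchange='topic_logs', routing_key='order.create.error', body='msg')
```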
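A small matcher makes the wildcard rules concrete. This is a pure-Python sketch of the `*`/`#` semantics for illustration only. The function name `topic_matches` is hypothetical. The broker performs this matching itself, and nothing here talks to RabbitMQ.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    '*' matches exactly one word; '#' matches zero or more words.
    """
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p_words):
            # Pattern exhausted: match only if the key is exhausted too
            return j == len(k_words)
        if p_words[i] == "#":
            # '#' either absorbs zero words (move past it) or one more word (stay on it)
            return match(i + 1, j) or (j < len(k_words) and match(i, j + 1))
        if j == len(k_words):
            return False
        if p_words[i] == "*" or p_words[i] == k_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("order.#", "order.create.success"))  # True
print(topic_matches("order.*", "order.create.success"))  # False
```

Because `#` can absorb zero words, `order.#` also matches a bare `order`, while `*.login` does not match `login`.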
⑥ Headers

Messages are matched on their header properties instead of the routing key (rarely used).

```python
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')
channel.queue_bind(
    exchange='headers_exchange',
    queue='my_queue',
    arguments={'x-match': 'all', 'format': 'pdf', 'type': 'report'}  # all = every header must match, any = at least one
)
channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',
    body='report content',
    properties=pika.BasicProperties(headers={'format': 'pdf', 'type': 'report'})
)
```
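The following sketch shows how `x-match` chooses between `all` and `any`. The helper `headers_match` is hypothetical. It assumes exact value comparison and skips binding arguments that start with `x-`, because those are options such as `x-match` itself rather than headers to compare. The real exchange handles more edge cases.

```python
def headers_match(binding_args: dict, headers: dict) -> bool:
    """Simplified headers-exchange rule: 'all' needs every pair, 'any' needs one."""
    mode = binding_args.get("x-match", "all")  # 'all' is the default
    # Arguments starting with 'x-' are binding options, not headers to compare
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [headers.get(k) == v for k, v in wanted.items()]
    return any(hits) if mode == "any" else all(hits)


binding = {'x-match': 'all', 'format': 'pdf', 'type': 'report'}
print(headers_match(binding, {'format': 'pdf', 'type': 'report'}))  # True
print(headers_match(binding, {'format': 'pdf'}))                    # False
```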
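1.4 Message Acknowledgement

Consumer acknowledgements (Consumer Ack)

```python
# Manual acknowledgement (recommended)
def callback(ch, method, properties, body):
    try:
        process(body)  # application-defined processing
        ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge
    except Exception:
        # Reject and requeue; a message that always fails will loop forever,
        # so a dead-letter or retry queue is usually preferable
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='task', on_message_callback=callback, auto_ack=False)
```

| Method | Description |
|---|---|
|  | Acknowledge the message; the broker removes it from the queue |
|  | Reject the message, optionally requeuing it |
|  | Reject a single message |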
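Publisher confirms (Publisher Confirm)

```python
import pika

# Enable publisher confirms on this channel
channel.confirm_delivery()
try:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body='message',
        properties=pika.BasicProperties(delivery_mode=2),  # persistent message
        mandatory=True,  # without this, unroutable messages are confirmed silently
    )
    print("Message confirmed by the broker")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")
except pika.exceptions.NackError:
    print("Broker rejected (nacked) the message")
```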
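1.5 Persistence

```python
# 1. Declare a durable queue
channel.queue_declare(queue='durable_queue', durable=True)
# 2. Publish a persistent message
channel.basic_publish(
    exchange='',
    routing_key='durable_queue',
    body='persistent message',
    properties=pika.BasicProperties(
        delivery_mode=2  # 2 = persistent, 1 = transient
    )
)
```

All three are required to avoid losing messages between publisher and broker: a durable queue, persistent messages, and publisher confirms. On the consuming side, manual acks are needed as well.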
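II. Intermediate

2.1 Dead Letter Exchange (DLX)

A message becomes a dead letter in three cases:
- It is rejected without being requeued.
- Its TTL expires.
- It is dropped because the queue reached its maximum length.

```python
# Declare the dead-letter exchange and the dead-letter queue
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dlx_queue')
channel.queue_bind(queue='dlx_queue', exchange='dlx_exchange', routing_key='dead')
# Declare the business queue and attach the dead-letter exchange to it
channel.queue_declare(
    queue='business_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',  # dead-letter exchange
        'x-dead-letter-routing-key': 'dead',       # routing key for dead letters
        'x-message-ttl': 30000,                    # message TTL (ms)
        'x-max-length': 1000                       # maximum queue length
    }
)
```

Use cases for dead-letter queues

- Move messages that failed processing into a dead-letter queue for manual investigation
- Implement delayed messages (TTL + dead-letter queue)
- Prevent endless retries of the same message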
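The business queue above sets `x-max-length` without an overflow mode, so RabbitMQ's default `drop-head` behaviour applies. Once the queue is full, the oldest message is discarded to make room. Because a dead-letter exchange is configured, that message is dead-lettered rather than lost. The toy model below sketches that behaviour without a broker. The class name `BoundedQueue` is hypothetical.

```python
from collections import deque


class BoundedQueue:
    """Toy model of a queue with x-max-length and the default 'drop-head' overflow."""

    def __init__(self, max_length: int):
        self.max_length = max_length
        self.messages = deque()
        self.dead_letters = []  # stands in for the DLX + dlx_queue

    def publish(self, body: str) -> None:
        self.messages.append(body)
        # Over the limit: the oldest message is removed from the head and dead-lettered
        while len(self.messages) > self.max_length:
            self.dead_letters.append(self.messages.popleft())


q = BoundedQueue(max_length=3)
for body in ["a", "b", "c", "d", "e"]:
    q.publish(body)
print(list(q.messages), q.dead_letters)  # ['c', 'd', 'e'] ['a', 'b']
```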
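2.2 Delayed Queues

Option 1: TTL + dead-letter queue (built-in features only)

```text
Producer → delay queue (TTL set, no consumers) → message expires → dead-letter exchange → actual work queue → Consumer
```

```python
# Process after a 10-second delay.
# work_exchange must already exist, with the work queue bound to it using routing key 'work'.
channel.queue_declare(
    queue='delay_10s',
    arguments={
        'x-message-ttl': 10000,
        'x-dead-letter-exchange': 'work_exchange',
        'x-dead-letter-routing-key': 'work'
    }
)
channel.basic_publish(exchange='', routing_key='delay_10s', body='delayed task')
```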
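The delay queue above uses a queue-level TTL, so every message waits the same amount of time. Mixing different per-message TTLs (the `expiration` property) in one delay queue is not a general delay mechanism. RabbitMQ only expires or dead-letters such messages when they reach the head of the queue, so a long-delay message blocks shorter ones behind it. This hypothetical model shows the effect; `expire_head_only` is not a RabbitMQ API.

```python
from collections import deque


def expire_head_only(queue: deque, now_ms: int) -> list:
    """Model per-message TTL expiry: only the head is checked, so an expired
    message stuck behind a live one is not dead-lettered yet.

    Each item is (body, expires_at_ms). Returns the bodies dead-lettered now.
    """
    dead = []
    while queue and queue[0][1] <= now_ms:
        dead.append(queue.popleft()[0])
    return dead


q = deque([("60s-task", 60_000), ("5s-task", 5_000)])
print(expire_head_only(q, 10_000))  # [] even though 5s-task has expired
print(expire_head_only(q, 60_000))  # ['60s-task', '5s-task']
```

This is why the TTL approach usually needs one queue per delay value, while the plugin in option 2 handles arbitrary per-message delays.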
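Option 2: the rabbitmq-delayed-message-exchange plugin (recommended)

```bash
# The plugin is not bundled with RabbitMQ: first download the .ez file matching
# your RabbitMQ version into the plugins directory, then enable it
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```

```python
# Declare the delayed exchange; x-delayed-type is the routing behaviour applied once the delay elapses
channel.exchange_declare(
    exchange='delayed_exchange',
    exchange_type='x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)
# Publish a delayed message
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='task',
    body='delayed message',
    properties=pika.BasicProperties(
        headers={'x-delay': 10000}  # delay of 10 seconds (ms)
    )
)
```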
2.3 Priority Queues

```python
# Declare a priority queue (maximum priority 10)
channel.queue_declare(
    queue='priority_queue',
    arguments={'x-max-priority': 10}
)
# Publish a high-priority message
channel.basic_publish(
    exchange='',
    routing_key='priority_queue',
    body='urgent task',
    properties=pika.BasicProperties(priority=9)  # effective range here: 0-10
)
```
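The ordering rules can be sketched with a heap. This toy model follows the documented semantics:
- A higher priority is delivered first.
- A priority above `x-max-priority` is treated as the maximum.
- A message without a priority counts as 0.

The class `PriorityQueueModel` is hypothetical and does not involve a broker.

```python
import heapq
import itertools


class PriorityQueueModel:
    """Toy model of a queue declared with x-max-priority (equal priorities stay FIFO)."""

    def __init__(self, max_priority: int):
        self.max_priority = max_priority
        self._heap = []
        self._seq = itertools.count()  # tie-breaker that preserves publish order

    def publish(self, body, priority=None):
        # Missing priority counts as 0; anything above the maximum is capped
        p = min(priority or 0, self.max_priority)
        # heapq is a min-heap, so store the negated priority
        heapq.heappush(self._heap, (-p, next(self._seq), body))

    def get(self):
        return heapq.heappop(self._heap)[2]


q = PriorityQueueModel(max_priority=10)
q.publish("normal")
q.publish("urgent", priority=9)
q.publish("capped", priority=200)
q.publish("low", priority=1)
print([q.get() for _ in range(4)])  # ['capped', 'urgent', 'low', 'normal']
```

In the real broker, prioritisation only has a visible effect when messages accumulate in the queue. If consumers keep up, each message is delivered as soon as it arrives.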
2.4 TTL

```python
# Queue-level TTL (the same expiry for every message in the queue)
channel.queue_declare(
    queue='ttl_queue',
    arguments={'x-message-ttl': 60000}  # 60 seconds
)
# Per-message TTL (set individually on one message)
channel.basic_publish(
    exchange='',
    routing_key='my_queue',
    body='expiring message',
    properties=pika.BasicProperties(expiration='30000')  # 30 seconds; must be a string
)
```

When both are set, the smaller value applies.
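The "smaller value wins" rule can be written as a small worked function. The helper `effective_ttl_ms` is hypothetical. The per-message `expiration` arrives as a string and must be converted before the two values are compared.

```python
from typing import Optional


def effective_ttl_ms(queue_ttl_ms: Optional[int], expiration: Optional[str]) -> Optional[int]:
    """Return the TTL that applies to one message, or None if it never expires.

    queue_ttl_ms mirrors the x-message-ttl queue argument; expiration mirrors
    the per-message 'expiration' property, which is carried as a string.
    """
    candidates = [queue_ttl_ms] if queue_ttl_ms is not None else []
    if expiration is not None:
        candidates.append(int(expiration))
    return min(candidates) if candidates else None


print(effective_ttl_ms(60000, '30000'))  # 30000
```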
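2.5 Idempotent Consumers

Consumers have to guarantee idempotency themselves, because a message can be delivered more than once. This happens, for example, after a nack with requeue or when a consumer crashes before it acks. A common approach:

```python
import uuid

import pika
import redis

redis_client = redis.Redis()  # assumes a reachable Redis instance

def callback(ch, method, properties, body):
    msg_id = properties.message_id  # unique ID set by the producer
    # Skip messages that were already processed (tracked in Redis or a DB)
    if redis_client.exists(f"msg:{msg_id}"):
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return
    try:
        process(body)  # application-defined processing
        redis_client.setex(f"msg:{msg_id}", 86400, 1)  # mark as processed for 24 h
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# Producer: attach a unique message ID
channel.basic_publish(
    exchange='',
    routing_key='task',
    body='message',
    properties=pika.BasicProperties(message_id=str(uuid.uuid4()))
)
```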
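Here is the same check-then-record logic without Redis and pika, so it can run on its own. In this sketch an in-memory set stands in for Redis or a database and remembers processed `message_id`s, so a redelivered message is skipped. The set is an assumption, and the class name `IdempotentHandler` is hypothetical.

```python
class IdempotentHandler:
    """Process each message_id at most once; duplicates are skipped."""

    def __init__(self, process):
        self._process = process  # application-defined processing function
        self._seen = set()       # stands in for the Redis/DB store

    def handle(self, message_id: str, body: bytes) -> bool:
        """Return True if the body was processed, False if it was a duplicate."""
        if message_id in self._seen:
            return False
        self._process(body)
        # Record only after success, so a failure leaves the message retryable
        self._seen.add(message_id)
        return True


processed = []
handler = IdempotentHandler(processed.append)
handler.handle("m1", b"a")
handler.handle("m1", b"a")  # redelivery: ignored
print(processed)  # [b'a']
```

A set only works inside one process. With several consumers, the check and the record must happen in shared storage, ideally atomically (for example a Redis `SET` with `NX`). Otherwise two consumers can process the same duplicate at the same time.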
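2.6 Transactions vs. Publisher Confirms

Transactions (poor performance, generally not recommended)

```python
channel.tx_select()  # put the channel into transactional mode
try:
    channel.basic_publish(exchange='', routing_key='q', body='msg')
    channel.tx_commit()  # commit
except Exception:
    channel.tx_rollback()  # roll back
```

Publisher confirms (recommended, better performance)

```python
channel.confirm_delivery()
for i in range(100):
    channel.basic_publish(
        exchange='',
        routing_key='task',
        body=f'msg {i}',
        mandatory=True,
    )
```

With pika.BlockingConnection, once confirm_delivery() is enabled, each basic_publish() blocks until the broker confirms it, so this loop costs one round trip per message. If mandatory=True is also set and a message cannot be routed, pika.exceptions.UnroutableError is raised. If the broker nacks a message, pika.exceptions.NackError is raised.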
2.7 Python Client in Practice

Synchronous Python programs usually use pika.BlockingConnection. Asynchronous services (FastAPI, aiohttp, long-running background workers) are better served by aio-pika's connect_robust().

Connection parameters

```python
import pika

credentials = pika.PlainCredentials("appuser", "strongpassword")
params = pika.ConnectionParameters(
    host="localhost",
    port=5672,
    virtual_host="/prod",
    credentials=credentials,
    heartbeat=60,
    blocked_connection_timeout=30,
    connection_attempts=3,
    retry_delay=2,
)
connection = pika.BlockingConnection(params)
channel = connection.channel()
```
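Common parameters:

| Parameter | Description |
|---|---|
|  | RabbitMQ host and AMQP port |
|  | vhost; isolating by business domain is recommended in production |
|  | Username and password |
|  | Heartbeat, so half-open connections are detected instead of hanging indefinitely |
|  | Timeout when the broker blocks the connection because of a memory/disk alarm |
|  | Retries for the initial connection |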
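Reliable publishing template

```python
import json
import uuid

import pika


def setup_publisher_channel(channel) -> None:
    # Run once per channel: declare the exchange and enable publisher confirms
    channel.exchange_declare(
        exchange="order.events",
        exchange_type="topic",
        durable=True,
    )
    channel.confirm_delivery()


def publish_order_created(channel, event: dict) -> None:
    # Raises pika.exceptions.UnroutableError / NackError if the publish is not confirmed
    body = json.dumps(event, ensure_ascii=False).encode("utf-8")
    channel.basic_publish(
        exchange="order.events",
        routing_key="order.created",
        body=body,
        mandatory=True,
        properties=pika.BasicProperties(
            content_type="application/json",
            content_encoding="utf-8",
            delivery_mode=pika.DeliveryMode.Persistent,
            message_id=str(uuid.uuid4()),
            correlation_id=str(event["order_id"]),
            type="order.created",
            app_id="order-service",
        ),
    )
```

Publishing checklist:

- exchange_declare(..., durable=True): the exchange definition survives a broker restart.
- queue_declare(..., durable=True): the queue definition survives a restart.
- delivery_mode=pika.DeliveryMode.Persistent: the message itself is persisted.
- confirm_delivery(): the broker confirms that it has accepted the published message.
- mandatory=True: the client is told immediately when a message cannot be routed.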
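Reliable consumption template

```python
import json

import pika


def consume_orders(channel):
    channel.exchange_declare("order.events", exchange_type="topic", durable=True)
    # The DLX must exist (and have a queue bound with "order.created.failed") to keep rejected messages
    channel.exchange_declare("order.dlx", exchange_type="direct", durable=True)
    channel.queue_declare(
        queue="inventory.order_created",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "order.dlx",
            "x-dead-letter-routing-key": "order.created.failed",
        },
    )
    channel.queue_bind(
        queue="inventory.order_created",
        exchange="order.events",
        routing_key="order.created",
    )
    channel.basic_qos(prefetch_count=10)

    def callback(ch, method, properties, body):
        # handle_order_created and TemporaryError are application-defined
        try:
            event = json.loads(body.decode("utf-8"))
            handle_order_created(event, message_id=properties.message_id)
        except json.JSONDecodeError:
            # Malformed message: dead-letter it straight away to avoid endless retries
            ch.basic_nack(method.delivery_tag, requeue=False)
        except TemporaryError:
            # Transient error: requeue, but beware of endless loops; a retry queue is better in production
            ch.basic_nack(method.delivery_tag, requeue=True)
        except Exception:
            ch.basic_nack(method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(method.delivery_tag)

    channel.basic_consume(
        queue="inventory.order_created",
        on_message_callback=callback,
        auto_ack=False,
    )
    channel.start_consuming()
```

Consumer checklist:

- Call basic_ack() only after the business logic has succeeded.
- For unrecoverable errors, use basic_nack(..., requeue=False) so the message is dead-lettered.
- Use requeue=True for recoverable errors with care, otherwise a hot loop can form.
- Use basic_qos(prefetch_count=N) to cap the number of unacknowledged messages per consumer.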
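Retry queue pattern

```python
# On failure the business queue dead-letters to the retry exchange; when the
# retry queue's TTL expires, the message goes back to the business queue.
channel.exchange_declare("order.events", "topic", durable=True)
channel.exchange_declare("order.retry", "direct", durable=True)
channel.exchange_declare("order.dlx", "direct", durable=True)  # final dead letters
channel.queue_declare(
    "inventory.order_created",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "order.retry",
        "x-dead-letter-routing-key": "inventory.order_created.retry",
    },
)
channel.queue_bind("inventory.order_created", "order.events", "order.created")
channel.queue_declare(
    "inventory.order_created.retry.30s",
    durable=True,
    arguments={
        "x-message-ttl": 30000,
        # Return via the default exchange straight to the business queue.
        # Dead-lettering to order.events with "order.created" would also
        # redeliver the message to every other queue bound to that key.
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": "inventory.order_created",
    },
)
channel.queue_bind(
    "inventory.order_created.retry.30s",
    "order.retry",
    "inventory.order_created.retry",
)
```

In production the retry count is usually tracked in a message header, for example a custom x-retry-count. Once the limit is exceeded, the message goes to a final dead-letter queue so it is not retried forever. These queue arguments replace the x-dead-letter settings of the consumption template above. RabbitMQ rejects a redeclaration of an existing queue with different arguments (PRECONDITION_FAILED).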
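Instead of a custom counter, the retry count can also be read from the `x-death` header. RabbitMQ adds this header each time a message is dead-lettered, and each entry records the source `queue`, the `reason` and a `count`. This sketch assumes that header layout as seen in RabbitMQ 3.12. The limit `MAX_RETRIES` and the helper names are hypothetical.

```python
MAX_RETRIES = 3  # hypothetical retry limit
BUSINESS_QUEUE = "inventory.order_created"


def retry_count(headers, queue: str = BUSINESS_QUEUE) -> int:
    """Sum the x-death counts recorded for `queue` (0 if never dead-lettered)."""
    deaths = (headers or {}).get("x-death") or []
    return sum(int(d.get("count", 0)) for d in deaths if d.get("queue") == queue)


def next_action(headers, max_retries: int = MAX_RETRIES) -> str:
    """Return 'retry' while under the limit, otherwise 'dead-letter'."""
    return "retry" if retry_count(headers) < max_retries else "dead-letter"


headers = {"x-death": [
    {"queue": "inventory.order_created.retry.30s", "reason": "expired", "count": 2},
    {"queue": "inventory.order_created", "reason": "rejected", "count": 2},
]}
print(retry_count(headers), next_action(headers))  # 2 retry
```

When `next_action` returns `"retry"`, the consumer would handle the failed message with `basic_nack(..., requeue=False)`, which sends it through the retry queue. Otherwise it would publish the message to `order.dlx` and acknowledge it.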
Synchronous pika wrapper example

```python
import json
import uuid

import pika


class RabbitPublisher:
    def __init__(self, params: pika.ConnectionParameters):
        self._params = params
        self._connection = None
        self._channel = None

    def connect(self):
        self._connection = pika.BlockingConnection(self._params)
        self._channel = self._connection.channel()
        self._channel.confirm_delivery()  # every publish now waits for a broker confirm

    def publish_json(self, exchange: str, routing_key: str, payload: dict):
        # Reconnect lazily if the connection or channel has been closed
        if (
            self._connection is None
            or self._connection.is_closed
            or self._channel.is_closed
        ):
            self.connect()
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self._channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=body,
            mandatory=True,
            properties=pika.BasicProperties(
                content_type="application/json",
                delivery_mode=pika.DeliveryMode.Persistent,
                message_id=str(uuid.uuid4()),
            ),
        )

    def close(self):
        if self._connection and self._connection.is_open:
            self._connection.close()
```

pika's BlockingConnection and its channels are not thread-safe and must not be shared across threads. In a multithreaded program, give each thread its own connection, created in that thread. Alternatively, let a dedicated publisher thread read from an internal queue and do all the publishing.
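aio-pika asynchronous API

```python
import json

import aio_pika

# The vhost is "/prod", so its leading slash must be URL-encoded as %2F
AMQP_URL = "amqp://appuser:strongpassword@localhost:5672/%2Fprod"


async def publish_order_created(event: dict):
    # Connecting per call keeps the demo short; long-running services should reuse one connection
    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel(publisher_confirms=True)
        exchange = await channel.declare_exchange(
            "order.events",
            aio_pika.ExchangeType.TOPIC,
            durable=True,
        )
        message = aio_pika.Message(
            json.dumps(event, ensure_ascii=False).encode("utf-8"),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            message_id=str(event["event_id"]),
        )
        await exchange.publish(message, routing_key="order.created", mandatory=True)


async def consume_order_created():
    connection = await aio_pika.connect_robust(AMQP_URL)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=10)
    # Redeclaring an existing queue requires identical arguments, otherwise
    # the broker closes the channel with PRECONDITION_FAILED
    queue = await channel.declare_queue(
        "inventory.order_created",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "order.dlx",
            "x-dead-letter-routing-key": "order.created.failed",
        },
    )
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            # Acks on success; on an exception rejects without requeue (dead-letters)
            async with message.process(requeue=False):
                event = json.loads(message.body.decode("utf-8"))
                await handle_order_created(event)  # application-defined coroutine
```

After a disconnect, aio_pika.connect_robust() reconnects automatically and restores channels, declared exchanges and queues, bindings and consumers. That makes it a good fit for long-running asynchronous services.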
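III. Performance Tuning

3.1 Prefetch (QoS)

Prefetch limits how many unacknowledged messages the broker pushes to a consumer at once. This keeps a single consumer from hoarding a large backlog of unprocessed messages.

```python
# Prefetch only 1 message (fair dispatch)
channel.basic_qos(prefetch_count=1)
# Raise it for high-throughput workloads
channel.basic_qos(prefetch_count=50)
# global_qos=True: one limit shared by all consumers on this channel
# (RabbitMQ applies it per channel, not per connection; quorum queues do not support it)
channel.basic_qos(prefetch_count=100, global_qos=True)
```

How to choose

| Scenario | Recommended value |
|---|---|
| Long or uneven task processing times | 1~5 |
| Fast tasks, throughput first | 50~200 |
| Batch consumption | Same as the batch size |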
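3.2 Batch Operations

```python
# Batch acknowledgement (fewer network round trips)
messages = []

def callback(ch, method, properties, body):
    messages.append((method.delivery_tag, body))
    if len(messages) >= 100:
        process_batch(messages)  # application-defined
        # Ack the last tag with multiple=True to confirm it and every earlier tag
        ch.basic_ack(delivery_tag=messages[-1][0], multiple=True)
        messages.clear()

# prefetch_count must be at least the batch size (100 here), otherwise the broker
# stops delivering before a batch fills up and the consumer stalls.
# A partial batch should also be flushed on a timer (not shown).
```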
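The sketch below applies the same idea with two gaps of a bare callback closed. First, the batch can be flushed explicitly, for example from a timer, so a partial batch is not held forever. Second, the channel is injected, so the logic can be exercised without a broker. `BatchAcker` is a hypothetical helper that only assumes a pika-style `basic_ack(delivery_tag, multiple)` method.

```python
class BatchAcker:
    """Collect deliveries and ack them with a single basic_ack(multiple=True)."""

    def __init__(self, channel, process_batch, batch_size: int = 100):
        self.channel = channel              # anything with basic_ack(delivery_tag, multiple)
        self.process_batch = process_batch  # application-defined, receives a list of bodies
        self.batch_size = batch_size
        self.pending = []                   # list of (delivery_tag, body)

    def add(self, delivery_tag: int, body: bytes) -> None:
        self.pending.append((delivery_tag, body))
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Process and ack whatever is pending; also call this from a timer."""
        if not self.pending:
            return
        self.process_batch([body for _, body in self.pending])
        # Delivery tags increase per channel, so the last tag covers the whole batch
        self.channel.basic_ack(delivery_tag=self.pending[-1][0], multiple=True)
        self.pending.clear()
```

With pika's `BlockingConnection`, schedule a timer-based `flush` via `connection.call_later(...)` so that it runs on the connection's own thread.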
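3.3 Connection and Channel Reuse

```python
# ✅ Correct: one connection with several channels, all used from the same thread
# (pika connections are not thread-safe: each thread needs its own connection)
connection = pika.BlockingConnection(params)
channel1 = connection.channel()  # publishing
channel2 = connection.channel()  # consuming

# ❌ Wrong: opening a new connection for every operation (very expensive)
for msg in messages:
    conn = pika.BlockingConnection(params)
    conn.channel().basic_publish(...)
    conn.close()
```

Connection pooling (with a pika pool library, or managed by hand)

```python
# aio-pika (async) or a connection-pool library is recommended
import aio_pika

async def publish():
    # Demo only: in a service, create the connection once at startup and reuse it
    connection = await aio_pika.connect_robust("amqp://user:pass@localhost/")
    async with connection:
        channel = await connection.channel()
        await channel.default_exchange.publish(
            aio_pika.Message(b"Hello"),
            routing_key="task_queue",
        )
```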
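3.4 Configuration Tuning

```ini
# rabbitmq.conf
# Memory high watermark: publishers are blocked above it. Use either the relative or the absolute form.
# Relative: 60% of available memory
vm_memory_high_watermark.relative = 0.6
# Absolute alternative:
# vm_memory_high_watermark.absolute = 4GB

# Disk free limit: publishers are blocked when free disk space falls below it. Again, pick one form.
# Relative: 2x the amount of RAM
disk_free_limit.relative = 2.0
# disk_free_limit.absolute = 5GB

# Override the detected total memory (useful in containers)
total_memory_available_override_value = 8GB

# Heartbeat timeout (seconds)
heartbeat = 60

# Maximum frame size (bytes)
frame_max = 131072

# Maximum number of channels per connection
channel_max = 2047
```

```bash
# System-level settings (Linux), run as root
ulimit -n 65536  # file descriptors for the current shell; for the systemd service set LimitNOFILE instead
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf
sysctl -p  # apply the change
```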
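The relative settings are easier to reason about as absolute numbers. This worked example assumes that the 8 GB from `total_memory_available_override_value` is the memory figure both relative limits are based on, and takes 8 GB as 8 × 10^9 bytes for round numbers. With 0.6 × 8 GB, the memory alarm fires at 4.8 GB of usage. With 2.0 × 8 GB, the disk alarm fires once free space drops below 16 GB. The helper `alarm_thresholds` is hypothetical.

```python
def alarm_thresholds(total_memory_bytes: int, memory_relative: float, disk_relative: float) -> dict:
    """Compute the byte thresholds implied by the relative watermark settings."""
    return {
        # Memory alarm fires when the node uses more than this
        "memory_high_watermark": round(total_memory_bytes * memory_relative),
        # Disk alarm fires when free disk space drops below this
        "disk_free_limit": round(total_memory_bytes * disk_relative),
    }


print(alarm_thresholds(8 * 10**9, 0.6, 2.0))
# {'memory_high_watermark': 4800000000, 'disk_free_limit': 16000000000}
```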
3.5 Monitoring Slow Operations

```bash
# Check queue backlog
rabbitmqctl list_queues name messages consumers
# List connections
rabbitmqctl list_connections
# List channels
rabbitmqctl list_channels
# List bindings
rabbitmqctl list_bindings
```
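IV. High Availability

4.1 Classic Cluster

Nodes share metadata (exchange and queue definitions), but the contents of a classic queue live on only one node.

```bash
# All nodes must share the same Erlang cookie (/var/lib/rabbitmq/.erlang.cookie).
# Node 1 only needs to be running. It is not a "master" (cluster nodes are peers),
# and it must NOT be reset: reset wipes the node's data.

# On node 2: join node 1's cluster
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# Check the cluster status
rabbitmqctl cluster_status
```

Drawback: if the node that hosts a queue goes down, that queue becomes unavailable. Combine the cluster with mirrored queues or Quorum Queues.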
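4.2 Mirrored Queues (Classic Mirroring)

Queue contents are replicated to several nodes, so another node can take over when one goes down.

```bash
# Mirror every queue to all nodes
rabbitmqctl set_policy ha-all "^" \
  '{"ha-mode":"all"}' \
  --priority 0 --apply-to queues

# Mirror queues whose names start with "ha." to exactly 2 nodes (recommended)
rabbitmqctl set_policy ha-two "^ha\." \
  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' \
  --priority 1 --apply-to queues
```

Note: classic queue mirroring is deprecated and was removed entirely in RabbitMQ 4.0. Use Quorum Queues instead.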
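4.3 Quorum Queue (recommended)

A Quorum Queue is a strongly consistent queue type based on the Raft protocol. It replaces mirrored queues (available since 3.8).

```python
# Declare a quorum queue (quorum queues are always durable)
channel.queue_declare(
    queue='quorum_queue',
    durable=True,
    arguments={'x-queue-type': 'quorum'}
)
```

```bash
# A policy cannot set or change the queue type; it is fixed at declaration via x-queue-type.
# Policies can still tune quorum-queue features, e.g. a delivery limit for queues named "qq.*":
rabbitmqctl set_policy qq-delivery-limit "^qq\." \
  '{"delivery-limit":5}' \
  --apply-to queues
```

Quorum Queue vs. mirrored queues

|  | Mirrored queues | Quorum Queue |
|---|---|---|
| Consistency protocol | Custom (in-house) | Raft (strong consistency) |
| Split-brain handling | Weaker | Strong |
| Performance | Higher | Slightly lower |
| Status | Deprecated | ✅ Recommended |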
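Raft only makes progress while a majority of a quorum queue's replicas is available, which is why 3 or 5 replicas are the usual choices. A small worked calculation (helper names are hypothetical):

```python
def quorum_size(replicas: int) -> int:
    """Minimum replicas that must be online (a Raft majority) for the queue to stay available."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """How many replicas can be lost while a majority remains."""
    return replicas - quorum_size(replicas)


for n in (3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
# 3 2 1
# 4 3 1
# 5 3 2
```

Going from 3 to 4 replicas adds replication cost without tolerating an extra failure.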
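4.4 Backup and Restore

```bash
# Export definitions (exchanges, queues, bindings, policies, etc.)
rabbitmqadmin export definitions.json
# Import definitions
rabbitmqadmin import definitions.json

# The same through the management HTTP API
curl -u admin:password http://localhost:15672/api/definitions > definitions.json
curl -u admin:password -X POST http://localhost:15672/api/definitions \
  -H "Content-Type: application/json" -d @definitions.json

# Message data backup (the node must be stopped while copying)
sudo systemctl stop rabbitmq-server
cp -r /var/lib/rabbitmq/mnesia /backup/
sudo systemctl start rabbitmq-server

# Restore (onto a node with the same node name)
sudo systemctl stop rabbitmq-server
cp -r /backup/mnesia /var/lib/rabbitmq/
sudo systemctl start rabbitmq-server
```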
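V. Distributed Deployment

5.1 Federation

Federation passes messages across clusters and data centers, so separate RabbitMQ deployments can exchange messages with each other.

```bash
# Enable the plugins
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
# Define an upstream
rabbitmqctl set_parameter federation-upstream my-upstream \
  '{"uri":"amqp://user:pass@remote-host","prefetch-count":1000}'
# Federate exchanges whose names start with "federated." through a policy
rabbitmqctl set_policy federated-ex "^federated\." \
  '{"federation-upstream-set":"all"}' \
  --apply-to exchanges
```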
5.2 Shovel

A shovel moves messages from one queue to another, optionally across brokers. It suits data migration and cross-data-center forwarding.

```bash
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
# Define a shovel
rabbitmqctl set_parameter shovel my-shovel \
  '{"src-protocol":"amqp091",
    "src-uri":"amqp://source-host",
    "src-queue":"source_queue",
    "dest-protocol":"amqp091",
    "dest-uri":"amqp://dest-host",
    "dest-queue":"dest_queue"}'
```
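5.3 Multi-Data-Center Setup

```text
      Data center A                                    Data center B
RabbitMQ Cluster A  ←→  Federation/Shovel  ←→  RabbitMQ Cluster B
        ↕                                               ↕
 Local consumers                                 Local consumers
```

Choosing an architecture

| Scenario | Approach |
|---|---|
| Cross-data-center message sync | Federation |
| Message migration, moving data | Shovel |
| High availability within one data center | Quorum Queue |
| Very large-scale messaging | Consider Kafka |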
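5.4 Comparison of Common Approaches

| Approach | High availability | Horizontal scaling | Consistency | Use case |
|---|---|---|---|---|
| Single node | ❌ | ❌ | - | Development and testing |
| Classic cluster | Partial | ✅ | Weak | Metadata is HA, queue contents are not |
| Mirrored queues | ✅ | Limited | Medium (deprecated) | Legacy compatibility |
| Quorum Queue | ✅ | ✅ | Strong (Raft) | Recommended for production |
| Federation | ✅ | ✅ | Eventual | Across data centers |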
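VI. Common Business Scenarios

6.1 Asynchronous Decoupling

```text
User places order → Order service → [MQ] → Inventory service
                                       ↘ Notification service
                                       ↘ Points service
```

```python
import json
import uuid

import pika

# Order service: publish an event. order_exchange is assumed to be a topic exchange
# that each downstream service binds its own queue to.
channel.basic_publish(
    exchange='order_exchange',
    routing_key='order.created',
    body=json.dumps({'order_id': 123, 'user_id': 1, 'amount': 99.9}),
    properties=pika.BasicProperties(
        delivery_mode=2,
        content_type='application/json',
        message_id=str(uuid.uuid4())
    )
)
```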
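6.2 Traffic Peak Shaving

```text
Burst of requests (10,000 QPS) → [MQ queue] → consumers process at their own capacity (500 QPS)
```

```python
import time

# Cap in-flight work: at most 10 unacknowledged messages per consumer
channel.basic_qos(prefetch_count=10)

def callback(ch, method, properties, body):
    process_order(body)  # business logic (application-defined)
    time.sleep(0.01)  # crude throttle: limits this consumer to at most ~100 messages/s
    ch.basic_ack(delivery_tag=method.delivery_tag)
```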
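6.3 Delayed Tasks

```python
import json

import pika

# Cancel an order automatically if it is still unpaid after 30 minutes
# (uses the delayed-message exchange from option 2 in 2.2)
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='order.timeout',
    body=json.dumps({'order_id': 123}),
    properties=pika.BasicProperties(
        headers={'x-delay': 1800000}  # 30 minutes in ms
    )
)
```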
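6.4 Reliable Delivery (At Least Once)

```text
Producer → local transaction + message stored in DB → publish to MQ → consumer acks
                         ↓
         Scheduled scan for unconfirmed messages → republish
```

```python
# db, create_order, order_data, msg_id and msg_body are application-defined placeholders
# 1. Write the business data and the message record in the same DB transaction
with db.transaction():
    create_order(order_data)
    db.insert('mq_messages', {'id': msg_id, 'status': 'pending', 'body': msg_body})

# 2. Publish with publisher confirms enabled
channel.confirm_delivery()
channel.basic_publish(...)

# 3. Once the broker has confirmed, mark the message as sent
db.update('mq_messages', {'status': 'sent'}, where={'id': msg_id})

# 4. Compensation job: republish messages that have been 'pending' for more than 5 minutes
```

The compensation job can republish a message that was in fact delivered, so consumers must be idempotent (see 2.5).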
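Below is a runnable sketch of steps 1 to 4. It makes two substitutions, both assumptions: the standard library's `sqlite3` stands in for the production database, and a plain callable stands in for a confirmed `basic_publish`. The table and function names are hypothetical. The order row and the outbox row commit or roll back together, and a message stays `pending` until publishing succeeds.

```python
import sqlite3


def create_schema(db: sqlite3.Connection) -> None:
    db.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount REAL)")
    db.execute("CREATE TABLE mq_messages (id TEXT PRIMARY KEY, status TEXT, body TEXT)")


def place_order(db, order_id: int, amount: float, msg_id: str, body: str) -> None:
    # Step 1: `with db` commits both inserts together, or rolls both back on error
    with db:
        db.execute("INSERT INTO orders VALUES (?, ?)", (order_id, amount))
        db.execute("INSERT INTO mq_messages VALUES (?, 'pending', ?)", (msg_id, body))


def relay_pending(db, publish) -> int:
    """Steps 2-4: publish every pending message and mark it sent on success.

    `publish(body)` stands in for a confirmed basic_publish and raises on failure.
    Returns the number of messages sent in this run.
    """
    sent = 0
    rows = db.execute("SELECT id, body FROM mq_messages WHERE status = 'pending'").fetchall()
    for msg_id, body in rows:
        try:
            publish(body)
        except Exception:
            continue  # stays pending; the next scheduled run retries it
        with db:
            db.execute("UPDATE mq_messages SET status = 'sent' WHERE id = ?", (msg_id,))
        sent += 1
    return sent
```

Calling `relay_pending` periodically (for example every minute) plays the role of the compensation job.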
6.5 RPC Pattern

```python
import uuid

import pika

# RPC server
def on_request(ch, method, props, body):
    result = process(body)  # application-defined
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,  # reply to the client's temporary queue
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(result),
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

# RPC client
corr_id = str(uuid.uuid4())
reply_queue = channel.queue_declare(queue='', exclusive=True).method.queue
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(reply_to=reply_queue, correlation_id=corr_id),
    body='request data',
)
# Wait for the response...
```
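The waiting step at the end of the client comes down to matching replies to requests by `correlation_id`. This broker-free sketch shows that bookkeeping; the class `PendingRpcCalls` is hypothetical. In a pika client, `on_response` would be called from a consumer on the exclusive reply queue.

```python
import uuid
from typing import Dict, Optional


class PendingRpcCalls:
    """Track in-flight RPC requests by correlation_id."""

    def __init__(self):
        self._responses: Dict[str, Optional[bytes]] = {}

    def new_request(self) -> str:
        corr_id = str(uuid.uuid4())
        self._responses[corr_id] = None  # None means "still waiting"
        return corr_id

    def on_response(self, correlation_id: str, body: bytes) -> bool:
        """Store a reply; ignore replies that match no pending request."""
        if correlation_id not in self._responses:
            return False
        self._responses[correlation_id] = body
        return True

    def result(self, corr_id: str) -> Optional[bytes]:
        """Return (and forget) the reply if it has arrived, else None."""
        body = self._responses.get(corr_id)
        if body is not None:
            del self._responses[corr_id]
        return body
```

With `BlockingConnection`, the client can poll with `connection.process_data_events(time_limit=1)` until `result(corr_id)` returns a value.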
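VII. Operations and Monitoring

7.1 Common CLI Commands

```bash
# Node status
rabbitmqctl status
rabbitmqctl cluster_status
# Health checks (node_health_check is deprecated; use rabbitmq-diagnostics)
rabbitmq-diagnostics -q check_running
rabbitmq-diagnostics -q check_local_alarms

# User management
rabbitmqctl list_users
rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

# Queue management
rabbitmqctl list_queues name messages consumers memory
rabbitmqctl purge_queue queue_name  # remove all messages from the queue
rabbitmqctl delete_queue queue_name

# Policy management
rabbitmqctl list_policies
rabbitmqctl set_policy name pattern definition

# Connections/channels
rabbitmqctl list_connections
rabbitmqctl list_channels
# close_connection takes the connection's pid (see: rabbitmqctl list_connections pid name)
rabbitmqctl close_connection "<connection_pid>" "reason"

# vhosts
rabbitmqctl add_vhost myvhost
rabbitmqctl list_vhosts
```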
7.2 Management API

```bash
# List all queues
curl -u admin:password http://localhost:15672/api/queues
# Show one queue (%2F is the URL-encoded default vhost "/")
curl -u admin:password http://localhost:15672/api/queues/%2F/my_queue
# List nodes
curl -u admin:password http://localhost:15672/api/nodes
# Publish a message (for testing)
curl -u admin:password -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
  -H "Content-Type: application/json" \
  -d '{"properties":{},"routing_key":"my_queue","payload":"test","payload_encoding":"string"}'
# Fetch a message (for testing; ack_requeue_true puts it back on the queue)
curl -u admin:password -X POST http://localhost:15672/api/queues/%2F/my_queue/get \
  -H "Content-Type: application/json" \
  -d '{"count":1,"ackmode":"ack_requeue_true","encoding":"auto"}'
```
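7.3 Monitoring Metrics

| Metric | Description | Alert threshold |
|---|---|---|
|  | Messages waiting to be consumed | Alert on sustained growth |
|  | Unacknowledged messages | Alert when above the expected level |
|  | Number of consumers | Alert when 0 |
|  | Node memory usage | Alert above 80% |
|  | Free disk space | Alert below the watermark |
|  | File descriptor usage | Alert above 80% |
|  | Message delivery rate | Alert on a sudden drop |
|  | Message publish rate | Watch the trend |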
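Several of these rules can be checked against the JSON returned by `GET /api/queues`, whose entries include `name`, `messages_ready`, `messages_unacknowledged` and `consumers`. The sketch below implements such a check. The threshold is an example value and the function name is hypothetical. A real "sustained growth" rule would compare several polls rather than one.

```python
def queue_alerts(queue: dict, max_unacked: int = 1000, prev_ready=None) -> list:
    """Evaluate a few alert rules for one entry from GET /api/queues.

    prev_ready is messages_ready from the previous poll, used to spot growth.
    """
    alerts = []
    name = queue.get("name", "?")
    if queue.get("consumers", 0) == 0:
        alerts.append(f"{name}: no consumers")
    if queue.get("messages_unacknowledged", 0) > max_unacked:
        alerts.append(f"{name}: too many unacked messages")
    if prev_ready is not None and queue.get("messages_ready", 0) > prev_ready:
        alerts.append(f"{name}: backlog growing")
    return alerts


sample = {"name": "orders", "messages_ready": 500, "messages_unacknowledged": 20, "consumers": 0}
print(queue_alerts(sample, prev_ready=100))
# ['orders: no consumers', 'orders: backlog growing']
```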
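7.4 Monitoring Tools

| Tool | Description |
|---|---|
| RabbitMQ Management UI | Official web UI (port 15672), the most direct view |
| Prometheus + rabbitmq_prometheus plugin | Metrics collection, usually paired with Grafana |
| Datadog | Cloud monitoring with an official RabbitMQ integration |
| Zabbix | Enterprise monitoring |

```bash
# Enable the Prometheus plugin
rabbitmq-plugins enable rabbitmq_prometheus
# Metrics are then served at http://localhost:15692/metrics
```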
7.5 Security Hardening

```bash
# 1. Delete the default guest user
rabbitmqctl delete_user guest
# 2. Create a dedicated user and restrict its permissions
#    (configure/write/read limited to resources whose names start with "app.")
rabbitmqctl add_user appuser strongpassword
rabbitmqctl set_permissions -p /prod appuser "^app\." "^app\." "^app\."
```

```ini
# 3. Enable TLS (rabbitmq.conf)
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca.pem
ssl_options.certfile = /path/to/cert.pem
ssl_options.keyfile = /path/to/key.pem
ssl_options.verify = verify_peer

# 4. Restrict management UI access: listen on localhost only
management.tcp.port = 15672
management.tcp.ip = 127.0.0.1
```
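7.6 Routine Maintenance Checklist

| Task | Frequency | How |
|---|---|---|
| Check queue backlog | Real time | Management UI / alerts |
| Check that consumers are online | Real time |  |
| Check node memory/disk | Real time |  |
| Check messages in dead-letter queues | Daily | Management UI |
| Back up exported definitions | Daily |  |
| Remove unused queues | Weekly | Management UI |
| Check unacknowledged messages | Daily |  |
| Check certificate expiry | Monthly | TLS certificate expiry reminders |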