Kafka¶
Kafka 是高吞吐、可持久化、可水平扩展的分布式事件流平台,适合日志采集、消息队列、流式处理、数据同步和事件驱动架构。以 Kafka 3.x 为参考。
目录¶
一、基础篇¶
1.1 安装与启动¶
Kafka 3.x 推荐使用 KRaft 模式,不再强依赖 ZooKeeper。
Docker Compose 示例:
services:
kafka:
image: apache/kafka:3.7.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
启动:
docker compose up -d
常用端口:
端口 |
说明 |
|---|---|
9092 |
Broker 客户端访问 |
9093 |
KRaft Controller 通信 |
2181 |
ZooKeeper 模式使用 |
1.2 核心概念¶
Producer -> Topic -> Partition -> Broker
|
v
Consumer Group
概念 |
说明 |
|---|---|
Broker |
Kafka 服务节点 |
Topic |
逻辑消息主题 |
Partition |
Topic 的物理分片 |
Offset |
消息在分区内的递增位置 |
Producer |
生产者,发送消息 |
Consumer |
消费者,读取消息 |
Consumer Group |
消费者组,同组内分摊分区 |
Replica |
分区副本 |
Leader |
分区主副本,负责读写 |
Follower |
从副本,复制 Leader 数据 |
ISR |
与 Leader 保持同步的副本集合 |
1.3 Topic、Partition、Offset¶
Topic 是逻辑分类,Partition 是并行和扩展的基础。
topic: orders
partition-0: offset 0,1,2,3...
partition-1: offset 0,1,2,3...
partition-2: offset 0,1,2,3...
特点:
同一分区内消息有序。
不同分区之间不保证全局顺序。
Offset 只在分区内有意义。
分区数量决定最大并行消费能力。
1.4 Kafka 与传统消息队列¶
对比 |
Kafka |
RabbitMQ / Redis Queue |
|---|---|---|
模型 |
日志流 |
队列 |
消息保存 |
按时间/大小保留 |
通常消费后删除 |
消费模式 |
多消费者组可重复消费 |
多为竞争消费 |
吞吐 |
极高 |
中高 |
顺序性 |
分区内有序 |
队列内有序 |
场景 |
日志、流处理、数据管道 |
任务队列、业务解耦 |
二、命令行篇¶
2.1 Topic 管理¶
# 创建 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic orders \
--partitions 3 \
--replication-factor 1
# 查看 Topic
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查看详情
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
# 删除 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--delete --topic orders
2.2 生产与消费¶
# 生产消息
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders
# 消费消息
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning
# 指定消费者组
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--group order-service
2.3 消费者组¶
# 查看消费者组
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
# 查看消费进度
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-service
重点字段:
字段 |
说明 |
|---|---|
|
当前已提交 offset |
|
分区最新 offset |
|
积压消息数 |
|
消费者实例 |
三、生产者篇¶
3.1 发送流程¶
send()
-> Serializer
-> Partitioner
-> RecordAccumulator
-> Sender thread
-> Broker
-> ACK
生产者先把消息写入本地缓冲区,后台 Sender 线程批量发送到 Broker。
3.2 ACK 机制¶
参数 |
说明 |
可靠性 |
性能 |
|---|---|---|---|
|
不等待 Broker 确认 |
最低 |
最高 |
|
Leader 写入即确认 |
中 |
高 |
|
ISR 副本确认后返回 |
最高 |
较低 |
生产推荐:
acks=all
enable.idempotence=true
retries=2147483647
max.in.flight.requests.per.connection=5
3.3 分区策略¶
有 key:
partition = hash(key) % partition_count
无 key:
默认使用粘性分区策略,提高批量发送效率。
一批消息尽量发往同一分区,批次满后切换。
建议:
需要同一业务实体有序时,使用稳定 key,如
order_id。key 低基数会导致分区热点。
分区数增加后,key 到分区的映射可能变化。
3.4 幂等生产者¶
enable.idempotence=true
acks=all
retries=2147483647
幂等生产者可避免生产者重试导致的单分区重复写入。
注意:
幂等只保证单 Producer 会话、单分区内不重复。
跨分区、跨会话的端到端幂等仍需业务唯一键。
3.5 事务生产者¶
事务可保证多分区写入原子性,并配合消费端实现 Exactly Once 流处理。
transactional.id=order-tx-1
enable.idempotence=true
常见于 Kafka Streams、Flink 等流处理框架。
四、消费者篇¶
4.1 消费者组¶
同一个消费者组内,一个分区同一时刻只能被一个消费者消费。
Topic orders: P0 P1 P2
Group order-service:
consumer-1 -> P0
consumer-2 -> P1
consumer-3 -> P2
如果消费者数量大于分区数量,多余消费者会空闲。
4.2 Offset 提交¶
自动提交:
enable.auto.commit=true
auto.commit.interval.ms=5000
手动提交更可控:
poll -> 处理消息 -> commit offset
语义:
提交时机 |
可能问题 |
|---|---|
处理前提交 |
处理失败会丢消息 |
处理后提交 |
失败重启可能重复消费 |
多数业务选择“至少一次”,即处理成功后提交 offset,并通过幂等处理重复消息。
4.3 Rebalance¶
Rebalance 触发场景:
消费者加入。
消费者离开或心跳超时。
Topic 分区变化。
订阅 Topic 变化。
影响:
Rebalance 期间消费暂停。
可能造成重复消费。
频繁 Rebalance 会影响吞吐和延迟。
优化:
session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.interval.ms=300000
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
4.4 消费语义¶
语义 |
说明 |
实现方式 |
|---|---|---|
At most once |
最多一次,可能丢失 |
处理前提交 |
At least once |
至少一次,可能重复 |
处理后提交 |
Exactly once |
精确一次 |
幂等、事务、流处理框架 |
业务系统最常见的是 At least once + 幂等。
五、存储与可靠性篇¶
5.1 顺序写与 Page Cache¶
Kafka 性能高的原因:
追加写日志,磁盘顺序 IO。
大量利用操作系统 Page Cache。
批量发送和批量拉取。
零拷贝传输。
分区并行。
5.2 Segment¶
Kafka 分区日志由多个 Segment 文件组成。
partition-0/
00000000000000000000.log
00000000000000000000.index
00000000000000000000.timeindex
00000000000000123456.log
Segment 便于:
快速定位 offset。
按时间或大小删除过期数据。
降低单文件大小。
5.3 副本与 ISR¶
Partition 0
Leader: broker-1
Follower: broker-2
Follower: broker-3
ISR: broker-1, broker-2, broker-3
关键参数:
replication.factor=3
min.insync.replicas=2
acks=all
当 ISR 数量小于 min.insync.replicas 时,acks=all 的写入会失败,以避免数据丢失。
5.4 消息保留策略¶
按时间:
retention.ms=604800000
按大小:
retention.bytes=107374182400
日志压缩:
cleanup.policy=compact
compact 会按 key 保留最新值,适合配置、维表、状态变更日志。
六、性能优化篇¶
6.1 Producer 优化¶
batch.size=32768
linger.ms=10
compression.type=lz4
buffer.memory=67108864
acks=all
说明:
batch.size增大可提升吞吐。linger.ms允许等待更多消息组成批次。compression.type推荐lz4或zstd。buffer.memory控制生产者缓冲区。
6.2 Consumer 优化¶
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.poll.records=500
建议:
批量拉取、批量处理。
慢业务逻辑放线程池,但注意 offset 提交顺序。
消费者数不要超过分区数。
处理耗时超过
max.poll.interval.ms会触发 Rebalance。
6.3 Topic 与分区设计¶
分区数影响:
并行度。
文件句柄数量。
Leader 选举时间。
Rebalance 成本。
建议:
根据吞吐和消费者并行度估算分区数。
预留增长空间,但不要无限增大。
强顺序业务用同一 key 进入同一分区。
单 Topic 分区数过多会增加集群管理成本。
七、高可用与集群篇¶
7.1 Broker 集群¶
生产建议:
至少 3 个 Broker。
Topic 副本因子设置为 3。
min.insync.replicas=2。Broker 分布在不同机器或可用区。
监控分区 Leader 均衡。
7.2 Controller¶
Controller 负责:
Broker 上下线管理。
分区 Leader 选举。
元数据维护。
Kafka 早期依赖 ZooKeeper,Kafka 3.x 后 KRaft 模式逐渐成为主流。
7.3 数据可靠配置¶
生产者:
acks=all
enable.idempotence=true
retries=2147483647
Broker:
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
Topic:
replication.factor=3
min.insync.replicas=2
八、Python 使用¶
8.1 客户端选择与安装¶
Python 生产环境优先选 confluent-kafka,它基于 librdkafka,吞吐、稳定性和协议支持更好。kafka-python 是纯 Python 实现,适合学习或轻量场景,但生产高吞吐更常用 confluent-kafka。
pip install confluent-kafka
常用对象:
对象 |
说明 |
|---|---|
|
异步发送消息,依赖 |
|
消费者组消费,使用 |
|
创建、删除、查询 Topic |
|
指定 Topic、分区、offset |
|
异常和错误码 |
8.2 Producer 接口¶
Producer.produce() 是异步接口,消息先进入本地队列;需要调用 producer.poll(0) 触发 delivery callback,退出前用 producer.flush() 等待未发送消息完成。
import json
import socket
from confluent_kafka import Producer
producer = Producer({
"bootstrap.servers": "localhost:9092",
"acks": "all",
"enable.idempotence": True,
"retries": 2147483647,
"compression.type": "lz4",
"client.id": socket.gethostname(),
})
def delivery_report(err, msg):
if err is not None:
print(f"delivery failed: {err}")
else:
print(f"delivered to {msg.topic()} [{msg.partition()}]")
producer.produce(
"orders",
key="order-1001",
value=json.dumps({"order_id": 1001, "status": "paid"}).encode("utf-8"),
headers={
"event_type": "order.paid",
"schema_version": "1",
},
on_delivery=delivery_report,
)
# 触发发送队列和回调处理;高频发送时通常每次 produce 后 poll(0)。
producer.poll(0)
producer.flush()
常用接口:
接口 |
说明 |
|---|---|
|
发送消息 |
|
触发发送、错误和 delivery callback |
|
等待所有待发送消息完成 |
|
本地待发送队列长度,可用于背压判断 |
处理本地发送队列满:
try:
producer.produce("orders", key="order-1001", value=b"...")
except BufferError:
producer.poll(1.0)
producer.produce("orders", key="order-1001", value=b"...")
业务建议:
需要同一订单内事件有序时,
key使用稳定的order_id。value传 bytes,JSON 要显式.encode("utf-8")。delivery_report成功只表示 Kafka 接收成功,不代表下游已消费。flush()不要在每条消息后调用,否则会严重降低吞吐。
8.3 Consumer 接口¶
推荐关闭自动提交,业务处理成功后再提交 offset,实现“至少一次”。
import json
from confluent_kafka import Consumer
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "order-service",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"max.poll.interval.ms": 300000,
"session.timeout.ms": 30000,
})
consumer.subscribe(["orders"])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(msg.error())
continue
key = msg.key().decode("utf-8") if msg.key() else None
event = json.loads(msg.value().decode("utf-8"))
handle_order_event(key, event)
# 业务处理成功后提交 offset。asynchronous=False 便于失败时感知异常。
consumer.commit(message=msg, asynchronous=False)
finally:
consumer.close()
常用接口:
接口 |
说明 |
|---|---|
|
加入消费者组并订阅 Topic |
|
拉取消息;返回 |
|
提交指定消息的下一位 offset |
|
手动分配分区,不使用消费者组自动分配 |
|
跳转到指定 offset |
|
查询当前消费位置 |
|
查询已提交 offset |
|
关闭消费者并离开消费者组 |
Rebalance 回调:
from confluent_kafka import Consumer
def on_assign(consumer, partitions):
print("assigned:", partitions)
def on_revoke(consumer, partitions):
print("revoked:", partitions)
consumer.commit(asynchronous=False)
consumer.subscribe(["orders"], on_assign=on_assign, on_revoke=on_revoke)
8.4 批量消费与手动提交¶
consume() 可以一次拉取多条消息,适合批量写数据库、批量调用下游接口。
messages = consumer.consume(num_messages=100, timeout=1.0)
events = []
last_msg = None
if messages:
for msg in messages:
if msg.error():
continue
events.append(json.loads(msg.value().decode("utf-8")))
last_msg = msg
if events:
save_events(events)
consumer.commit(message=last_msg, asynchronous=False)
注意:
批量提交只适合每个分区内连续处理成功的场景。
如果一批里部分成功、部分失败,不要简单提交最后一条,否则会跳过失败消息。
批量处理耗时不能超过
max.poll.interval.ms,否则会被踢出消费者组。
精细控制 offset 存储:
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": "order-service",
"enable.auto.commit": True,
"enable.auto.offset.store": False,
})
msg = consumer.poll(1.0)
if msg and not msg.error():
handle(msg)
consumer.store_offsets(msg)
store_offsets() 只把 offset 存到本地缓存,最终由后台自动提交线程按 auto.commit.interval.ms 提交。
8.5 事务生产者¶
事务生产者用于把多条消息原子写入 Kafka;配合消费端 offset 事务提交,可以实现 consume-process-produce 场景的 Exactly Once。
from confluent_kafka import Producer, KafkaException
producer = Producer({
"bootstrap.servers": "localhost:9092",
"transactional.id": "order-pipeline-1",
"enable.idempotence": True,
})
producer.init_transactions()
try:
producer.begin_transaction()
producer.produce("order-events", key="order-1001", value=b"...")
producer.produce("audit-events", key="order-1001", value=b"...")
producer.commit_transaction()
except KafkaException:
producer.abort_transaction()
raise
如果只是普通业务生产消息,通常开启幂等生产者已足够;事务会增加复杂度和延迟。
8.6 AdminClient 管理 Topic¶
from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
futures = admin.create_topics([
NewTopic(
"orders",
num_partitions=3,
replication_factor=1,
config={
"retention.ms": "604800000",
"cleanup.policy": "delete",
},
)
])
for topic, future in futures.items():
try:
future.result()
print(f"created topic: {topic}")
except Exception as exc:
print(f"failed to create {topic}: {exc}")
常用管理接口:
接口 |
说明 |
|---|---|
|
查看集群元数据、Topic、分区 |
|
创建 Topic |
|
删除 Topic |
|
增加分区 |
|
查看配置 |
|
修改配置 |
8.7 Python 使用注意点¶
confluent-kafka的Producer.produce()是异步的,必须定期poll()或最后flush()。Consumer 处理成功后再提交 offset,否则会丢消息。
Kafka 默认至少一次,重复消费是正常情况,消费者必须幂等。
消息体建议带
event_id、event_type、schema_version、occurred_at。不要在每条消息后
flush(),高吞吐场景用批量和压缩。消费者数量不要超过 Topic 分区数,超过后多余消费者会空闲。
对慢处理任务,考虑先落库再异步处理,或增加分区和消费者并行度。
如果使用 Django/FastAPI,Producer 适合做成应用级单例,进程退出时统一
flush()。
九、常见业务场景¶
9.1 日志采集¶
App -> Filebeat/Fluent Bit -> Kafka -> Flink/Spark/ClickHouse/ES
优点:
削峰填谷。
多下游独立消费。
可回放历史数据。
9.2 订单事件¶
order.created
order.paid
order.shipped
order.cancelled
设计建议:
key 使用
order_id,保证同一订单事件有序。消费端以事件 ID 做幂等。
消息体带版本号,方便兼容演进。
9.3 数据同步¶
MySQL Binlog -> Debezium -> Kafka -> 下游系统
适合搜索索引构建、缓存刷新、数据仓库同步。
9.4 延迟消息¶
Kafka 原生不擅长任意延迟消息。
可选方案:
按延迟级别设计多个 Topic。
使用时间轮服务。
使用 Flink 定时器。
对简单场景使用业务表轮询。
十、运维与监控篇¶
10.1 核心指标¶
Broker:
CPU、内存、磁盘、网络。
请求延迟。
Under Replicated Partitions。
Offline Partitions。
Active Controller Count。
Topic/Partition:
Log End Offset。
消费 Lag。
分区 Leader 分布。
分区大小。
Producer:
发送速率。
错误率。
重试次数。
请求延迟。
Consumer:
消费速率。
Lag。
Rebalance 次数。
处理耗时。
10.2 Lag 排查¶
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-service
排查步骤:
确认消费者是否在线。
查看单个分区是否 Lag 特别高。
检查消费者处理耗时。
检查是否频繁 Rebalance。
增加消费者数量,但不能超过分区数。
必要时增加分区或优化业务处理。
10.3 常见故障¶
故障 |
可能原因 |
|---|---|
生产失败 |
Broker 不可用、ISR 不足、权限问题 |
消费重复 |
处理成功但 offset 未提交 |
消息丢失 |
|
Lag 持续增长 |
消费能力不足或处理卡住 |
频繁 Rebalance |
心跳超时、处理太慢、消费者反复重启 |
分区热点 |
key 分布不均 |
十一、面试要点¶
11.1 Kafka 为什么吞吐高?¶
顺序写磁盘。
Page Cache。
批量发送和批量拉取。
零拷贝。
分区并行。
消息压缩。
11.2 Kafka 如何保证消息不丢?¶
生产端:
acks=all。开启幂等生产者。
设置足够重试。
Broker:
副本因子大于 1,生产通常为 3。
min.insync.replicas=2。禁用不干净 Leader 选举。
消费端:
业务处理成功后再提交 offset。
消费逻辑幂等。
11.3 Kafka 如何保证顺序?¶
Kafka 只保证单分区内有序。
同一业务 key 发送到同一分区。
消费端单线程或按 key 串行处理。
全局有序只能使用一个分区,但吞吐受限。
11.4 Kafka 为什么会重复消费?¶
消费成功后提交 offset 前宕机。
Rebalance 导致分区重新分配。
手动提交失败。
生产者重试在未开启幂等时可能重复写入。
解决方式:
消费端幂等。
业务唯一键去重。
处理成功后提交 offset。
生产端开启幂等。
11.5 Kafka 和 RabbitMQ 怎么选?¶
场景 |
推荐 |
|---|---|
高吞吐日志流 |
Kafka |
事件回放 |
Kafka |
多下游独立消费 |
Kafka |
复杂路由 |
RabbitMQ |
任务队列 |
RabbitMQ / Django RQ / Celery |
低延迟业务命令消息 |
RabbitMQ |
参考资源¶
Kafka 官方文档:https://kafka.apache.org/documentation/
Confluent Kafka Python:https://docs.confluent.io/platform/current/clients/confluent-kafka-python/
Kafka Improvement Proposals:https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals