Kafka

Kafka 是高吞吐、可持久化、可水平扩展的分布式事件流平台,适合日志采集、消息队列、流式处理、数据同步和事件驱动架构。以 Kafka 3.x 为参考。


目录

  1. 一、基础篇

  2. 二、命令行篇

  3. 三、生产者篇

  4. 四、消费者篇

  5. 五、存储与可靠性篇

  6. 六、性能优化篇

  7. 七、高可用与集群篇

  8. 八、Python 使用

  9. 九、常见业务场景

  10. 十、运维与监控篇

  11. 十一、面试要点

  12. 参考资源


一、基础篇

1.1 安装与启动

Kafka 3.x 推荐使用 KRaft 模式,不再强依赖 ZooKeeper。

Docker Compose 示例:

services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

启动:

docker compose up -d

常用端口:

端口

说明

9092

Broker 客户端访问

9093

KRaft Controller 通信

2181

ZooKeeper 模式使用


1.2 核心概念

Producer -> Topic -> Partition -> Broker
                         |
                         v
                    Consumer Group

概念

说明

Broker

Kafka 服务节点

Topic

逻辑消息主题

Partition

Topic 的物理分片

Offset

消息在分区内的递增位置

Producer

生产者,发送消息

Consumer

消费者,读取消息

Consumer Group

消费者组,同组内分摊分区

Replica

分区副本

Leader

分区主副本,负责读写

Follower

从副本,复制 Leader 数据

ISR

与 Leader 保持同步的副本集合


1.3 Topic、Partition、Offset

Topic 是逻辑分类,Partition 是并行和扩展的基础。

topic: orders
  partition-0: offset 0,1,2,3...
  partition-1: offset 0,1,2,3...
  partition-2: offset 0,1,2,3...

特点:

  • 同一分区内消息有序。

  • 不同分区之间不保证全局顺序。

  • Offset 只在分区内有意义。

  • 分区数量决定最大并行消费能力。


1.4 Kafka 与传统消息队列

对比

Kafka

RabbitMQ / Redis Queue

模型

日志流

队列

消息保存

按时间/大小保留

通常消费后删除

消费模式

多消费者组可重复消费

多为竞争消费

吞吐

极高

中高

顺序性

分区内有序

队列内有序

场景

日志、流处理、数据管道

任务队列、业务解耦


二、命令行篇

2.1 Topic 管理

# 创建 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 3 \
  --replication-factor 1

# 查看 Topic
kafka-topics.sh --bootstrap-server localhost:9092 --list

# 查看详情
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# 删除 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic orders

2.2 生产与消费

# 生产消息
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders

# 消费消息
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

# 指定消费者组
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --group order-service

2.3 消费者组

# 查看消费者组
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --list

# 查看消费进度
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group order-service

重点字段:

字段

说明

CURRENT-OFFSET

当前已提交 offset

LOG-END-OFFSET

分区最新 offset

LAG

积压消息数

CONSUMER-ID

消费者实例


三、生产者篇

3.1 发送流程

send()
  -> Serializer
  -> Partitioner
  -> RecordAccumulator
  -> Sender thread
  -> Broker
  -> ACK

生产者先把消息写入本地缓冲区,后台 Sender 线程批量发送到 Broker。


3.2 ACK 机制

参数

说明

可靠性

性能

acks=0

不等待 Broker 确认

最低

最高

acks=1

Leader 写入即确认

acks=all

ISR 副本确认后返回

最高

较低

生产推荐:

acks=all
enable.idempotence=true
retries=2147483647
max.in.flight.requests.per.connection=5

3.3 分区策略

有 key:

partition = hash(key) % partition_count

无 key:

  • 默认使用粘性分区策略,提高批量发送效率。

  • 一批消息尽量发往同一分区,批次满后切换。

建议:

  • 需要同一业务实体有序时,使用稳定 key,如 order_id

  • key 低基数会导致分区热点。

  • 分区数增加后,key 到分区的映射可能变化。


3.4 幂等生产者

enable.idempotence=true
acks=all
retries=2147483647

幂等生产者可避免生产者重试导致的单分区重复写入。

注意:

  • 幂等只保证单 Producer 会话、单分区内不重复。

  • 跨分区、跨会话的端到端幂等仍需业务唯一键。


3.5 事务生产者

事务可保证多分区写入原子性,并配合消费端实现 Exactly Once 流处理。

transactional.id=order-tx-1
enable.idempotence=true

常见于 Kafka Streams、Flink 等流处理框架。


四、消费者篇

4.1 消费者组

同一个消费者组内,一个分区同一时刻只能被一个消费者消费。

Topic orders: P0 P1 P2

Group order-service:
  consumer-1 -> P0
  consumer-2 -> P1
  consumer-3 -> P2

如果消费者数量大于分区数量,多余消费者会空闲。


4.2 Offset 提交

自动提交:

enable.auto.commit=true
auto.commit.interval.ms=5000

手动提交更可控:

poll -> 处理消息 -> commit offset

语义:

提交时机

可能问题

处理前提交

处理失败会丢消息

处理后提交

失败重启可能重复消费

多数业务选择“至少一次”,即处理成功后提交 offset,并通过幂等处理重复消息。


4.3 Rebalance

Rebalance 触发场景:

  • 消费者加入。

  • 消费者离开或心跳超时。

  • Topic 分区变化。

  • 订阅 Topic 变化。

影响:

  • Rebalance 期间消费暂停。

  • 可能造成重复消费。

  • 频繁 Rebalance 会影响吞吐和延迟。

优化:

session.timeout.ms=30000
heartbeat.interval.ms=10000
max.poll.interval.ms=300000
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

4.4 消费语义

语义

说明

实现方式

At most once

最多一次,可能丢失

处理前提交

At least once

至少一次,可能重复

处理后提交

Exactly once

精确一次

幂等、事务、流处理框架

业务系统最常见的是 At least once + 幂等。


五、存储与可靠性篇

5.1 顺序写与 Page Cache

Kafka 性能高的原因:

  • 追加写日志,磁盘顺序 IO。

  • 大量利用操作系统 Page Cache。

  • 批量发送和批量拉取。

  • 零拷贝传输。

  • 分区并行。


5.2 Segment

Kafka 分区日志由多个 Segment 文件组成。

partition-0/
  00000000000000000000.log
  00000000000000000000.index
  00000000000000000000.timeindex
  00000000000000123456.log

Segment 便于:

  • 快速定位 offset。

  • 按时间或大小删除过期数据。

  • 降低单文件大小。


5.3 副本与 ISR

Partition 0
  Leader: broker-1
  Follower: broker-2
  Follower: broker-3
  ISR: broker-1, broker-2, broker-3

关键参数:

replication.factor=3
min.insync.replicas=2
acks=all

当 ISR 数量小于 min.insync.replicas 时,acks=all 的写入会失败,以避免数据丢失。


5.4 消息保留策略

按时间:

retention.ms=604800000

按大小:

retention.bytes=107374182400

日志压缩:

cleanup.policy=compact

compact 会按 key 保留最新值,适合配置、维表、状态变更日志。


六、性能优化篇

6.1 Producer 优化

batch.size=32768
linger.ms=10
compression.type=lz4
buffer.memory=67108864
acks=all

说明:

  • batch.size 增大可提升吞吐。

  • linger.ms 允许等待更多消息组成批次。

  • compression.type 推荐 lz4zstd

  • buffer.memory 控制生产者缓冲区。


6.2 Consumer 优化

fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.poll.records=500

建议:

  • 批量拉取、批量处理。

  • 慢业务逻辑放线程池,但注意 offset 提交顺序。

  • 消费者数不要超过分区数。

  • 处理耗时超过 max.poll.interval.ms 会触发 Rebalance。


6.3 Topic 与分区设计

分区数影响:

  • 并行度。

  • 文件句柄数量。

  • Leader 选举时间。

  • Rebalance 成本。

建议:

  • 根据吞吐和消费者并行度估算分区数。

  • 预留增长空间,但不要无限增大。

  • 强顺序业务用同一 key 进入同一分区。

  • 单 Topic 分区数过多会增加集群管理成本。


七、高可用与集群篇

7.1 Broker 集群

生产建议:

  • 至少 3 个 Broker。

  • Topic 副本因子设置为 3。

  • min.insync.replicas=2

  • Broker 分布在不同机器或可用区。

  • 监控分区 Leader 均衡。


7.2 Controller

Controller 负责:

  • Broker 上下线管理。

  • 分区 Leader 选举。

  • 元数据维护。

Kafka 早期依赖 ZooKeeper,Kafka 3.x 后 KRaft 模式逐渐成为主流。


7.3 数据可靠配置

生产者:

acks=all
enable.idempotence=true
retries=2147483647

Broker:

default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

Topic:

replication.factor=3
min.insync.replicas=2

八、Python 使用

8.1 客户端选择与安装

Python 生产环境优先选 confluent-kafka,它基于 librdkafka,吞吐、稳定性和协议支持更好。kafka-python 是纯 Python 实现,适合学习或轻量场景,但生产高吞吐更常用 confluent-kafka

pip install confluent-kafka

常用对象:

对象

说明

Producer

异步发送消息,依赖 poll()/flush() 驱动回调

Consumer

消费者组消费,使用 subscribe() + poll()

AdminClient

创建、删除、查询 Topic

TopicPartition

指定 Topic、分区、offset

KafkaException / KafkaError

异常和错误码


8.2 Producer 接口

Producer.produce() 是异步接口,消息先进入本地队列;需要调用 producer.poll(0) 触发 delivery callback,退出前用 producer.flush() 等待未发送消息完成。

import json
import socket
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",
    "enable.idempotence": True,
    "retries": 2147483647,
    "compression.type": "lz4",
    "client.id": socket.gethostname(),
})

def delivery_report(err, msg):
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}]")

producer.produce(
    "orders",
    key="order-1001",
    value=json.dumps({"order_id": 1001, "status": "paid"}).encode("utf-8"),
    headers={
        "event_type": "order.paid",
        "schema_version": "1",
    },
    on_delivery=delivery_report,
)

# 触发发送队列和回调处理;高频发送时通常每次 produce 后 poll(0)。
producer.poll(0)

producer.flush()

常用接口:

接口

说明

produce(topic, key, value, headers, partition, timestamp, on_delivery)

发送消息

poll(timeout)

触发发送、错误和 delivery callback

flush(timeout)

等待所有待发送消息完成

len(producer)

本地待发送队列长度,可用于背压判断

处理本地发送队列满:

try:
    producer.produce("orders", key="order-1001", value=b"...")
except BufferError:
    producer.poll(1.0)
    producer.produce("orders", key="order-1001", value=b"...")

业务建议:

  • 需要同一订单内事件有序时,key 使用稳定的 order_id

  • value 传 bytes,JSON 要显式 .encode("utf-8")

  • delivery_report 成功只表示 Kafka 接收成功,不代表下游已消费。

  • flush() 不要在每条消息后调用,否则会严重降低吞吐。


8.3 Consumer 接口

推荐关闭自动提交,业务处理成功后再提交 offset,实现“至少一次”。

import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-service",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "max.poll.interval.ms": 300000,
    "session.timeout.ms": 30000,
})

consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue

        key = msg.key().decode("utf-8") if msg.key() else None
        event = json.loads(msg.value().decode("utf-8"))

        handle_order_event(key, event)

        # 业务处理成功后提交 offset。asynchronous=False 便于失败时感知异常。
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()

常用接口:

接口

说明

subscribe(["topic"], on_assign=..., on_revoke=...)

加入消费者组并订阅 Topic

poll(timeout)

拉取消息;返回 None 表示超时无消息

commit(message=msg, asynchronous=False)

提交指定消息的下一位 offset

assign([TopicPartition(...)])

手动分配分区,不使用消费者组自动分配

seek(TopicPartition(...))

跳转到指定 offset

position(partitions)

查询当前消费位置

committed(partitions)

查询已提交 offset

close()

关闭消费者并离开消费者组

Rebalance 回调:

from confluent_kafka import Consumer

def on_assign(consumer, partitions):
    print("assigned:", partitions)

def on_revoke(consumer, partitions):
    print("revoked:", partitions)
    consumer.commit(asynchronous=False)

consumer.subscribe(["orders"], on_assign=on_assign, on_revoke=on_revoke)

8.4 批量消费与手动提交

consume() 可以一次拉取多条消息,适合批量写数据库、批量调用下游接口。

messages = consumer.consume(num_messages=100, timeout=1.0)
events = []
last_msg = None

if messages:
    for msg in messages:
        if msg.error():
            continue
        events.append(json.loads(msg.value().decode("utf-8")))
        last_msg = msg

    if events:
        save_events(events)
        consumer.commit(message=last_msg, asynchronous=False)

注意:

  • 批量提交只适合每个分区内连续处理成功的场景。

  • 如果一批里部分成功、部分失败,不要简单提交最后一条,否则会跳过失败消息。

  • 批量处理耗时不能超过 max.poll.interval.ms,否则会被踢出消费者组。

精细控制 offset 存储:

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "order-service",
    "enable.auto.commit": True,
    "enable.auto.offset.store": False,
})

msg = consumer.poll(1.0)
if msg and not msg.error():
    handle(msg)
    consumer.store_offsets(msg)

store_offsets() 只把 offset 存到本地缓存,最终由后台自动提交线程按 auto.commit.interval.ms 提交。


8.5 事务生产者

事务生产者用于把多条消息原子写入 Kafka;配合消费端 offset 事务提交,可以实现 consume-process-produce 场景的 Exactly Once。

from confluent_kafka import Producer, KafkaException

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-pipeline-1",
    "enable.idempotence": True,
})

producer.init_transactions()

try:
    producer.begin_transaction()
    producer.produce("order-events", key="order-1001", value=b"...")
    producer.produce("audit-events", key="order-1001", value=b"...")
    producer.commit_transaction()
except KafkaException:
    producer.abort_transaction()
    raise

如果只是普通业务生产消息,通常开启幂等生产者已足够;事务会增加复杂度和延迟。


8.6 AdminClient 管理 Topic

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

futures = admin.create_topics([
    NewTopic(
        "orders",
        num_partitions=3,
        replication_factor=1,
        config={
            "retention.ms": "604800000",
            "cleanup.policy": "delete",
        },
    )
])

for topic, future in futures.items():
    try:
        future.result()
        print(f"created topic: {topic}")
    except Exception as exc:
        print(f"failed to create {topic}: {exc}")

常用管理接口:

接口

说明

list_topics()

查看集群元数据、Topic、分区

create_topics([NewTopic(...)])

创建 Topic

delete_topics(["topic"])

删除 Topic

create_partitions(...)

增加分区

describe_configs(...)

查看配置

alter_configs(...)

修改配置


8.7 Python 使用注意点

  • confluent-kafkaProducer.produce() 是异步的,必须定期 poll() 或最后 flush()

  • Consumer 处理成功后再提交 offset,否则会丢消息。

  • Kafka 默认至少一次,重复消费是正常情况,消费者必须幂等。

  • 消息体建议带 event_idevent_typeschema_versionoccurred_at

  • 不要在每条消息后 flush(),高吞吐场景用批量和压缩。

  • 消费者数量不要超过 Topic 分区数,超过后多余消费者会空闲。

  • 对慢处理任务,考虑先落库再异步处理,或增加分区和消费者并行度。

  • 如果使用 Django/FastAPI,Producer 适合做成应用级单例,进程退出时统一 flush()


九、常见业务场景

9.1 日志采集

App -> Filebeat/Fluent Bit -> Kafka -> Flink/Spark/ClickHouse/ES

优点:

  • 削峰填谷。

  • 多下游独立消费。

  • 可回放历史数据。


9.2 订单事件

order.created
order.paid
order.shipped
order.cancelled

设计建议:

  • key 使用 order_id,保证同一订单事件有序。

  • 消费端以事件 ID 做幂等。

  • 消息体带版本号,方便兼容演进。


9.3 数据同步

MySQL Binlog -> Debezium -> Kafka -> 下游系统

适合搜索索引构建、缓存刷新、数据仓库同步。


9.4 延迟消息

Kafka 原生不擅长任意延迟消息。

可选方案:

  • 按延迟级别设计多个 Topic。

  • 使用时间轮服务。

  • 使用 Flink 定时器。

  • 对简单场景使用业务表轮询。


十、运维与监控篇

10.1 核心指标

Broker:

  • CPU、内存、磁盘、网络。

  • 请求延迟。

  • Under Replicated Partitions。

  • Offline Partitions。

  • Active Controller Count。

Topic/Partition:

  • Log End Offset。

  • 消费 Lag。

  • 分区 Leader 分布。

  • 分区大小。

Producer:

  • 发送速率。

  • 错误率。

  • 重试次数。

  • 请求延迟。

Consumer:

  • 消费速率。

  • Lag。

  • Rebalance 次数。

  • 处理耗时。


10.2 Lag 排查

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group order-service

排查步骤:

  1. 确认消费者是否在线。

  2. 查看单个分区是否 Lag 特别高。

  3. 检查消费者处理耗时。

  4. 检查是否频繁 Rebalance。

  5. 增加消费者数量,但不能超过分区数。

  6. 必要时增加分区或优化业务处理。


10.3 常见故障

故障

可能原因

生产失败

Broker 不可用、ISR 不足、权限问题

消费重复

处理成功但 offset 未提交

消息丢失

acks=0/1、未开启幂等、错误提交 offset

Lag 持续增长

消费能力不足或处理卡住

频繁 Rebalance

心跳超时、处理太慢、消费者反复重启

分区热点

key 分布不均


十一、面试要点

11.1 Kafka 为什么吞吐高?

  • 顺序写磁盘。

  • Page Cache。

  • 批量发送和批量拉取。

  • 零拷贝。

  • 分区并行。

  • 消息压缩。

11.2 Kafka 如何保证消息不丢?

生产端:

  • acks=all

  • 开启幂等生产者。

  • 设置足够重试。

Broker:

  • 副本因子大于 1,生产通常为 3。

  • min.insync.replicas=2

  • 禁用不干净 Leader 选举。

消费端:

  • 业务处理成功后再提交 offset。

  • 消费逻辑幂等。


11.3 Kafka 如何保证顺序?

  • Kafka 只保证单分区内有序。

  • 同一业务 key 发送到同一分区。

  • 消费端单线程或按 key 串行处理。

  • 全局有序只能使用一个分区,但吞吐受限。


11.4 Kafka 为什么会重复消费?

  • 消费成功后提交 offset 前宕机。

  • Rebalance 导致分区重新分配。

  • 手动提交失败。

  • 生产者重试在未开启幂等时可能重复写入。

解决方式:

  • 消费端幂等。

  • 业务唯一键去重。

  • 处理成功后提交 offset。

  • 生产端开启幂等。


11.5 Kafka 和 RabbitMQ 怎么选?

场景

推荐

高吞吐日志流

Kafka

事件回放

Kafka

多下游独立消费

Kafka

复杂路由

RabbitMQ

任务队列

RabbitMQ / Django RQ / Celery

低延迟业务命令消息

RabbitMQ


参考资源