Kafka Broker 和集群
本章将详细介绍 Kafka Broker 的工作原理、集群管理和高可用机制。
Broker 概述
什么是 Broker?
Broker 是 Kafka 集群中的服务节点,负责接收生产者发送的消息并持久化到磁盘,同时响应消费者的拉取请求。
┌─────────────────────────────────────────────────────────────┐
│ Kafka 集群架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Zookeeper / KRaft │ │
│ │ (集群元数据管理) │ │
│ │ - Broker 注册 │ │
│ │ - 主题/分区元数据 │ │
│ │ - ACL 配置 │ │
│ │ - 控制器选举 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ :9092 │ │ :9092 │ │ :9092 │ │
│ │ │ │ │ │ │ │
│ │ Leader: │ │ Leader: │ │ Leader: │ │
│ │ P0, P3 │ │ P1, P4 │ │ P2, P5 │ │
│ │ │ │ │ │ │ │
│ │ Follower:│ │ Follower:│ │ Follower:│ │
│ │ P1, P2 │ │ P0, P2 │ │ P0, P1 │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Broker 职责
- 消息存储:接收并持久化消息到磁盘
- 消息复制:管理分区副本同步
- Leader 选举:处理分区 Leader 选举
- 元数据管理:维护主题、分区、ACL 等元数据
- 消费者协调:协调消费者组 Rebalance
Broker 配置
基本配置
# broker.id(每个 Broker 唯一)
broker.id=0
# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092
# 对外公告地址(客户端连接使用)
advertised.listeners=PLAINTEXT://localhost:9092
# 日志目录(可以配置多个,用逗号分隔)
log.dirs=/var/lib/kafka/logs
# 主题默认配置
num.partitions=3
default.replication.factor=1
内存和性能配置
# JVM 堆内存(建议 4-6GB)
KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
# 页面缓存(建议留足够内存给 OS)
# Kafka 大量使用页面缓存,OS 内存要充足
# 网络和日志处理线程数
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
日志配置
# 日志段大小(默认 1GB)
log.segment.bytes=1073741824
# 日志段检查间隔
log.segment.check.interval.ms=300000
# 消息保留时间(默认 7 天)
log.retention.hours=168
# 日志保留大小(-1 表示不限制)
log.retention.bytes=-1
# 日志压缩
log.cleanup.policy=delete
# 最小清洁比率(用于压缩)
log.cleaner.min.cleanable.ratio=0.5
副本配置
# 副本Fetcher线程数
replica.lag.time.max.ms=30000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
# 最小 ISR 数量
min.insync.replicas=1
# 控制器副本数
controller.quorum.voters=1@localhost:9093
控制器(Controller)
什么是控制器?
控制器是 Kafka 集群中的一个特殊 Broker,负责管理整个集群的元数据操作:
- 分区 Leader 选举:当 Leader 不可用时,选举新的 Leader
- 主题管理:创建、删除主题
- 分区重分配:管理分区副本分配
- Broker 管理:处理 Broker 加入和离开
┌─────────────────────────────────────────────────────────────┐
│ 控制器职责 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Controller (Broker 1) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Topic │ │ Partition │ │ │
│ │ │ Manager │ │ Manager │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ Leader Election │ │ │
│ │ │ (分区 Leader 故障时选举新 Leader) │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
控制器选举
KRaft 模式
Kafka 3.0+ 推荐使用 KRaft 模式,不再依赖 Zookeeper:
# KRaft 模式配置
process.roles=broker,controller
node.id=1
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9093
对比:
| 特性 | Zookeeper 模式 | KRaft 模式 |
|---|---|---|
| 依赖 | 需要 Zookeeper | 无需额外依赖 |
| 元数据 | 存储在 ZK | 存储在 Kafka |
| 部署 | 更复杂 | 更简单 |
| 扩展性 | 受限 | 更好 |
| 稳定性 | 成熟稳定 | 3.0+ 生产可用 |
集群管理
Broker 管理
# 查看 Broker 列表
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# 查看 Broker 日志
tail -f /var/lib/kafka/logs/server.log
# 优雅停止 Broker
kafka-server-stop.sh
主题管理
# 创建主题(指定分区和副本)
kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3
# 查看主题
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
# 修改分区数
kafka-topics.sh --alter \
--topic my-topic \
--partitions 9 \
--bootstrap-server localhost:9092
# 删除主题
kafka-topics.sh --delete \
--topic my-topic \
--bootstrap-server localhost:9092
分区重分配
# 生成分区重分配方案
kafka-reassign-partitions.sh --generate \
--topics-to-move-json-file topics.json \
--bootstrap-server localhost:9092 \
--broker-list "0,1,2"
# 执行重分配
kafka-reassign-partitions.sh --execute \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092
# 验证重分配
kafka-reassign-partitions.sh --verify \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092
Leader 选举
Leader 选举机制
┌─────────────────────────────────────────────────────────────┐
│ Leader 选举过程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 场景:分区 P0 的 Leader (Broker 1) 故障 │
│ │
│ 1. Controller 检测到 Leader 不可用 │
│ │ │
│ ▼ │
│ 2. 检查 ISR(In-Sync Replicas)列表 │
│ ┌─────────────────────────────────────┐ │
│ │ ISR: [Broker 2, Broker 3] │ │
│ │ (与 Leader 保持同步的副本) │ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 3. 从 ISR 中选择新的 Leader │
│ - 优先选择 AR(Assigned Replicas)中最早的 │
│ - 确保新 Leader 有完整的消息 │
│ │ │
│ ▼ │
│ 4. 更新元数据 │
│ ┌─────────────────────────────────────┐ │
│ │ Partition: P0 │ │
│ │ Leader: Broker 2 │ │
│ │ ISR: [Broker 2, Broker 3] │ │
│ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
首选副本选举
# 执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--topic my-topic \
--partition 0 \
--election-type preferred
手动指定 Leader
# 手动指定分区 Leader
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type unclean \
--topic my-topic \
--partition 1
注意
使用 --election-type unclean 可能导致数据丢失,仅在紧急情况下使用。
高可用机制
故障检测
故障处理流程
-
Broker 故障:
- Controller 检测到 Broker 离线
- 触发受影响分区的 Leader 选举
- 选举新 Leader,更新 ISR
-
网络抖动:
- 缩短
replica.lag.time.max.ms检测时间 - 避免频繁切换
- 缩短
-
磁盘故障:
- 配置多个日志目录
- 使用 RAID
配置高可用
# 确保最小 ISR 数量
min.insync.replicas=2
# 副本滞后检测
replica.lag.time.max.ms=30000
replica.socket.timeout.ms=30000
# 失败重试
default.replication.factor=3
集群调优
性能优化配置
# 网络线程优化
num.network.threads=8
# IO 线程优化
num.io.threads=16
# 缓冲区优化
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# 批量处理
batch.size=32768
linger.ms=10
容量规划
| 指标 | 说明 | 规划建议 |
|---|---|---|
| 存储容量 | 消息保留大小 | 每日数据量 × 保留天数 × 2 |
| Broker 数量 | 集群规模 | 根据吞吐量和容错需求 |
| 分区数量 | 并行度 | 目标吞吐量 / 单分区吞吐 |
| 副本数 | 容错能力 | 2-3(根据重要性) |
监控关键指标
# 查看 Broker 指标
kafka-broker-metrics.sh --bootstrap-server localhost:9092
# 关键指标
# - UnderReplicatedPartitions: 复制滞后的分区
# - OfflinePartitionsCount: 离线分区数
# - ActiveControllerCount: 活跃控制器数量
# - LeaderElectionRate: Leader 选举频率
常见问题处理
1. 分区不可用
# 检查副本状态
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
# 检查 ISR
# 如果 ISR 为空,可能需要使用 unclean 选举
2. Leader 分布不均
# 均衡 Leader 分布
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topics \
--election-type preferred
3. 磁盘空间不足
# 清理旧日志
kafka-logDirs.sh --describe \
--bootstrap-server localhost:9092
# 删除主题
kafka-topics.sh --delete \
--topic old-topic \
--bootstrap-server localhost:9092
多集群复制
MirrorMaker 2
# 创建复制连接器
cat connect-mirror-source.properties
bootstrap.servers=source-cluster:9092
topics=topic1,topic2
groups=group1
replication.factor=3
# 启动 MirrorMaker
./bin/connect-mirror-maker.sh connect-mirror-source.properties
集群联邦
小结
- Broker 是 Kafka 集群中的服务节点,负责消息存储和复制
- 控制器负责集群元数据管理和 Leader 选举
- KRaft 模式简化了 Kafka 部署,不再依赖 Zookeeper
- 高可用通过副本机制和 Leader 选举实现
- 合理的容量规划和监控对生产环境至关重要
下一步
接下来让我们学习 副本机制 的详细原理。