Kafka Broker 和集群
本章将详细介绍 Kafka Broker 的工作原理、集群管理和高可用机制。
Broker 概述
什么是 Broker?
Broker 是 Kafka 集群中的服务节点,负责接收生产者发送的消息并持久化到磁盘,同时响应消费者的拉取请求。一个 Kafka 集群由多个 Broker 组成,它们协同工作提供高吞吐量和容错能力。
Broker 职责
- 消息存储:接收并持久化消息到磁盘,支持高效顺序写入
- 消息复制:管理分区副本同步,保证数据冗余
- Leader 选举:处理分区 Leader 选举,保证服务可用性
- 元数据管理:维护主题、分区、ACL 等元数据
- 消费者协调:协调消费者组 Rebalance,管理偏移量提交
Broker 与客户端的交互
Broker 配置
基本配置
# broker.id(每个 Broker 唯一)
broker.id=0
# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092
# 对外公告地址(客户端连接使用)
advertised.listeners=PLAINTEXT://localhost:9092
# 日志目录(可以配置多个,用逗号分隔)
log.dirs=/var/lib/kafka/logs
# 主题默认配置
num.partitions=3
default.replication.factor=1
内存和性能配置
# JVM 堆内存(建议 4-6GB)
# 环境变量设置: KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
# 网络和日志处理线程数
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 后台任务线程
background.threads=10
日志配置
# 日志段大小(默认 1GB)
log.segment.bytes=1073741824
# 日志段检查间隔
log.segment.check.interval.ms=300000
# 消息保留时间(默认 7 天)
log.retention.hours=168
# 日志保留大小(-1 表示不限制)
log.retention.bytes=-1
# 日志清理策略
log.cleanup.policy=delete
# 日志刷新配置
log.flush.interval.messages=10000
log.flush.interval.ms=1000
副本配置
# 副本 Fetcher 线程数
num.replica.fetchers=1
# 副本滞后检测
replica.lag.time.max.ms=30000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
# 最小 ISR 数量
min.insync.replicas=1
控制器(Controller)
什么是控制器?
控制器是 Kafka 集群中的一个特殊 Broker,负责管理整个集群的元数据操作:
- 分区 Leader 选举:当 Leader 不可用时,选举新的 Leader
- 主题管理:创建、删除主题
- 分区重分配:管理分区副本分布
- Broker 管理:处理 Broker 加入和离开
- 首选 Leader 选举:触发首选副本成为 Leader
控制器选举
选举过程:
- Broker 启动时,尝试在 KRaft 中注册为 Controller 候选
- 通过 Raft 协议选举出唯一的 Controller
- Controller 加载集群元数据并开始管理工作
控制器职责
当 Broker 发生故障时,Controller 负责处理:
KRaft 模式
Kafka 3.0+ 推荐使用 KRaft 模式,不再依赖 Zookeeper。
KRaft 架构
KRaft 模式配置
# KRaft 模式配置
process.roles=broker,controller # 或 broker, controller 分离
node.id=1
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9093,2@localhost:9094,3@localhost:9095
Zookeeper vs KRaft 对比
| 特性 | Zookeeper 模式 | KRaft 模式 |
|---|---|---|
| 依赖 | 需要 Zookeeper 集群 | 无需额外依赖 |
| 元数据存储 | 存储在 Zookeeper | 存储在 Kafka 内部主题 |
| 部署复杂度 | 较高 | 更简单 |
| 扩展性 | 受限于 Zookeeper | 更好 |
| 稳定性 | 成熟稳定 | 3.0+ 生产可用 |
| Controller 选举 | 通过 Zookeeper | 通过 Raft 协议 |
集群管理
Broker 管理
# 查看 Broker 列表
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# 查看 Broker 日志目录
kafka-log-dirs.sh --describe \
--bootstrap-server localhost:9092 \
--broker-list 0,1,2
# 优雅停止 Broker
kafka-server-stop.sh
集群健康检查
# 检查集群元数据
kafka-metadata.sh --snapshot /var/lib/kafka/data/__cluster_metadata-0/00000000000000000000.log \
--command "clusterId;broker;topic"
# 检查分区分布
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--under-replicated-partitions
# 检查离线分区
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--unavailable-partitions
分区重分配
# 1. 创建重分配计划 JSON
cat > topics-to-move.json << EOF
{
"version": 1,
"topics": [
{"topic": "orders"}
]
}
EOF
# 2. 生成分配方案
kafka-reassign-partitions.sh --generate \
--topics-to-move-json-file topics-to-move.json \
--broker-list "0,1,2" \
--bootstrap-server localhost:9092
# 3. 执行重分配
kafka-reassign-partitions.sh --execute \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092
# 4. 验证重分配进度
kafka-reassign-partitions.sh --verify \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092
Leader 选举
Leader 选举机制
选举触发条件:
- Broker 故障(心跳超时)
- 手动触发首选副本选举
- 手动指定 Leader
首选副本选举
首选副本是创建分区时指定的第一个副本,通常应成为 Leader:
# 对特定分区执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--topic orders \
--partition 0 \
--election-type preferred
# 对所有分区执行
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topic-partitions \
--election-type preferred
Unclean Leader 选举
当 ISR 为空时,允许从非 ISR 副本中选举 Leader:
# 手动触发 Unclean 选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type unclean \
--topic orders \
--partition 0
注意
Unclean 选举可能导致数据丢失,仅在紧急情况下使用。默认情况下 unclean.leader.election.enable=false。
高可用机制
故障检测
Kafka 通过心跳机制检测 Broker 故障:
# Broker 心跳相关配置
# session.timeout.ms: 会话超时时间
# heartbeat.interval.ms: 心跳间隔
# 在 KRaft 模式下
controller.quorum.fetch.timeout.ms=2000
故障处理流程
高可用配置
# 确保最小 ISR 数量
min.insync.replicas=2
# 副本滞后检测
replica.lag.time.max.ms=30000
# 副本 Fetcher 配置
num.replica.fetchers=2
replica.fetch.max.bytes=1048576
# 失败重试
default.replication.factor=3
集群调优
性能优化配置
# 网络线程优化
num.network.threads=8
num.io.threads=16
# 缓冲区优化
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# 批量处理
batch.size=32768
linger.ms=10
# 日志优化
log.flush.interval.messages=10000
log.flush.interval.ms=1000
容量规划
| 指标 | 说明 | 规划建议 |
|---|---|---|
| 存储容量 | 消息保留大小 | 每日数据量 × 保留天数 × 副本数 × 1.2 |
| Broker 数量 | 集群规模 | 根据吞吐量和容错需求确定 |
| 分区数量 | 并行度 | 目标吞吐量 / 单分区吞吐量 |
| 副本数 | 容错能力 | 2-3(根据重要性确定) |
JVM 调优
# 设置 JVM 参数
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
# 启动 Kafka
bin/kafka-server-start.sh config/kraft/server.properties
监控关键指标
Broker 指标
| 指标名称 | 说明 | 正常范围 |
|---|---|---|
MessagesInPerSec | 每秒消息数 | 根据业务预期 |
BytesInPerSec | 每秒入站字节 | < 网络带宽 |
BytesOutPerSec | 每秒出站字节 | < 网络带宽 |
UnderReplicatedPartitions | 复制不足分区数 | 0 |
OfflinePartitionsCount | 离线分区数 | 0 |
ActiveControllerCount | 活跃控制器数 | 1 |
LeaderElectionRate | Leader 选举频率 | 接近 0 |
JMX 监控
# 启用 JMX
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
# 使用 jmxterm 查看指标
java -jar jmxterm-1.0.0.jar -l localhost:9999
常见问题处理
1. 分区不可用
# 检查副本状态
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
# 如果 ISR 为空,可能需要使用 Unclean 选举
# 或等待 Follower 追上 Leader
2. Leader 分布不均
# 检查 Leader 分布
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 | grep Leader
# 执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topic-partitions \
--election-type preferred
3. 磁盘空间不足
# 检查日志目录大小
kafka-log-dirs.sh --describe \
--bootstrap-server localhost:9092
# 清理旧日志(调整保留策略)
kafka-configs.sh --alter \
--entity-type topics \
--entity-name large-topic \
--add-config retention.hours=24 \
--bootstrap-server localhost:9092
4. Broker 无法启动
# 检查日志
tail -f /var/log/kafka/server.log
# 常见原因:
# 1. broker.id 冲突
# 2. 端口被占用
# 3. 日志目录权限问题
# 4. KRaft 元数据损坏
多集群复制
MirrorMaker 2
# mm2.properties
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = target-kafka:9092
# 复制配置
source->target.enabled = true
source->target.topics = .*
source->target.replication.factor = 3
启动 MirrorMaker 2:
./bin/connect-mirror-maker.sh mm2.properties
小结
- Broker 是 Kafka 集群中的服务节点,负责消息存储和复制
- Controller 负责集群元数据管理和 Leader 选举
- KRaft 模式 简化了 Kafka 部署,不再依赖 Zookeeper
- 高可用 通过副本机制和 Leader 选举实现
- 合理的 容量规划 和 监控 对生产环境至关重要
下一步
接下来让我们学习 副本机制 的详细原理。