跳到主要内容

Kafka Broker 和集群

本章将详细介绍 Kafka Broker 的工作原理、集群管理和高可用机制。

Broker 概述

什么是 Broker?

Broker 是 Kafka 集群中的服务节点,负责接收生产者发送的消息并持久化到磁盘,同时响应消费者的拉取请求。一个 Kafka 集群由多个 Broker 组成,它们协同工作提供高吞吐量和容错能力。

Broker 职责

  1. 消息存储:接收并持久化消息到磁盘,支持高效顺序写入
  2. 消息复制:管理分区副本同步,保证数据冗余
  3. Leader 选举:处理分区 Leader 选举,保证服务可用性
  4. 元数据管理:维护主题、分区、ACL 等元数据
  5. 消费者协调:协调消费者组 Rebalance,管理偏移量提交

Broker 与客户端的交互

Broker 配置

基本配置

# broker.id(每个 Broker 唯一)
broker.id=0

# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092

# 对外公告地址(客户端连接使用)
advertised.listeners=PLAINTEXT://localhost:9092

# 日志目录(可以配置多个,用逗号分隔)
log.dirs=/var/lib/kafka/logs

# 主题默认配置
num.partitions=3
default.replication.factor=1

内存和性能配置

# JVM 堆内存(建议 4-6GB)
# 环境变量设置: KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"

# 网络和日志处理线程数
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 后台任务线程
background.threads=10

日志配置

# 日志段大小(默认 1GB)
log.segment.bytes=1073741824

# 日志段检查间隔
log.segment.check.interval.ms=300000

# 消息保留时间(默认 7 天)
log.retention.hours=168

# 日志保留大小(-1 表示不限制)
log.retention.bytes=-1

# 日志清理策略
log.cleanup.policy=delete

# 日志刷新配置
log.flush.interval.messages=10000
log.flush.interval.ms=1000

副本配置

# 副本 Fetcher 线程数
num.replica.fetchers=1

# 副本滞后检测
replica.lag.time.max.ms=30000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536

# 最小 ISR 数量
min.insync.replicas=1

控制器(Controller)

什么是控制器?

控制器是 Kafka 集群中的一个特殊 Broker,负责管理整个集群的元数据操作:

  • 分区 Leader 选举:当 Leader 不可用时,选举新的 Leader
  • 主题管理:创建、删除主题
  • 分区重分配:管理分区副本分布
  • Broker 管理:处理 Broker 加入和离开
  • 首选 Leader 选举:触发首选副本成为 Leader

控制器选举

选举过程

  1. Broker 启动时,尝试在 KRaft 中注册为 Controller 候选
  2. 通过 Raft 协议选举出唯一的 Controller
  3. Controller 加载集群元数据并开始管理工作

控制器职责

当 Broker 发生故障时,Controller 负责处理:

KRaft 模式

Kafka 3.0+ 推荐使用 KRaft 模式,不再依赖 Zookeeper。

KRaft 架构

KRaft 模式配置

# KRaft 模式配置
process.roles=broker,controller # 或 broker, controller 分离
node.id=1
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9093,2@localhost:9094,3@localhost:9095

Zookeeper vs KRaft 对比

特性Zookeeper 模式KRaft 模式
依赖需要 Zookeeper 集群无需额外依赖
元数据存储存储在 Zookeeper存储在 Kafka 内部主题
部署复杂度较高更简单
扩展性受限于 Zookeeper更好
稳定性成熟稳定3.0+ 生产可用
Controller 选举通过 Zookeeper通过 Raft 协议

集群管理

Broker 管理

# 查看 Broker 列表
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 查看 Broker 日志目录
kafka-log-dirs.sh --describe \
--bootstrap-server localhost:9092 \
--broker-list 0,1,2

# 优雅停止 Broker
kafka-server-stop.sh

集群健康检查

# 检查集群元数据
kafka-metadata.sh --snapshot /var/lib/kafka/data/__cluster_metadata-0/00000000000000000000.log \
--command "clusterId;broker;topic"

# 检查分区分布
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--under-replicated-partitions

# 检查离线分区
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--unavailable-partitions

分区重分配

# 1. 创建重分配计划 JSON
cat > topics-to-move.json << EOF
{
"version": 1,
"topics": [
{"topic": "orders"}
]
}
EOF

# 2. 生成分配方案
kafka-reassign-partitions.sh --generate \
--topics-to-move-json-file topics-to-move.json \
--broker-list "0,1,2" \
--bootstrap-server localhost:9092

# 3. 执行重分配
kafka-reassign-partitions.sh --execute \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

# 4. 验证重分配进度
kafka-reassign-partitions.sh --verify \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

Leader 选举

Leader 选举机制

选举触发条件

  1. Broker 故障(心跳超时)
  2. 手动触发首选副本选举
  3. 手动指定 Leader

首选副本选举

首选副本是创建分区时指定的第一个副本,通常应成为 Leader:

# 对特定分区执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--topic orders \
--partition 0 \
--election-type preferred

# 对所有分区执行
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topic-partitions \
--election-type preferred

Unclean Leader 选举

当 ISR 为空时,允许从非 ISR 副本中选举 Leader:

# 手动触发 Unclean 选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type unclean \
--topic orders \
--partition 0
注意

Unclean 选举可能导致数据丢失,仅在紧急情况下使用。默认情况下 unclean.leader.election.enable=false

高可用机制

故障检测

Kafka 通过心跳机制检测 Broker 故障:

# Broker 心跳相关配置
# session.timeout.ms: 会话超时时间
# heartbeat.interval.ms: 心跳间隔

# 在 KRaft 模式下
controller.quorum.fetch.timeout.ms=2000

故障处理流程

高可用配置

# 确保最小 ISR 数量
min.insync.replicas=2

# 副本滞后检测
replica.lag.time.max.ms=30000

# 副本 Fetcher 配置
num.replica.fetchers=2
replica.fetch.max.bytes=1048576

# 失败重试
default.replication.factor=3

集群调优

性能优化配置

# 网络线程优化
num.network.threads=8
num.io.threads=16

# 缓冲区优化
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576

# 批量处理
batch.size=32768
linger.ms=10

# 日志优化
log.flush.interval.messages=10000
log.flush.interval.ms=1000

容量规划

指标说明规划建议
存储容量消息保留大小每日数据量 × 保留天数 × 副本数 × 1.2
Broker 数量集群规模根据吞吐量和容错需求确定
分区数量并行度目标吞吐量 / 单分区吞吐量
副本数容错能力2-3(根据重要性确定)

JVM 调优

# 设置 JVM 参数
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"

# 启动 Kafka
bin/kafka-server-start.sh config/kraft/server.properties

监控关键指标

Broker 指标

指标名称说明正常范围
MessagesInPerSec每秒消息数根据业务预期
BytesInPerSec每秒入站字节< 网络带宽
BytesOutPerSec每秒出站字节< 网络带宽
UnderReplicatedPartitions复制不足分区数0
OfflinePartitionsCount离线分区数0
ActiveControllerCount活跃控制器数1
LeaderElectionRateLeader 选举频率接近 0

JMX 监控

# 启用 JMX
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

# 使用 jmxterm 查看指标
java -jar jmxterm-1.0.0.jar -l localhost:9999

常见问题处理

1. 分区不可用

# 检查副本状态
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092

# 如果 ISR 为空,可能需要使用 Unclean 选举
# 或等待 Follower 追上 Leader

2. Leader 分布不均

# 检查 Leader 分布
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 | grep Leader

# 执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topic-partitions \
--election-type preferred

3. 磁盘空间不足

# 检查日志目录大小
kafka-log-dirs.sh --describe \
--bootstrap-server localhost:9092

# 清理旧日志(调整保留策略)
kafka-configs.sh --alter \
--entity-type topics \
--entity-name large-topic \
--add-config retention.hours=24 \
--bootstrap-server localhost:9092

4. Broker 无法启动

# 检查日志
tail -f /var/log/kafka/server.log

# 常见原因:
# 1. broker.id 冲突
# 2. 端口被占用
# 3. 日志目录权限问题
# 4. KRaft 元数据损坏

多集群复制

MirrorMaker 2

# mm2.properties
clusters = source, target

source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = target-kafka:9092

# 复制配置
source->target.enabled = true
source->target.topics = .*
source->target.replication.factor = 3

启动 MirrorMaker 2:

./bin/connect-mirror-maker.sh mm2.properties

小结

  1. Broker 是 Kafka 集群中的服务节点,负责消息存储和复制
  2. Controller 负责集群元数据管理和 Leader 选举
  3. KRaft 模式 简化了 Kafka 部署,不再依赖 Zookeeper
  4. 高可用 通过副本机制和 Leader 选举实现
  5. 合理的 容量规划监控 对生产环境至关重要

下一步

接下来让我们学习 副本机制 的详细原理。