跳到主要内容

Kafka Broker 和集群

本章将详细介绍 Kafka Broker 的工作原理、集群管理和高可用机制。

Broker 概述

什么是 Broker?

Broker 是 Kafka 集群中的服务节点,负责接收生产者发送的消息并持久化到磁盘,同时响应消费者的拉取请求。

┌─────────────────────────────────────────────────────────────┐
│ Kafka 集群架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Zookeeper / KRaft │ │
│ │ (集群元数据管理) │ │
│ │ - Broker 注册 │ │
│ │ - 主题/分区元数据 │ │
│ │ - ACL 配置 │ │
│ │ - 控制器选举 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │
│ │ :9092 │ │ :9092 │ │ :9092 │ │
│ │ │ │ │ │ │ │
│ │ Leader: │ │ Leader: │ │ Leader: │ │
│ │ P0, P3 │ │ P1, P4 │ │ P2, P5 │ │
│ │ │ │ │ │ │ │
│ │ Follower:│ │ Follower:│ │ Follower:│ │
│ │ P1, P2 │ │ P0, P2 │ │ P0, P1 │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

Broker 职责

  1. 消息存储:接收并持久化消息到磁盘
  2. 消息复制:管理分区副本同步
  3. Leader 选举:处理分区 Leader 选举
  4. 元数据管理:维护主题、分区、ACL 等元数据
  5. 消费者协调:协调消费者组 Rebalance

Broker 配置

基本配置

# broker.id(每个 Broker 唯一)
broker.id=0

# 监听地址
listeners=PLAINTEXT://0.0.0.0:9092

# 对外公告地址(客户端连接使用)
advertised.listeners=PLAINTEXT://localhost:9092

# 日志目录(可以配置多个,用逗号分隔)
log.dirs=/var/lib/kafka/logs

# 主题默认配置
num.partitions=3
default.replication.factor=1

内存和性能配置

# JVM 堆内存(建议 4-6GB)
KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"

# 页面缓存(建议留足够内存给 OS)
# Kafka 大量使用页面缓存,OS 内存要充足

# 网络和日志处理线程数
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

日志配置

# 日志段大小(默认 1GB)
log.segment.bytes=1073741824

# 日志段检查间隔
log.segment.check.interval.ms=300000

# 消息保留时间(默认 7 天)
log.retention.hours=168

# 日志保留大小(-1 表示不限制)
log.retention.bytes=-1

# 日志压缩
log.cleanup.policy=delete

# 最小清洁比率(用于压缩)
log.cleaner.min.cleanable.ratio=0.5

副本配置

# 副本Fetcher线程数
replica.lag.time.max.ms=30000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536

# 最小 ISR 数量
min.insync.replicas=1

# 控制器副本数
controller.quorum.voters=1@localhost:9093

控制器(Controller)

什么是控制器?

控制器是 Kafka 集群中的一个特殊 Broker,负责管理整个集群的元数据操作:

  • 分区 Leader 选举:当 Leader 不可用时,选举新的 Leader
  • 主题管理:创建、删除主题
  • 分区重分配:管理分区副本分配
  • Broker 管理:处理 Broker 加入和离开
┌─────────────────────────────────────────────────────────────┐
│ 控制器职责 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Controller (Broker 1) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Topic │ │ Partition │ │ │
│ │ │ Manager │ │ Manager │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌─────────────────────────────────────────┐ │ │
│ │ │ Leader Election │ │ │
│ │ │ (分区 Leader 故障时选举新 Leader) │ │ │
│ │ └─────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

控制器选举

KRaft 模式

Kafka 3.0+ 推荐使用 KRaft 模式,不再依赖 Zookeeper:

# KRaft 模式配置
process.roles=broker,controller
node.id=1
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER
controller.quorum.voters=1@localhost:9093

对比:

特性Zookeeper 模式KRaft 模式
依赖需要 Zookeeper无需额外依赖
元数据存储在 ZK存储在 Kafka
部署更复杂更简单
扩展性受限更好
稳定性成熟稳定3.0+ 生产可用

集群管理

Broker 管理

# 查看 Broker 列表
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 查看 Broker 日志
tail -f /var/lib/kafka/logs/server.log

# 优雅停止 Broker
kafka-server-stop.sh

主题管理

# 创建主题(指定分区和副本)
kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3

# 查看主题
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092

# 修改分区数
kafka-topics.sh --alter \
--topic my-topic \
--partitions 9 \
--bootstrap-server localhost:9092

# 删除主题
kafka-topics.sh --delete \
--topic my-topic \
--bootstrap-server localhost:9092

分区重分配

# 生成分区重分配方案
kafka-reassign-partitions.sh --generate \
--topics-to-move-json-file topics.json \
--bootstrap-server localhost:9092 \
--broker-list "0,1,2"

# 执行重分配
kafka-reassign-partitions.sh --execute \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

# 验证重分配
kafka-reassign-partitions.sh --verify \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

Leader 选举

Leader 选举机制

┌─────────────────────────────────────────────────────────────┐
│ Leader 选举过程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 场景:分区 P0 的 Leader (Broker 1) 故障 │
│ │
│ 1. Controller 检测到 Leader 不可用 │
│ │ │
│ ▼ │
│ 2. 检查 ISR(In-Sync Replicas)列表 │
│ ┌─────────────────────────────────────┐ │
│ │ ISR: [Broker 2, Broker 3] │ │
│ │ (与 Leader 保持同步的副本) │ │
│ └─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 3. 从 ISR 中选择新的 Leader │
│ - 优先选择 AR(Assigned Replicas)中最早的 │
│ - 确保新 Leader 有完整的消息 │
│ │ │
│ ▼ │
│ 4. 更新元数据 │
│ ┌─────────────────────────────────────┐ │
│ │ Partition: P0 │ │
│ │ Leader: Broker 2 │ │
│ │ ISR: [Broker 2, Broker 3] │ │
│ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

首选副本选举

# 执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--topic my-topic \
--partition 0 \
--election-type preferred

手动指定 Leader

# 手动指定分区 Leader
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type unclean \
--topic my-topic \
--partition 1
注意

使用 --election-type unclean 可能导致数据丢失,仅在紧急情况下使用。

高可用机制

故障检测

故障处理流程

  1. Broker 故障

    • Controller 检测到 Broker 离线
    • 触发受影响分区的 Leader 选举
    • 选举新 Leader,更新 ISR
  2. 网络抖动

    • 缩短 replica.lag.time.max.ms 检测时间
    • 避免频繁切换
  3. 磁盘故障

    • 配置多个日志目录
    • 使用 RAID

配置高可用

# 确保最小 ISR 数量
min.insync.replicas=2

# 副本滞后检测
replica.lag.time.max.ms=30000
replica.socket.timeout.ms=30000

# 失败重试
default.replication.factor=3

集群调优

性能优化配置

# 网络线程优化
num.network.threads=8

# IO 线程优化
num.io.threads=16

# 缓冲区优化
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576

# 批量处理
batch.size=32768
linger.ms=10

容量规划

指标说明规划建议
存储容量消息保留大小每日数据量 × 保留天数 × 2
Broker 数量集群规模根据吞吐量和容错需求
分区数量并行度目标吞吐量 / 单分区吞吐
副本数容错能力2-3(根据重要性)

监控关键指标

# 查看 Broker 指标
kafka-broker-metrics.sh --bootstrap-server localhost:9092

# 关键指标
# - UnderReplicatedPartitions: 复制滞后的分区
# - OfflinePartitionsCount: 离线分区数
# - ActiveControllerCount: 活跃控制器数量
# - LeaderElectionRate: Leader 选举频率

常见问题处理

1. 分区不可用

# 检查副本状态
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092

# 检查 ISR
# 如果 ISR 为空,可能需要使用 unclean 选举

2. Leader 分布不均

# 均衡 Leader 分布
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topics \
--election-type preferred

3. 磁盘空间不足

# 清理旧日志
kafka-logDirs.sh --describe \
--bootstrap-server localhost:9092

# 删除主题
kafka-topics.sh --delete \
--topic old-topic \
--bootstrap-server localhost:9092

多集群复制

MirrorMaker 2

# 创建复制连接器
cat connect-mirror-source.properties

bootstrap.servers=source-cluster:9092
topics=topic1,topic2
groups=group1
replication.factor=3

# 启动 MirrorMaker
./bin/connect-mirror-maker.sh connect-mirror-source.properties

集群联邦

小结

  1. Broker 是 Kafka 集群中的服务节点,负责消息存储和复制
  2. 控制器负责集群元数据管理和 Leader 选举
  3. KRaft 模式简化了 Kafka 部署,不再依赖 Zookeeper
  4. 高可用通过副本机制和 Leader 选举实现
  5. 合理的容量规划监控对生产环境至关重要

下一步

接下来让我们学习 副本机制 的详细原理。