Kafka 副本机制
副本机制是 Kafka 实现高可用和数据持久化的核心机制。本章将详细介绍 Kafka 副本的工作原理、同步机制和配置优化。
副本概述
什么是副本?
Kafka 通过将分区数据复制到多个 Broker 来实现高可用。每个分区可以有多个副本,分布在不同的 Broker 上。
┌─────────────────────────────────────────────────────────────┐
│ 分区副本分布 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Topic: orders, Partitions: 3, Replication Factor: 3 │
│ │
│ Partition 0: │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────┐ │
│ │ Broker 1 (L) │───▶│ Broker 2 (F) │───▶│ Broker 3 │ │
│ │ [0,1,2,3] │ │ [0,1,2] │ │ [0,1,2] │ │
│ └───────────────┘ └───────────────┘ └───────────┘ │
│ │
│ Partition 1: │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────┐ │
│ │ Broker 2 (L) │───▶│ Broker 3 (F) │───▶│ Broker 1 │ │
│ │ [0,1,2] │ │ [0,1,2] │ │ [0,1,2] │ │
│ └───────────────┘ └───────────────┘ └───────────┘ │
│ │
│ Partition 2: │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────┐ │
│ │ Broker 3 (L) │───▶│ Broker 1 (F) │───▶│ Broker 2 │ │
│ │ [0,1,2] │ │ [0,1,2] │ │ [0,1,2] │ │
│ └───────────────┘ └───────────────┘ └───────────┘ │
│ │
│ L = Leader (主副本) │
│ F = Follower (从副本) │
│ │
└─────────────────────────────────────────────────────────────┘
副本角色
| 角色 | 说明 | 职责 |
|---|---|---|
| Leader | 主副本 | 处理所有读写请求 |
| Follower | 从副本 | 被动复制 Leader 数据 |
| ISR | 同步副本集合 | 与 Leader 保持同步的副本 |
副本同步机制
同步原理
┌─────────────────────────────────────────────────────────────┐
│ 副本同步机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer ──[消息]──> Leader │
│ │ │ │
│ │ │ 1. 写入本地日志 │
│ │ ▼ │
│ │ ┌─────────┐ │
│ │ │ Log │ │
│ │ │ [0,1,2] │ │
│ │ └─────────┘ │
│ │ │ │
│ │<───[ACK]──────────┘ │
│ │ │
│ │ │ 2. 复制到 Follower │
│ │ ▼ │
│ │ ┌─────────┐ ┌─────────┐ │
│ │ │Follower1│ │Follower2│ │
│ │ │ [0,1] │ │ [0,1] │ │
│ │ └─────────┘ └─────────┘ │
│ │ │
│ │ │ 3. 等待 ISR 确认 │
│ │ ▼ │
│ │ ┌─────────┐ │
│ │ │ ISR │ │
│ │ │ [L,F1,F2] │
│ │ └─────────┘ │
│ │ │ │
│ │<───[ACK]──────────┘ │
│ │ │
│ ▼ │
│ 消息确认完成 │
│ │
└─────────────────────────────────────────────────────────────┘
同步流程
- 写入 Leader:生产者发送消息,Leader 写入本地日志
- 复制到 Follower:Follower 主动从 Leader 拉取消息
- ISR 确认:等待 ISR 中所有副本确认后,返回 ACK 给生产者
副本Fetcher
// Follower 使用 Fetch 协议拉取消息
// 关键配置
props.put("replica.socket.timeout.ms", 30000);
props.put("replica.socket.receive.buffer.bytes", 65536);
props.put("replica.fetch.max.bytes", 1048576);
props.put("replica.fetch.min.bytes", 1);
props.put("replica.fetch.wait.max.ms", 500);
ISR(In-Sync Replicas)
什么是 ISR?
ISR(In-Sync Replicas)是当前与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格成为新的 Leader。
┌─────────────────────────────────────────────────────────────┐
│ ISR 机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 场景:分区 P0,副本分布在 Broker 1,2,3 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 初始状态 │ │
│ │ Broker 1: Leader, ISR=[1,2,3] │ │
│ │ Broker 2: Follower, LogEndOffset=100 │ │
│ │ Broker 3: Follower, LogEndOffset=100 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Broker 2 滞后(网络延迟) │ │
│ │ Broker 1: Leader, ISR=[1,3] │ │
│ │ Broker 2: 滞后, LEO=80 │ │
│ │ Broker 3: Follower, LEO=100 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Broker 2 恢复同步 │ │
│ │ Broker 1: Leader, ISR=[1,2,3] │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
ISR 判定条件
# 判定副本是否在 ISR 中的时间阈值(默认 30s)
replica.lag.time.max.ms=30000
# 只有在 ISR 中的副本才能参与 Leader 选举
# 最小 ISR 数量(默认 1)
min.insync.replicas=1
min.insync.replicas
# 生产环境建议设置为 2
min.insync.replicas=2
# 配合 acks=all 使用
props.put("acks", "all");
作用:
- 确保消息写入足够多的副本
- 提高数据可靠性
- 牺牲一定的可用性
消息确认机制
acks 配置
// acks=0:不等待确认
props.put("acks", "0");
// 最高吞吐,最低可靠
// 适用于可丢失消息(如日志收集)
// acks=1:Leader 确认
props.put("acks", "1");
// 默认配置
// Leader 写入成功即返回,可能丢失数据
// acks=all:所有 ISR 确认
props.put("acks", "all");
// 最高可靠,稍低吞吐
// 配合 min.insync.replicas 使用
可靠性对比
| acks | 可靠性 | 吞吐量 | 数据丢失场景 |
|---|---|---|---|
| 0 | 最低 | 最高 | Broker 故障 |
| 1 | 中等 | 高 | Leader 故障 |
| all | 最高 | 中 | ISR 全部故障 |
配置建议
// 高可靠配置
Properties highReliabilityProps = new Properties();
highReliabilityProps.put("bootstrap.servers", "localhost:9092");
highReliabilityProps.put("key.serializer", "StringSerializer");
highReliabilityProps.put("value.serializer", "StringSerializer");
// 确保消息复制到多个副本
highReliabilityProps.put("acks", "all");
// 最小 ISR
highReliabilityProps.put("min.insync.replicas", "2");
// 重试
highReliabilityProps.put("retries", 3);
highReliabilityProps.put("enable.idempotence", "true");
Leader 选举
选举触发条件
- Leader 故障:Controller 检测到 Leader 不可用
- 控制器变更:新的 Broker 成为 Controller
- 手动触发:执行手动选举
选举策略
// 选举策略配置
// 1. Controller 选举 Leader(默认)
// 2. 从 ISR 中选择
// 优先副本
// AR (Assigned Replicas) 中的第一个在 ISR 中的副本
// 优先成为 Leader
Unclean Leader 选举
# 允许非 ISR 副本成为 Leader(危险)
# 默认关闭
unclean.leader.election.enable=false
# 紧急情况下启用
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type unclean \
--topic my-topic \
--partition 0
警告
Unclean 选举可能导致数据丢失,仅在可用性优先且可接受数据丢失的场景使用。
副本分配策略
自动分配
Kafka 自动分配副本到 Broker,遵循以下原则:
- 副本分散:副本分布在不同 Broker
- Leader 均衡:Leader 均匀分布
- 机架感知:考虑机架位置(如果配置)
手动分配
# 创建主题时指定副本分配
kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3 \
--replica-assignment "1:2,2:3,3:1,1:3,2:1,3:2"
副本重分配
# 1. 生成重分配方案
cat > reassign.json << EOF
{
"partitions": [
{"topic": "my-topic", "partition": 0, "replicas": [1,2,3]},
{"topic": "my-topic", "partition": 1, "replicas": [2,3,1]}
],
"version": 1
}
EOF
# 2. 执行重分配
kafka-reassign-partitions.sh --execute \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092
# 3. 验证
kafka-reassign-partitions.sh --verify \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092
副本监控
关键指标
# 查看副本状态
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092
输出说明:
Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
↑ ↑
│ AR (Assigned Replicas)
│ ISR (In-Sync Replicas)
常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| ISR 数量减少 | Follower 滞后或故障 | 检查网络和性能 |
| 副本不同步 | 复制速度慢 | 调整 replica.lag.time.max.ms |
| Leader 不均衡 | 分配不均 | 执行首选副本选举 |
监控脚本
#!/bin/bash
# 检查副本状态
for topic in $(kafka-topics.sh --list --bootstrap-server localhost:9092); do
echo "=== Topic: $topic ==="
kafka-topics.sh --describe --topic $topic --bootstrap-server localhost:9092
# 检查 UnderReplicated 分区
under_replicated=$(kafka-topics.sh --describe --topic $topic --bootstrap-server localhost:9092 | grep -c "UnderReplicated")
if [ $under_replicated -gt 0 ]; then
echo "WARNING: $under_replicated partitions under-replicated"
fi
done
故障恢复
Broker 故障
恢复后同步
- Broker 重新上线
- 从新的 Leader 同步数据
- 追上 Leader 后重新加入 ISR
数据恢复
# 检查日志截断
kafka-dump-log.sh --files /var/lib/kafka/data/topic-0/00000000000000000000.log \
--print-data-log
# 修复副本
# 如果发现副本数据不一致,需要重新同步
最佳实践
副本配置
# 生产环境推荐配置
default.replication.factor=3
min.insync.replicas=2
# 允许 unclean 选举(可选)
unclean.leader.election.enable=false
容量规划
| 场景 | 副本数 | min.insync.replicas |
|---|---|---|
| 开发测试 | 1 | 1 |
| 一般生产 | 2 | 1 |
| 高可用 | 3 | 2 |
监控告警
- ISR 变化告警
- UnderReplicated 分区告警
- 副本滞后告警
小结
- 副本机制是 Kafka 高可用的基础
- Leader 处理读写,Follower 被动复制
- ISR 是与 Leader 同步的副本集合
- acks 配置影响消息可靠性
- 合理配置副本数和min.insync.replicas保证高可用
下一步
接下来让我们学习 Kafka Streams 流处理框架。