跳到主要内容

Kafka 副本机制

副本机制是 Kafka 实现高可用和数据持久化的核心机制。本章将详细介绍 Kafka 副本的工作原理、同步机制和配置优化。

副本概述

什么是副本?

Kafka 通过将分区数据复制到多个 Broker 来实现高可用。每个分区可以有多个副本,分布在不同的 Broker 上。

┌─────────────────────────────────────────────────────────────┐
│ 分区副本分布 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Topic: orders, Partitions: 3, Replication Factor: 3 │
│ │
│ Partition 0: │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────┐ │
│ │ Broker 1 (L) │───▶│ Broker 2 (F) │───▶│ Broker 3 │ │
│ │ [0,1,2,3] │ │ [0,1,2] │ │ [0,1,2] │ │
│ └───────────────┘ └───────────────┘ └───────────┘ │
│ │
│ Partition 1: │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────┐ │
│ │ Broker 2 (L) │───▶│ Broker 3 (F) │───▶│ Broker 1 │ │
│ │ [0,1,2] │ │ [0,1,2] │ │ [0,1,2] │ │
│ └───────────────┘ └───────────────┘ └───────────┘ │
│ │
│ Partition 2: │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────┐ │
│ │ Broker 3 (L) │───▶│ Broker 1 (F) │───▶│ Broker 2 │ │
│ │ [0,1,2] │ │ [0,1,2] │ │ [0,1,2] │ │
│ └───────────────┘ └───────────────┘ └───────────┘ │
│ │
│ L = Leader (主副本) │
│ F = Follower (从副本) │
│ │
└─────────────────────────────────────────────────────────────┘

副本角色

角色说明职责
Leader主副本处理所有读写请求
Follower从副本被动复制 Leader 数据
ISR同步副本集合与 Leader 保持同步的副本

副本同步机制

同步原理

┌─────────────────────────────────────────────────────────────┐
│ 副本同步机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer ──[消息]──> Leader │
│ │ │ │
│ │ │ 1. 写入本地日志 │
│ │ ▼ │
│ │ ┌─────────┐ │
│ │ │ Log │ │
│ │ │ [0,1,2] │ │
│ │ └─────────┘ │
│ │ │ │
│ │<───[ACK]──────────┘ │
│ │ │
│ │ │ 2. 复制到 Follower │
│ │ ▼ │
│ │ ┌─────────┐ ┌─────────┐ │
│ │ │Follower1│ │Follower2│ │
│ │ │ [0,1] │ │ [0,1] │ │
│ │ └─────────┘ └─────────┘ │
│ │ │
│ │ │ 3. 等待 ISR 确认 │
│ │ ▼ │
│ │ ┌─────────┐ │
│ │ │ ISR │ │
│ │ │ [L,F1,F2] │
│ │ └─────────┘ │
│ │ │ │
│ │<───[ACK]──────────┘ │
│ │ │
│ ▼ │
│ 消息确认完成 │
│ │
└─────────────────────────────────────────────────────────────┘

同步流程

  1. 写入 Leader:生产者发送消息,Leader 写入本地日志
  2. 复制到 Follower:Follower 主动从 Leader 拉取消息
  3. ISR 确认:等待 ISR 中所有副本确认后,返回 ACK 给生产者

副本Fetcher

// Follower 使用 Fetch 协议拉取消息
// 关键配置
props.put("replica.socket.timeout.ms", 30000);
props.put("replica.socket.receive.buffer.bytes", 65536);
props.put("replica.fetch.max.bytes", 1048576);
props.put("replica.fetch.min.bytes", 1);
props.put("replica.fetch.wait.max.ms", 500);

ISR(In-Sync Replicas)

什么是 ISR?

ISR(In-Sync Replicas)是当前与 Leader 保持同步的副本集合。只有 ISR 中的副本才有资格成为新的 Leader。

┌─────────────────────────────────────────────────────────────┐
│ ISR 机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 场景:分区 P0,副本分布在 Broker 1,2,3 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 初始状态 │ │
│ │ Broker 1: Leader, ISR=[1,2,3] │ │
│ │ Broker 2: Follower, LogEndOffset=100 │ │
│ │ Broker 3: Follower, LogEndOffset=100 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Broker 2 滞后(网络延迟) │ │
│ │ Broker 1: Leader, ISR=[1,3] │ │
│ │ Broker 2: 滞后, LEO=80 │ │
│ │ Broker 3: Follower, LEO=100 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Broker 2 恢复同步 │ │
│ │ Broker 1: Leader, ISR=[1,2,3] │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

ISR 判定条件

# 判定副本是否在 ISR 中的时间阈值(默认 30s)
replica.lag.time.max.ms=30000

# 只有在 ISR 中的副本才能参与 Leader 选举
# 最小 ISR 数量(默认 1)
min.insync.replicas=1

min.insync.replicas

# 生产环境建议设置为 2
min.insync.replicas=2

# 配合 acks=all 使用
props.put("acks", "all");

作用:

  • 确保消息写入足够多的副本
  • 提高数据可靠性
  • 牺牲一定的可用性

消息确认机制

acks 配置

// acks=0:不等待确认
props.put("acks", "0");
// 最高吞吐,最低可靠
// 适用于可丢失消息(如日志收集)

// acks=1:Leader 确认
props.put("acks", "1");
// 默认配置
// Leader 写入成功即返回,可能丢失数据

// acks=all:所有 ISR 确认
props.put("acks", "all");
// 最高可靠,稍低吞吐
// 配合 min.insync.replicas 使用

可靠性对比

acks可靠性吞吐量数据丢失场景
0最低最高Broker 故障
1中等Leader 故障
all最高ISR 全部故障

配置建议

// 高可靠配置
Properties highReliabilityProps = new Properties();
highReliabilityProps.put("bootstrap.servers", "localhost:9092");
highReliabilityProps.put("key.serializer", "StringSerializer");
highReliabilityProps.put("value.serializer", "StringSerializer");

// 确保消息复制到多个副本
highReliabilityProps.put("acks", "all");

// 最小 ISR
highReliabilityProps.put("min.insync.replicas", "2");

// 重试
highReliabilityProps.put("retries", 3);
highReliabilityProps.put("enable.idempotence", "true");

Leader 选举

选举触发条件

  1. Leader 故障:Controller 检测到 Leader 不可用
  2. 控制器变更:新的 Broker 成为 Controller
  3. 手动触发:执行手动选举

选举策略

// 选举策略配置
// 1. Controller 选举 Leader(默认)
// 2. 从 ISR 中选择

// 优先副本
// AR (Assigned Replicas) 中的第一个在 ISR 中的副本
// 优先成为 Leader

Unclean Leader 选举

# 允许非 ISR 副本成为 Leader(危险)
# 默认关闭
unclean.leader.election.enable=false

# 紧急情况下启用
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type unclean \
--topic my-topic \
--partition 0
警告

Unclean 选举可能导致数据丢失,仅在可用性优先且可接受数据丢失的场景使用。

副本分配策略

自动分配

Kafka 自动分配副本到 Broker,遵循以下原则:

  1. 副本分散:副本分布在不同 Broker
  2. Leader 均衡:Leader 均匀分布
  3. 机架感知:考虑机架位置(如果配置)

手动分配

# 创建主题时指定副本分配
kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3 \
--replica-assignment "1:2,2:3,3:1,1:3,2:1,3:2"

副本重分配

# 1. 生成重分配方案
cat > reassign.json << EOF
{
"partitions": [
{"topic": "my-topic", "partition": 0, "replicas": [1,2,3]},
{"topic": "my-topic", "partition": 1, "replicas": [2,3,1]}
],
"version": 1
}
EOF

# 2. 执行重分配
kafka-reassign-partitions.sh --execute \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

# 3. 验证
kafka-reassign-partitions.sh --verify \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

副本监控

关键指标

# 查看副本状态
kafka-topics.sh --describe \
--topic my-topic \
--bootstrap-server localhost:9092

输出说明:

Topic: my-topic    Partition: 0    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
↑ ↑
│ AR (Assigned Replicas)
│ ISR (In-Sync Replicas)

常见问题

问题原因解决方案
ISR 数量减少Follower 滞后或故障检查网络和性能
副本不同步复制速度慢调整 replica.lag.time.max.ms
Leader 不均衡分配不均执行首选副本选举

监控脚本

#!/bin/bash
# 检查副本状态

for topic in $(kafka-topics.sh --list --bootstrap-server localhost:9092); do
echo "=== Topic: $topic ==="
kafka-topics.sh --describe --topic $topic --bootstrap-server localhost:9092

# 检查 UnderReplicated 分区
under_replicated=$(kafka-topics.sh --describe --topic $topic --bootstrap-server localhost:9092 | grep -c "UnderReplicated")
if [ $under_replicated -gt 0 ]; then
echo "WARNING: $under_replicated partitions under-replicated"
fi
done

故障恢复

Broker 故障

恢复后同步

  1. Broker 重新上线
  2. 从新的 Leader 同步数据
  3. 追上 Leader 后重新加入 ISR

数据恢复

# 检查日志截断
kafka-dump-log.sh --files /var/lib/kafka/data/topic-0/00000000000000000000.log \
--print-data-log

# 修复副本
# 如果发现副本数据不一致,需要重新同步

最佳实践

副本配置

# 生产环境推荐配置
default.replication.factor=3
min.insync.replicas=2

# 允许 unclean 选举(可选)
unclean.leader.election.enable=false

容量规划

场景副本数min.insync.replicas
开发测试11
一般生产21
高可用32

监控告警

  • ISR 变化告警
  • UnderReplicated 分区告警
  • 副本滞后告警

小结

  1. 副本机制是 Kafka 高可用的基础
  2. Leader 处理读写,Follower 被动复制
  3. ISR 是与 Leader 同步的副本集合
  4. acks 配置影响消息可靠性
  5. 合理配置副本数min.insync.replicas保证高可用

下一步

接下来让我们学习 Kafka Streams 流处理框架。