Kafka 消息队列
Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,后来成为 Apache 顶级项目。Kafka 以高吞吐、低延迟、高可用著称,是现代大数据架构的核心组件之一。
什么是 Kafka?
Kafka 是一个分布式事件流平台,具备三个核心能力:
- 发布和订阅:可以像消息队列一样发布和订阅消息流
- 持久化存储:以容错的方式存储消息流
- 流处理:在消息发生时或事后进行处理
Kafka 与传统消息队列的区别
| 维度 | Kafka | 传统消息队列(如 RabbitMQ) |
|---|---|---|
| 消息模型 | 发布-订阅模型,消息持久化 | 点对点模型,消息消费后删除 |
| 吞吐量 | 百万级/秒 | 万级/秒 |
| 消息存储 | 基于磁盘,可配置保留时间 | 内存为主,消费后删除 |
| 消费模式 | 拉取模式,消费者主动拉取 | 推送模式,主动推送给消费者 |
| 消息回溯 | 支持重复消费 | 不支持 |
| 适用场景 | 大数据、日志收集、流处理 | 传统业务消息、任务队列 |
Kafka 应用场景
| 场景 | 说明 |
|---|---|
| 日志收集 | 集中收集各服务的日志,统一处理 |
| 消息系统 | 服务间异步通信,解耦系统 |
| 流处理 | 实时数据处理、实时分析 |
| 事件溯源 | 记录系统状态变更事件 |
| 网站活动追踪 | 追踪用户行为,实时分析 |
核心概念
消息(Message/Event)
消息是 Kafka 中数据的基本单位,包含以下组成部分:
- Key(键):可选,用于分区路由和日志压缩
- Value(值):消息的实际内容
- Timestamp(时间戳):消息产生的时间
- Headers(头部):可选的元数据
消息示例:
Key: "user-001"
Value: {"action": "login", "ip": "192.168.1.1"}
Timestamp: 1704067200000
Headers: {"source": "web-app"}
主题(Topic)
主题是消息的逻辑分类,类似于数据库中的表。生产者将消息发送到特定主题,消费者订阅主题来接收消息。
主题命名示例:
- user-events(用户事件)
- order-payments(订单支付)
- system-logs(系统日志)
分区(Partition)
每个主题可以分为多个分区,分区是 Kafka 实现高吞吐和可扩展的关键:
分区的作用:
- 并行处理:多个分区可以被多个消费者并行消费
- 负载均衡:分区分布在不同的 Broker 上
- 有序性保证:同一分区内的消息有序
分区策略:
- 指定分区:直接指定消息发送到哪个分区
- 按键哈希:根据 Key 的哈希值确定分区(保证相同 Key 的消息进入同一分区)
- 轮询:轮流将消息分配到各分区(无 Key 时的默认策略)
主题:user-events(3个分区)
┌─────────────────────────────────────────────┐
│ Partition 0: [msg1] [msg4] [msg7] ... │
├─────────────────────────────────────────────┤
│ Partition 1: [msg2] [msg5] [msg8] ... │
├─────────────────────────────────────────────┤
│ Partition 2: [msg3] [msg6] [msg9] ... │
└─────────────────────────────────────────────┘
Broker
Broker 是 Kafka 集群中的服务节点,负责:
- 存储消息:每个 Broker 存储部分分区数据
- 处理请求:处理生产者和消费者的请求
- 副本同步:在副本之间同步数据
一个 Kafka 集群通常由多个 Broker 组成,每个 Broker 用 ID 标识。
副本(Replica)
副本是分区的备份,用于保证数据的高可用:
- Leader 副本:处理所有读写请求
- Follower 副本:从 Leader 同步数据,不处理请求
当 Leader 故障时,会从 Follower 中选举新的 Leader。
分区副本分布示例(复制因子=3):
Broker 1 Broker 2 Broker 3
┌────────┐ ┌────────┐ ┌────────┐
│P0 │ │P0 │ │P0 │
│(Leader)│ │(Follower)│ │(Follower)│
├────────┤ ├────────┤ ├────────┤
│P1 │ │P1 │ │P1 │
│(Follower)│ │(Leader)│ │(Follower)│
└────────┘ └────────┘ └────────┘
消费者组(Consumer Group)
消费者组是实现消息广播和单播的关键机制:
- 同一组内:每个分区只被一个消费者消费(单播)
- 不同组间:每个组都能收到消息的完整副本(广播)
主题(4个分区)被消费者组A消费:
分区: P0 P1 P2 P3
↓ ↓ ↓ ↓
消费者组A: C1 C1 C2 C2
如果添加消费者C3到组A:
分区: P0 P1 P2 P3
↓ ↓ ↓ ↓
消费者组A: C1 C2 C3 C3(重新分配)
共享组(Share Group,Kafka 4.0+)
共享组是 Kafka 4.0 引入的早期访问功能(KIP-932),目前不适合生产环境使用。该功能仍在积极开发中,API 和行为可能在未来版本中发生变化。
共享组是一种全新的消费模式,与传统的消费者组并存,提供了更灵活的消息消费方式。
共享组 vs 消费者组
| 特性 | 消费者组 | 共享组 |
|---|---|---|
| 分区分配 | 每个分区只分配给一个消费者 | 多个消费者可以从同一分区消费 |
| 消息顺序 | 保证分区内消息顺序 | 不保证消息顺序 |
| 消费模式 | 竞争消费(每条消息只被一个消费者处理) | 协作消费(支持更灵活的消费模式) |
| 适用场景 | 需要消息顺序保证的场景 | 任务队列、工作分发场景 |
| 消息确认 | Offset 提交 | 记录级确认(ACCEPT/RELEASE/REJECT) |
工作原理
共享组的核心机制是"获取锁"(Acquisition Lock):
- 消息获取:消费者调用
poll()时,消息被加上一个有时限的获取锁 - 锁超时:默认 30 秒,超时后锁自动释放,消息可被其他消费者获取
- 消息确认:消费者处理完消息后,可以确认(ACCEPT)、释放(RELEASE)或拒绝(REJECT)
共享组工作流程:
┌─────────────────────────────────────────────────────────────┐
│ Kafka Topic │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Partition 0: [msg1] [msg2] [msg3] [msg4] [msg5] │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────┼─────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│Consumer 1│ │Consumer 2│ │Consumer 3│
│ msg1(L) │ │ msg2(L) │ │ msg3(L) │
│ msg4(L) │ │ msg5(L) │ │ │
└──────────┘ └──────────┘ └──────────┘
说明:(L) 表示消息被加锁,同一分区的消息可以同时被多个消费者消费
确认类型
共享组支持三种确认类型:
| 类型 | 说明 | 使用场景 |
|---|---|---|
| ACCEPT | 消息处理成功 | 正常处理完成 |
| RELEASE | 释放消息,可被重新消费 | 临时性错误,希望其他消费者重试 |
| REJECT | 拒绝消息,不再重新消费 | 永久性错误,消息无法处理 |
使用示例
隐式确认(批量确认):
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Properties;
import java.util.Arrays;
public class ShareConsumerImplicitAck {
public static void main(String[] args) {
// 配置共享组消费者
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group"); // 共享组ID
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// 创建共享组消费者(注意使用 KafkaShareConsumer 而非 KafkaConsumer)
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
// 拉取消息
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record);
// 不需要显式确认,下一次 poll 或 commitSync 时自动确认
} catch (Exception e) {
// 处理失败,消息会在锁超时后自动释放
System.err.println("处理失败: " + e.getMessage());
}
}
// 可选:显式提交确认
consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processMessage(ConsumerRecord<String, String> record) {
System.out.printf("处理消息: offset=%d, key=%s, value=%s%n",
record.offset(), record.key(), record.value());
}
}
显式确认(记录级确认):
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import java.time.Duration;
import java.util.Properties;
import java.util.Arrays;
public class ShareConsumerExplicitAck {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group");
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record);
// 确认处理成功
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (TransientException e) {
// 临时性错误,释放消息让其他消费者重试
System.err.println("临时错误,释放消息: " + e.getMessage());
consumer.acknowledge(record, AcknowledgeType.RELEASE);
} catch (PermanentException e) {
// 永久性错误,拒绝消息
System.err.println("永久错误,拒绝消息: " + e.getMessage());
consumer.acknowledge(record, AcknowledgeType.REJECT);
}
}
// 提交所有确认
consumer.commitSync();
}
} finally {
consumer.close();
}
}
private static void processMessage(ConsumerRecord<String, String> record)
throws TransientException, PermanentException {
// 业务处理逻辑
}
}
// 自定义异常类型
class TransientException extends Exception {
public TransientException(String message) { super(message); }
}
class PermanentException extends Exception {
public PermanentException(String message) { super(message); }
}
配置参数
共享组特有的配置参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
group.share.record.lock.duration.ms | 30000 | 获取锁的持续时间(毫秒) |
group.share.isolation.level | read_uncommitted | 隔离级别,控制是否读取事务消息 |
管理命令
Kafka 4.0 提供了新的管理工具:
# 列出所有共享组
kafka-share-groups.sh --bootstrap-server localhost:9092 --list
# 查看共享组详情
kafka-share-groups.sh --bootstrap-server localhost:9092 \
--group my-share-group --describe
# 查看共享组成员
kafka-share-groups.sh --bootstrap-server localhost:9092 \
--group my-share-group --members --verbose
# 查看所有类型的组(消费者组 + 共享组)
kafka-groups.sh --bootstrap-server localhost:9092 --list
# 重置共享组的 Offset
kafka-share-groups.sh --bootstrap-server localhost:9092 \
--group my-share-group \
--topic my-topic \
--reset-offsets \
--to-earliest \
--execute
适用场景
适合共享组的场景:
- 任务队列:将任务分发给多个工作进程处理
- 工作分发:并行处理不需要严格顺序的工作项
- 微服务通信:服务间的请求-响应模式
- 负载均衡消费:消费者处理能力不均匀时自动平衡
不适合共享组的场景:
- 需要严格消息顺序的场景
- 需要精确一次语义的场景(目前不支持)
- 事务性消息处理(read_committed 隔离级别尚未支持)
注意事项
- 顺序性牺牲:共享组不保证消息顺序,如果业务需要顺序保证,请使用传统消费者组
- 事务支持有限:目前
read_committed隔离级别尚未完全支持 - 生产环境慎用:作为早期访问功能,API 可能变化,不建议在生产环境使用
- Broker 版本要求:需要 Kafka 4.0+ 的 Broker 才能使用共享组功能
- 客户端要求:需要使用 Kafka 4.0+ 的客户端库
- 协议版本:共享组使用新的 RPC 协议,与旧客户端不兼容
共享组 vs 传统消息队列
共享组使 Kafka 能够更好地与传统消息队列(如 RabbitMQ、ActiveMQ)竞争:
| 特性 | Kafka 共享组 | RabbitMQ | ActiveMQ |
|---|---|---|---|
| 消息模型 | 发布-订阅 + 队列 | 队列为主 | 队列为主 |
| 吞吐量 | 百万级/秒 | 万级/秒 | 万级/秒 |
| 消息持久化 | 磁盘持久化,支持回溯 | 内存+磁盘 | 内存+磁盘 |
| 消费确认 | ACCEPT/RELEASE/REJECT | ACK/NACK | ACK |
| 消息顺序 | 不保证 | 单消费者时保证 | 单消费者时保证 |
| 扩展性 | 水平扩展 | 有限扩展 | 有限扩展 |
迁移建议:
如果你的应用从传统消息队列迁移到 Kafka,可以考虑:
- 任务队列场景 → 使用共享组
- 事件流场景 → 使用传统消费者组
- 混合场景 → 同一 Kafka 集群可以同时支持两种模式
Offset
Offset 是消息在分区中的唯一标识,是一个递增的整数:
- 消费者通过 Offset 记录消费位置
- Kafka 不删除已消费的消息,支持重复消费
- Offset 可以保存在 Kafka 内部(__consumer_offsets 主题)或外部
分区中的消息和 Offset:
Offset: 0 1 2 3 4
消息: [msg1] [msg2] [msg3] [msg4] [msg5]
消费者A的读取位置: ↑
offset=2
Kafka 架构
整体架构
Kafka 采用分布式架构,主要组件包括:
┌─────────────────────────────────────────────────────────────┐
│ Kafka 集群 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Broker1 │ │ Broker2 │ │ Broker3 │ │
│ │ P0(L) │ │ P1(L) │ │ P2(L) │ │
│ │ P1(F) │ │ P2(F) │ │ P0(F) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────┘
↑ ↑ ↑
│ │ │
┌───┴───┐ ┌───┴───┐ ┌───┴───┐
│Producer│ │Producer│ │Consumer│
└───────┘ └───────┘ │ Group │
└────────┘
KRaft 模式 vs ZooKeeper 模式
Kafka 有两种部署模式:
ZooKeeper 模式(传统模式)
ZooKeeper 负责:
- 集群元数据管理
- Controller 选举
- Broker 状态监控
- Topic 配置管理
缺点:
- 运维复杂,需要管理两套系统
- ZooKeeper 可能成为性能瓶颈
- 元数据同步延迟较高
KRaft 模式(新模式,Kafka 2.8+)
KRaft 模式使用 Kafka 内置的 Raft 协议管理元数据:
优势:
- 架构简化,无需 ZooKeeper
- 元数据访问延迟降低
- 部署和运维更简单
- 扩展性更好
配置示例:
# KRaft 模式配置
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
对于新项目,推荐使用 KRaft 模式。Kafka 4.0 已完全移除 ZooKeeper 支持。
消息存储
Kafka 的消息存储设计:
日志段(Log Segment):
- 每个分区由多个日志段组成
- 当前活跃段用于写入,其他段只读
- 日志段达到阈值后滚动创建新段
文件结构:
/kafka-logs/
└── topic-partition/
├── 00000000000000000000.log # 消息数据
├── 00000000000000000000.index # 偏移量索引
├── 00000000000000000000.timeindex # 时间索引
├── 00000000000000000005.log
├── 00000000000000000005.index
└── 00000000000000000005.timeindex
消息保留策略:
# 基于时间保留(默认7天)
log.retention.hours=168
# 基于大小保留(默认-1,不限制)
log.retention.bytes=1073741824
# 日志段大小
log.segment.bytes=1073741824
# 日志压缩(相同Key保留最新值)
log.cleanup.policy=compact
Kafka 安装与配置
快速启动(单机模式)
# 下载 Kafka
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1
# 启动 ZooKeeper(如果使用 ZooKeeper 模式)
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties &
# 或者使用 KRaft 模式
# 生成集群ID
KAFKA_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
# 格式化存储
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# 启动
bin/kafka-server-start.sh config/kraft/server.properties
关键配置参数
Broker 配置
# 基础配置
broker.id=0 # Broker唯一标识
listeners=PLAINTEXT://:9092 # 监听地址
advertised.listeners=PLAINTEXT://host:9092 # 对外地址
num.network.threads=3 # 网络线程数
num.io.threads=8 # IO线程数
# 日志配置
log.dirs=/var/kafka-logs # 日志目录
num.partitions=3 # 默认分区数
default.replication.factor=3 # 默认副本数
log.retention.hours=168 # 消息保留时间
# 性能配置
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 副本配置
num.recovery.threads.per.data.dir=1 # 恢复线程数
replica.fetch.max.bytes=1048576 # 副本拉取最大字节数
replica.fetch.wait.max.ms=500 # 副本拉取等待时间
生产者配置
# 基础配置
bootstrap.servers=localhost:9092 # Kafka集群地址
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# 可靠性配置
acks=all # 确认机制:0, 1, all(-1)
retries=2147483647 # 重试次数(默认Integer.MAX_VALUE)
max.in.flight.requests.per.connection=5 # 连接最大未确认请求数
enable.idempotence=true # 幂等性(推荐开启)
# 批量发送配置
batch.size=16384 # 批量大小(默认16KB)
linger.ms=5 # 等待时间(Kafka 4.0默认从0改为5ms)
buffer.memory=33554432 # 缓冲区大小(默认32MB)
delivery.timeout.ms=120000 # 发送超时时间(默认2分钟)
# 压缩配置
compression.type=snappy # 压缩类型:none, gzip, snappy, lz4, zstd
linger.ms 控制生产者等待多长时间以累积更多消息形成更大的批次。Kafka 4.0 将默认值从 0 改为 5ms,因为更大的批次通常能带来更好的吞吐量,而延迟影响很小。在高吞吐场景下,可以适当增大此值(如 10-50ms)。
消费者配置
# 基础配置
bootstrap.servers=localhost:9092
group.id=my-group # 消费者组ID
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费配置
auto.offset.reset=earliest # 无Offset时的行为:earliest, latest, none
enable.auto.commit=true # 自动提交Offset
auto.commit.interval.ms=5000 # 自动提交间隔
max.poll.records=500 # 单次拉取最大记录数
max.poll.interval.ms=300000 # 拉取最大间隔(默认5分钟)
# 会话配置
session.timeout.ms=45000 # 会话超时时间(默认45秒)
heartbeat.interval.ms=3000 # 心跳间隔(建议不超过session.timeout.ms的1/3)
# 高级配置
group.protocol=classic # 消费者组协议:classic(经典)或 consumer(新版)
group.instance.id=static-consumer-1 # 静态成员ID,避免短暂离线触发再均衡
Kafka 生产者
生产者工作流程
生产者发送消息的基本流程:
- 序列化:将 Key 和 Value 序列化为字节数组
- 分区选择:确定消息发送到哪个分区
- 消息压缩:可选的批量压缩
- 批量发送:累积消息批量发送
- 确认接收:等待 Broker 确认
Java 生产者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
// 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 5);
props.put("enable.idempotence", true);
// 创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 发送消息(异步)
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key1", "value1");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("发送成功: " +
"topic=" + metadata.topic() +
", partition=" + metadata.partition() +
", offset=" + metadata.offset());
} else {
System.err.println("发送失败: " + exception.getMessage());
}
}
});
// 发送消息(同步)
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("发送成功: " + metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}
// 关闭生产者
producer.close();
}
}
消息发送方式
发送并忘记(Fire-and-Forget)
// 不关心发送结果
producer.send(new ProducerRecord<>("topic", "key", "value"));
异步发送(Async)
// 回调处理发送结果
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 处理发送失败
exception.printStackTrace();
} else {
// 处理发送成功
System.out.println("Offset: " + metadata.offset());
}
});
同步发送(Sync)
// 等待发送完成
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Offset: " + metadata.offset());
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}
分区策略
默认分区器
// 如果指定分区,使用指定分区
// 如果有 Key,对 Key 哈希后取模
// 如果无 Key,使用轮询(Sticky Partition Cache)
props.put("partitioner.class",
"org.apache.kafka.clients.producer.internals.DefaultPartitioner");
自定义分区器
public class CustomPartitioner implements Partitioner {
private int specialPartition;
@Override
public void configure(Map<String, ?> configs) {
// 初始化配置
specialPartition = Integer.parseInt(
configs.get("special.partition").toString());
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区逻辑
if (key != null && key.toString().startsWith("special-")) {
return specialPartition;
}
// 其他消息随机分配
return ThreadLocalRandom.current().nextInt(
cluster.partitionCountForTopic(topic));
}
@Override
public void close() {}
}
// 使用自定义分区器
props.put("partitioner.class", CustomPartitioner.class.getName());
props.put("special.partition", "0");
可靠性保证
Ack 确认机制
// acks=0: 不等待确认,最快但可能丢失
props.put("acks", "0");
// acks=1: 等待 Leader 确认,平衡性能和可靠性
props.put("acks", "1");
// acks=all: 等待所有 ISR 副本确认,最可靠
props.put("acks", "all");
幂等性生产者
// 启用幂等性,防止重复消息
props.put("enable.idempotence", true);
// 等价于:
// props.put("acks", "all");
// props.put("retries", Integer.MAX_VALUE);
// props.put("max.in.flight.requests.per.connection", 5);
事务性生产者
事务性生产者用于实现跨分区的原子写入,保证"精确一次"语义。这对于需要将消息写入多个主题、或需要将消费位置和输出结果一起提交的场景非常重要。
使用场景:
- 消费者-生产者模式:从 Kafka 消费消息,处理后写入另一个主题,需要保证消费位置和输出结果的原子性
- 跨主题写入:一条消息需要写入多个主题,要么全部成功要么全部失败
- 流处理应用:Kafka Streams 使用事务保证精确一次处理语义
工作原理:
事务性生产者使用两阶段提交协议:
- 阶段一:向事务协调器注册事务,发送消息到各分区
- 阶段二:提交或回滚事务,事务协调器通知所有分区
// 配置事务
props.put("transactional.id", "my-transactional-id"); // 必须唯一,用于标识生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务(只需执行一次)
// 这会清理该 transactional.id 之前的未完成事务
producer.initTransactions();
try {
// 开始事务
producer.beginTransaction();
// 发送多条消息到不同主题(原子性保证)
producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
producer.send(new ProducerRecord<>("topic2", "key2", "value2"));
// 可以同时发送消费偏移量(用于消费-处理-生产模式)
// producer.sendOffsetsToTransaction(offsets, consumerGroupId);
// 提交事务(原子写入)
producer.commitTransaction();
} catch (Exception e) {
// 回滚事务(丢弃所有未提交的消息)
producer.abortTransaction();
}
配置要求:
# 生产者配置
enable.idempotence=true # 事务要求幂等性
acks=all # 事务要求所有副本确认
transactional.id=unique-id # 必须唯一
# Broker 配置(消费者端)
isolation.level=read_committed # 只读取已提交的事务消息
注意事项:
transactional.id必须唯一,同一时间只能有一个生产者实例使用- 事务超时时间默认为 60 秒,长时间事务需要调整
transaction.timeout.ms - 消费者需要设置
isolation.level=read_committed才能正确过滤未提交的事务消息 - 事务会带来性能开销,非必要场景不要使用
Kafka 消费者
消费者工作流程
消费者消费消息的基本流程:
- 加入消费者组:与组协调器通信,加入消费者组
- 分区分配:获取分配给自己的分区
- 拉取消息:从分配的分区拉取消息
- 处理消息:处理收到的消息
- 提交 Offset:记录消费位置
Java 消费者示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
// 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("my-topic"));
// 消费消息
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
// 手动提交 Offset
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
Offset 提交方式
自动提交
// 配置自动提交
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// 注意:自动提交可能导致消息丢失或重复消费
手动同步提交
// 处理完消息后同步提交
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
process(record);
}
// 同步提交,会阻塞直到提交成功
consumer.commitSync();
} catch (CommitFailedException e) {
// 提交失败处理
}
手动异步提交
// 异步提交,不阻塞
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception != null) {
System.err.println("提交失败: " + offsets);
} else {
System.out.println("提交成功: " + offsets);
}
}
});
指定 Offset 提交
// 按分区指定 Offset 提交
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 0),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(offsets);
消费者再均衡
当消费者组中的消费者发生变化时,会触发再均衡(Rebalance):
触发条件:
- 新消费者加入
- 消费者离线或崩溃
- 主题分区数变化
再均衡类型
Kafka 支持两种再均衡协议:
1. Eager(急切)再均衡
传统的再均衡方式,所有消费者停止消费,释放所有分区,然后重新分配:
触发再均衡 → 所有消费者停止消费 → 释放所有分区 → 重新分配 → 恢复消费
缺点:会导致短暂的消费停顿,影响吞吐量。
2. Cooperative(协作式)再均衡
Kafka 2.4+ 引入的新协议,只重新分配需要变更的分区,不影响其他分区:
触发再均衡 → 只有受影响的消费者停止消费 → 只重新分配变更的分区 → 渐进式恢复
优势:减少再均衡的影响范围,降低消费中断时间。
分区分配策略
// 可用的分配策略
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// RangeAssignor(默认):按主题范围分配
// RoundRobinAssignor:轮询分配
// StickyAssignor:粘性分配,尽量保持原有分配
// CooperativeStickyAssignor:协作式粘性分配(推荐)
静态成员
静态成员可以避免因短暂离线(如重启)触发的再均衡:
// 配置静态成员
props.put("group.instance.id", "consumer-1-static");
// 配置较大的会话超时,允许短暂离线
props.put("session.timeout.ms", "300000"); // 5分钟
props.put("max.poll.interval.ms", "600000"); // 10分钟
工作原理:
- 静态成员离开组时,不会立即触发再均衡
- 在
session.timeout.ms内重新加入,可以继续消费原有分区 - 只有超时未返回,才会触发再均衡
适用场景:
- 需要重启更新的应用
- 维护期间的短暂离线
- 减少不必要的再均衡
再均衡监听器
consumer.subscribe(Collections.singletonList("topic"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 分区被收回前,提交 Offset
consumer.commitSync();
System.out.println("分区被收回: " + partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 分区被分配后,可以从指定位置开始消费
System.out.println("分区被分配: " + partitions);
// consumer.seekToBeginning(partitions);
// consumer.seekToEnd(partitions);
}
});
指定位置消费
// 从最早的消息开始消费
consumer.seekToBeginning(Collections.singletonList(
new TopicPartition("topic", 0)));
// 从最新的消息开始消费
consumer.seekToEnd(Collections.singletonList(
new TopicPartition("topic", 0)));
// 从指定 Offset 开始消费
consumer.seek(new TopicPartition("topic", 0), 100L);
// 从指定时间戳开始消费
long timestamp = System.currentTimeMillis() - 3600000; // 1小时前
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("topic", 0), timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
}
多线程消费
KafkaConsumer 不是线程安全的,但可以在主线程中消费,将消息分发给工作线程处理:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiThreadConsumer {
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executor;
public MultiThreadConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "multi-thread-group");
// ... 其他配置
this.consumer = new KafkaConsumer<>(props);
this.executor = Executors.newFixedThreadPool(10);
}
public void consume() {
consumer.subscribe(Collections.singletonList("topic"));
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (final ConsumerRecord<String, String> record : records) {
// 提交任务到线程池
executor.submit(() -> {
processRecord(record);
});
}
// 等待所有任务完成后提交 Offset
// 需要自行实现任务跟踪
}
} finally {
executor.shutdown();
consumer.close();
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// 处理消息
}
}
Kafka Streams
Kafka Streams 是 Kafka 提供的流处理库,可以轻松构建实时流处理应用。
核心概念
| 概念 | 说明 |
|---|---|
| KStream | 消息流,每条记录都是独立的 |
| KTable | 变更日志流,记录键的最新值 |
| GlobalKTable | 全局表,所有实例都有完整数据 |
| Topology | 处理逻辑的有向无环图 |
WordCount 示例
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class WordCount {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// 从主题读取数据
KStream<String, String> source = builder.stream("input-topic");
// 处理数据:分割单词、计数
KTable<String, Long> counts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
.groupBy((key, word) -> word)
.count(Materialized.as("counts-store"));
// 输出到主题
counts.toStream().to("output-topic",
Produced.with(Serdes.String(), Serdes.Long()));
// 创建并启动 Kafka Streams
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
常用操作
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input");
// 过滤
KStream<String, String> filtered = stream.filter((key, value) -> value.length() > 5);
// 映射
KStream<String, String> mapped = stream.mapValues(value -> value.toUpperCase());
// 分组聚合
KTable<String, Long> aggregated = stream
.groupBy((key, value) -> value)
.count();
// 窗口聚合
KTable<Windowed<String>, Long> windowed = stream
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
// Join
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
KStream<String, String> joined = stream1.join(stream2,
(value1, value2) -> value1 + "-" + value2,
JoinWindows.of(Duration.ofMinutes(5)));
Kafka Connect
Kafka Connect 是 Kafka 提供的数据导入导出工具,可以轻松连接各种数据系统。
运行 Connect
# 分布式模式
bin/connect-distributed.sh config/connect-distributed.properties
# 单机模式
bin/connect-standalone.sh config/connect-standalone.properties
文件源连接器配置
{
"name": "file-source",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/var/log/app.log",
"topic": "log-topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
JDBC 源连接器配置
{
"name": "jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "password",
"topic.prefix": "mysql-",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "users, orders",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
管理 Connect
# 列出连接器
curl http://localhost:8083/connectors
# 创建连接器
curl -X POST -H "Content-Type: application/json" \
--data @connector.json \
http://localhost:8083/connectors
# 查看连接器状态
curl http://localhost:8083/connectors/file-source/status
# 删除连接器
curl -X DELETE http://localhost:8083/connectors/file-source
Kafka 运维管理
主题管理
# 创建主题
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2
# 查看主题列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看主题详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
# 修改主题配置
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=86400000
# 增加分区(只能增加,不能减少)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 6
# 删除主题
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic
消费者组管理
# 查看消费者组列表
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 查看消费者组详情
kafka-consumer-groups.sh --describe \
--bootstrap-server localhost:9092 \
--group my-group
# 查看消费者组成员
kafka-consumer-groups.sh --members \
--bootstrap-server localhost:9092 \
--group my-group
# 重置 Offset
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server localhost:9092 \
--group my-group \
--topic my-topic \
--to-earliest \
--execute
# 删除消费者组
kafka-consumer-groups.sh --delete \
--bootstrap-server localhost:9092 \
--group my-group
性能测试
# 生产者性能测试
kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000 \
--record-size 1000 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
# 消费者性能测试
kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000 \
--bootstrap-server localhost:9092
集群监控
关键指标:
| 指标类别 | 具体指标 | 说明 |
|---|---|---|
| Broker | UnderReplicatedPartitions | 未完全同步的分区数 |
| OfflinePartitionsCount | 离线分区数 | |
| ActiveControllerCount | 活跃控制器数量 | |
| Topic | MessagesInPerSec | 每秒消息数 |
| BytesInPerSec | 每秒写入字节数 | |
| BytesOutPerSec | 每秒读取字节数 | |
| Consumer | ConsumerLag | 消费延迟 |
| ConsumerFetchRate | 拉取速率 |
监控工具:
- Kafka Manager / CMAK:集群管理工具
- Prometheus + Grafana:指标监控
- Burrow:消费者延迟监控
- Kafka Exporter:Prometheus 导出器
最佳实践
Topic 命名规范
<领域>.<业务>.<事件类型>
示例:
user.activity.login
order.payment.completed
system.log.error
分区数选择
分区数 = min(目标吞吐量 / 单分区最大吞吐量, 总分区数上限)
考虑因素:
- 单分区的顺序性保证
- 消费者的并行度
- Broker 的文件句柄数量
消息 Key 设计
// 选择合适的 Key 保证分区有序性
// 例如:订单相关消息使用订单ID作为Key
String key = orderId;
// 需要严格顺序的场景
String key = userId + "_" + sessionId;
// 时间窗口数据的Key设计
String key = userId + "_" + (timestamp / windowSize);
错误处理
// 生产者错误处理
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// 可重试错误:网络问题、Leader选举等
retryQueue.add(record);
} else {
// 不可重试错误:序列化失败、权限问题等
deadLetterQueue.add(record);
}
}
});
// 消费者错误处理
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
process(record);
} catch (Exception e) {
// 记录错误,跳过此消息
log.error("处理失败: " + record, e);
// 发送到死信队列
sendToDeadLetterQueue(record);
}
}
consumer.commitSync();
} catch (WakeupException e) {
// 正常关闭
} catch (Exception e) {
// 其他错误
}
性能优化
生产者优化:
// 批量发送
props.put("batch.size", 32768);
props.put("linger.ms", 10);
// 压缩
props.put("compression.type", "lz4");
// 缓冲区
props.put("buffer.memory", 67108864);
消费者优化:
// 批量拉取
props.put("max.poll.records", 1000);
props.put("fetch.min.bytes", 1048576);
props.put("fetch.max.wait.ms", 500);
Broker 优化:
# 增加网络和IO线程
num.network.threads=8
num.io.threads=16
# 增加日志刷新批次大小
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# 调整JVM参数
KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
小结
本章介绍了 Kafka 的核心概念和使用方法:
- 核心概念:消息、主题、分区、Broker、消费者组、Offset
- 架构模式:KRaft 模式(Kafka 4.0+ 默认,已移除 ZooKeeper 依赖)
- 生产者:发送方式、分区策略、幂等性、事务保证
- 消费者:Offset 管理、再均衡机制、静态成员、协作式再均衡
- Kafka Streams:流处理 API
- Kafka Connect:数据导入导出
- 运维管理:主题管理、监控、性能优化
Kafka 版本演进要点
| 版本 | 重要特性 |
|---|---|
| Kafka 0.11 | 引入幂等性生产者、事务支持 |
| Kafka 2.1 | 引入 ZStandard 压缩 |
| Kafka 2.3 | 引入增量再均衡 |
| Kafka 2.4 | 协作式再均衡、静态成员 |
| Kafka 2.8 | KRaft 模式早期访问 |
| Kafka 3.0 | KRaft 模式生产就绪 |
| Kafka 4.0 | 完全移除 ZooKeeper,KRaft 成为默认,新消费者组协议 GA |
Kafka 4.0 重要变更详解
Kafka 4.0 是一个里程碑版本,包含多项重大变更:
1. ZooKeeper 完全移除
从 Kafka 4.0 开始,ZooKeeper 模式被完全移除,KRaft 成为唯一的元数据管理方式。这意味着:
- 新部署必须使用 KRaft 模式
- 从旧版本升级需要先迁移到 KRaft 模式
- 简化了运维复杂度,不再需要管理两套系统
2. Java 版本要求变更
| 组件 | Kafka 3.x | Kafka 4.0 |
|---|---|---|
| Kafka Clients | Java 8+ | Java 11+ |
| Kafka Streams | Java 8+ | Java 11+ |
| Kafka Broker | Java 8+ | Java 17+ |
| Kafka Connect | Java 8+ | Java 17+ |
升级注意:升级前请确保客户端版本在 2.1 或以上,否则会出现兼容性问题。
3. 新消费者组协议(KIP-848)
Kafka 4.0 正式发布了新一代消费者组协议,解决了传统协议的"停止-世界"再均衡问题:
# 启用新消费者组协议(客户端配置)
group.protocol=consumer
新协议优势:
- 消费者组再均衡不再需要停止所有消费者
- 大幅降低再均衡延迟
- 提高大集群的稳定性
- 简化客户端实现
4. 生产者 linger.ms 默认值变更
linger.ms 默认值从 0 改为 5ms。这是因为更大的批次通常能带来更好的吞吐量,而延迟影响很小。对于对延迟敏感的场景,可以手动调低此值。
5. 事务协议增强(KIP-890)
第二阶段的事务服务端防护已完成,减少了生产者故障时"僵尸事务"的可能性。
6. 合格领导者副本(KIP-966,预览功能)
KIP-966 引入了合格领导者副本(Eligible Leader Replicas,ELR)作为预览功能,解决了传统 ISR 选举可能丢失数据的问题:
问题背景:
在传统的 ISR(In-Sync Replicas)机制中,当 Leader 故障时,会选择 ISR 中的任意副本成为新 Leader。但 ISR 中的副本可能落后于高水位(High Watermark),这意味着:
- 某些已提交的消息可能在新 Leader 上不存在
- 故障恢复后可能丢失数据
- 需要等待所有 ISR 副本同步才能继续服务
ELR 解决方案:
ELR 是 ISR 的子集,保证拥有完整的高水位数据。当 Leader 故障时:
- 优先从 ELR 中选举新 Leader
- 确保 New Leader 拥有所有已提交的消息
- 防止数据丢失
传统 ISR 选举:
ISR = [Replica1, Replica2, Replica3]
Leader 故障 → 从 ISR 中选举新 Leader
问题:ISR 中的副本可能落后于高水位,选举后可能丢失数据
ELR 选举:
ISR = [Replica1, Replica2, Replica3]
ELR = [Replica1, Replica2] // 保证有完整数据的副本
Leader 故障 → 优先从 ELR 中选举
优势:确保新 Leader 拥有完整数据,防止数据丢失
配置示例:
# Broker 配置
# 启用 ELR 功能(预览功能)
elr.enabled=true
# ELR 最小副本数
elr.min.replicas=2
# 副本进入 ELR 的条件:必须完全同步到高水位
适用场景:
- 金融交易系统:对数据一致性要求极高
- 订单处理系统:不能容忍数据丢失
- 审计日志系统:需要保证数据完整性
注意事项:
- ELR 功能目前处于预览阶段,不建议生产环境使用
- ELR 会增加 Leader 选举的延迟(需要等待更多副本同步)
- 需要权衡可用性和一致性
7. 基于时间的 Offset 重置(KIP-1106)
传统上,消费者只能从最早、最新或指定 Offset 开始消费。KIP-1106 引入了基于时间的 Offset 重置策略:
# 新增的 offset 重置选项
# 从指定时间开始消费(支持多种格式)
auto.offset.reset=earliest
auto.offset.reset=latest
auto.offset.reset=none
# 新增:使用 seekTo 时间戳
# 在代码中使用
consumer.seekToBeginning(partitions); // 从最早开始
consumer.seekToEnd(partitions); // 从最新开始
// 从指定时间开始(新功能)
long timestamp = System.currentTimeMillis() - 3600000; // 1小时前
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("topic", 0), timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
8. kafka-groups.sh 新工具
Kafka 4.0 引入了新的 kafka-groups.sh 工具,用于统一管理所有类型的消费者组:
# 列出所有类型的组(消费者组、共享组等)
kafka-groups.sh --bootstrap-server localhost:9092 --list
# 查看组详情(自动识别组类型)
kafka-groups.sh --bootstrap-server localhost:9092 \
--group my-group --describe
# 查看组成员
kafka-groups.sh --bootstrap-server localhost:9092 \
--group my-group --members --verbose
与旧工具的区别:
| 工具 | 说明 |
|---|---|
kafka-consumer-groups.sh | 仅管理传统消费者组 |
kafka-share-groups.sh | 仅管理共享组 |
kafka-groups.sh | 统一管理所有类型的组(推荐使用) |
9. 客户端指标推送增强(KIP-1076)
KIP-1076 扩展了 KIP-714,允许应用程序在客户端指标中添加自定义指标:
// 注册自定义指标
Map<String, String> customMetrics = new HashMap<>();
customMetrics.put("app.processing.time.ms", String.valueOf(processingTime));
customMetrics.put("app.queue.size", String.valueOf(queueSize));
// 通过 Kafka 客户端 API 推送
// 这些指标会随客户端指标一起发送到 Broker
10. 移除的 API 和配置
Kafka 4.0 移除了大量已弃用的 API:
- 旧的消息格式 v0 和 v1
poll(long)方法(请使用poll(Duration))alterConfigsAdmin API(请使用incrementalAlterConfigs)- MirrorMaker 1(MM1)(请使用 MM2)
- Log4j 1.x(迁移到 Log4j 2.x)
学习建议
- 从基础开始:先理解消息、主题、分区、消费者组等核心概念
- 实践操作:使用命令行工具进行主题管理、消息收发
- 理解可靠性:掌握 acks、幂等性、事务的使用场景
- 优化性能:根据业务需求调整批量大小、压缩算法等参数
- 监控运维:关注 Consumer Lag、分区均衡等关键指标
Kafka 是现代大数据架构的核心组件,掌握 Kafka 对于构建实时数据系统至关重要。在实际项目中,Kafka 常与 Spark、Flink、HBase 等组件配合使用,构建完整的大数据处理平台。