跳到主要内容

Kafka 消息队列

Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,后来成为 Apache 顶级项目。Kafka 以高吞吐、低延迟、高可用著称,是现代大数据架构的核心组件之一。

什么是 Kafka?

Kafka 是一个分布式事件流平台,具备三个核心能力:

  1. 发布和订阅:可以像消息队列一样发布和订阅消息流
  2. 持久化存储:以容错的方式存储消息流
  3. 流处理:在消息发生时或事后进行处理

Kafka 与传统消息队列的区别

维度Kafka传统消息队列(如 RabbitMQ)
消息模型发布-订阅模型,消息持久化点对点模型,消息消费后删除
吞吐量百万级/秒万级/秒
消息存储基于磁盘,可配置保留时间内存为主,消费后删除
消费模式拉取模式,消费者主动拉取推送模式,主动推送给消费者
消息回溯支持重复消费不支持
适用场景大数据、日志收集、流处理传统业务消息、任务队列

Kafka 应用场景

场景说明
日志收集集中收集各服务的日志,统一处理
消息系统服务间异步通信,解耦系统
流处理实时数据处理、实时分析
事件溯源记录系统状态变更事件
网站活动追踪追踪用户行为,实时分析

核心概念

消息(Message/Event)

消息是 Kafka 中数据的基本单位,包含以下组成部分:

  • Key(键):可选,用于分区路由和日志压缩
  • Value(值):消息的实际内容
  • Timestamp(时间戳):消息产生的时间
  • Headers(头部):可选的元数据
消息示例:
Key: "user-001"
Value: {"action": "login", "ip": "192.168.1.1"}
Timestamp: 1704067200000
Headers: {"source": "web-app"}

主题(Topic)

主题是消息的逻辑分类,类似于数据库中的表。生产者将消息发送到特定主题,消费者订阅主题来接收消息。

主题命名示例:
- user-events(用户事件)
- order-payments(订单支付)
- system-logs(系统日志)

分区(Partition)

每个主题可以分为多个分区,分区是 Kafka 实现高吞吐和可扩展的关键:

分区的作用

  1. 并行处理:多个分区可以被多个消费者并行消费
  2. 负载均衡:分区分布在不同的 Broker 上
  3. 有序性保证:同一分区内的消息有序

分区策略

  • 指定分区:直接指定消息发送到哪个分区
  • 按键哈希:根据 Key 的哈希值确定分区(保证相同 Key 的消息进入同一分区)
  • 轮询:轮流将消息分配到各分区(无 Key 时的默认策略)
主题:user-events(3个分区)
┌─────────────────────────────────────────────┐
│ Partition 0: [msg1] [msg4] [msg7] ... │
├─────────────────────────────────────────────┤
│ Partition 1: [msg2] [msg5] [msg8] ... │
├─────────────────────────────────────────────┤
│ Partition 2: [msg3] [msg6] [msg9] ... │
└─────────────────────────────────────────────┘

Broker

Broker 是 Kafka 集群中的服务节点,负责:

  • 存储消息:每个 Broker 存储部分分区数据
  • 处理请求:处理生产者和消费者的请求
  • 副本同步:在副本之间同步数据

一个 Kafka 集群通常由多个 Broker 组成,每个 Broker 用 ID 标识。

副本(Replica)

副本是分区的备份,用于保证数据的高可用:

  • Leader 副本:处理所有读写请求
  • Follower 副本:从 Leader 同步数据,不处理请求

当 Leader 故障时,会从 Follower 中选举新的 Leader。

分区副本分布示例(复制因子=3):

Broker 1 Broker 2 Broker 3
┌────────┐ ┌────────┐ ┌────────┐
│P0 │ │P0 │ │P0 │
│(Leader)│ │(Follower)│ │(Follower)│
├────────┤ ├────────┤ ├────────┤
│P1 │ │P1 │ │P1 │
│(Follower)│ │(Leader)│ │(Follower)│
└────────┘ └────────┘ └────────┘

消费者组(Consumer Group)

消费者组是实现消息广播和单播的关键机制:

  • 同一组内:每个分区只被一个消费者消费(单播)
  • 不同组间:每个组都能收到消息的完整副本(广播)
主题(4个分区)被消费者组A消费:

分区: P0 P1 P2 P3
↓ ↓ ↓ ↓
消费者组A: C1 C1 C2 C2

如果添加消费者C3到组A:
分区: P0 P1 P2 P3
↓ ↓ ↓ ↓
消费者组A: C1 C2 C3 C3(重新分配)

共享组(Share Group,Kafka 4.0+)

早期访问功能

共享组是 Kafka 4.0 引入的早期访问功能(KIP-932),目前不适合生产环境使用。该功能仍在积极开发中,API 和行为可能在未来版本中发生变化。

共享组是一种全新的消费模式,与传统的消费者组并存,提供了更灵活的消息消费方式。

共享组 vs 消费者组

特性消费者组共享组
分区分配每个分区只分配给一个消费者多个消费者可以从同一分区消费
消息顺序保证分区内消息顺序不保证消息顺序
消费模式竞争消费(每条消息只被一个消费者处理)协作消费(支持更灵活的消费模式)
适用场景需要消息顺序保证的场景任务队列、工作分发场景
消息确认Offset 提交记录级确认(ACCEPT/RELEASE/REJECT)

工作原理

共享组的核心机制是"获取锁"(Acquisition Lock):

  1. 消息获取:消费者调用 poll() 时,消息被加上一个有时限的获取锁
  2. 锁超时:默认 30 秒,超时后锁自动释放,消息可被其他消费者获取
  3. 消息确认:消费者处理完消息后,可以确认(ACCEPT)、释放(RELEASE)或拒绝(REJECT)
共享组工作流程:

┌─────────────────────────────────────────────────────────────┐
│ Kafka Topic │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Partition 0: [msg1] [msg2] [msg3] [msg4] [msg5] │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

┌─────────────┼─────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│Consumer 1│ │Consumer 2│ │Consumer 3│
│ msg1(L) │ │ msg2(L) │ │ msg3(L) │
│ msg4(L) │ │ msg5(L) │ │ │
└──────────┘ └──────────┘ └──────────┘

说明:(L) 表示消息被加锁,同一分区的消息可以同时被多个消费者消费

确认类型

共享组支持三种确认类型:

类型说明使用场景
ACCEPT消息处理成功正常处理完成
RELEASE释放消息,可被重新消费临时性错误,希望其他消费者重试
REJECT拒绝消息,不再重新消费永久性错误,消息无法处理

使用示例

隐式确认(批量确认)

import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Properties;
import java.util.Arrays;

public class ShareConsumerImplicitAck {
public static void main(String[] args) {
// 配置共享组消费者
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group"); // 共享组ID
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

// 创建共享组消费者(注意使用 KafkaShareConsumer 而非 KafkaConsumer)
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);

// 订阅主题
consumer.subscribe(Arrays.asList("my-topic"));

try {
while (true) {
// 拉取消息
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record);
// 不需要显式确认,下一次 poll 或 commitSync 时自动确认
} catch (Exception e) {
// 处理失败,消息会在锁超时后自动释放
System.err.println("处理失败: " + e.getMessage());
}
}
// 可选:显式提交确认
consumer.commitSync();
}
} finally {
consumer.close();
}
}

private static void processMessage(ConsumerRecord<String, String> record) {
System.out.printf("处理消息: offset=%d, key=%s, value=%s%n",
record.offset(), record.key(), record.value());
}
}

显式确认(记录级确认)

import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import java.time.Duration;
import java.util.Properties;
import java.util.Arrays;

public class ShareConsumerExplicitAck {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-share-group");
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
try {
// 处理消息
processMessage(record);
// 确认处理成功
consumer.acknowledge(record, AcknowledgeType.ACCEPT);
} catch (TransientException e) {
// 临时性错误,释放消息让其他消费者重试
System.err.println("临时错误,释放消息: " + e.getMessage());
consumer.acknowledge(record, AcknowledgeType.RELEASE);
} catch (PermanentException e) {
// 永久性错误,拒绝消息
System.err.println("永久错误,拒绝消息: " + e.getMessage());
consumer.acknowledge(record, AcknowledgeType.REJECT);
}
}
// 提交所有确认
consumer.commitSync();
}
} finally {
consumer.close();
}
}

private static void processMessage(ConsumerRecord<String, String> record)
throws TransientException, PermanentException {
// 业务处理逻辑
}
}

// 自定义异常类型
class TransientException extends Exception {
public TransientException(String message) { super(message); }
}

class PermanentException extends Exception {
public PermanentException(String message) { super(message); }
}

配置参数

共享组特有的配置参数:

参数默认值说明
group.share.record.lock.duration.ms30000获取锁的持续时间(毫秒)
group.share.isolation.levelread_uncommitted隔离级别,控制是否读取事务消息

管理命令

Kafka 4.0 提供了新的管理工具:

# 列出所有共享组
kafka-share-groups.sh --bootstrap-server localhost:9092 --list

# 查看共享组详情
kafka-share-groups.sh --bootstrap-server localhost:9092 \
--group my-share-group --describe

# 查看共享组成员
kafka-share-groups.sh --bootstrap-server localhost:9092 \
--group my-share-group --members --verbose

# 查看所有类型的组(消费者组 + 共享组)
kafka-groups.sh --bootstrap-server localhost:9092 --list

# 重置共享组的 Offset
kafka-share-groups.sh --bootstrap-server localhost:9092 \
--group my-share-group \
--topic my-topic \
--reset-offsets \
--to-earliest \
--execute

适用场景

适合共享组的场景

  • 任务队列:将任务分发给多个工作进程处理
  • 工作分发:并行处理不需要严格顺序的工作项
  • 微服务通信:服务间的请求-响应模式
  • 负载均衡消费:消费者处理能力不均匀时自动平衡

不适合共享组的场景

  • 需要严格消息顺序的场景
  • 需要精确一次语义的场景(目前不支持)
  • 事务性消息处理(read_committed 隔离级别尚未支持)

注意事项

  1. 顺序性牺牲:共享组不保证消息顺序,如果业务需要顺序保证,请使用传统消费者组
  2. 事务支持有限:目前 read_committed 隔离级别尚未完全支持
  3. 生产环境慎用:作为早期访问功能,API 可能变化,不建议在生产环境使用
  4. Broker 版本要求:需要 Kafka 4.0+ 的 Broker 才能使用共享组功能
  5. 客户端要求:需要使用 Kafka 4.0+ 的客户端库
  6. 协议版本:共享组使用新的 RPC 协议,与旧客户端不兼容

共享组 vs 传统消息队列

共享组使 Kafka 能够更好地与传统消息队列(如 RabbitMQ、ActiveMQ)竞争:

特性Kafka 共享组RabbitMQActiveMQ
消息模型发布-订阅 + 队列队列为主队列为主
吞吐量百万级/秒万级/秒万级/秒
消息持久化磁盘持久化,支持回溯内存+磁盘内存+磁盘
消费确认ACCEPT/RELEASE/REJECTACK/NACKACK
消息顺序不保证单消费者时保证单消费者时保证
扩展性水平扩展有限扩展有限扩展

迁移建议

如果你的应用从传统消息队列迁移到 Kafka,可以考虑:

  • 任务队列场景 → 使用共享组
  • 事件流场景 → 使用传统消费者组
  • 混合场景 → 同一 Kafka 集群可以同时支持两种模式

Offset

Offset 是消息在分区中的唯一标识,是一个递增的整数:

  • 消费者通过 Offset 记录消费位置
  • Kafka 不删除已消费的消息,支持重复消费
  • Offset 可以保存在 Kafka 内部(__consumer_offsets 主题)或外部
分区中的消息和 Offset:

Offset: 0 1 2 3 4
消息: [msg1] [msg2] [msg3] [msg4] [msg5]
消费者A的读取位置: ↑
offset=2

Kafka 架构

整体架构

Kafka 采用分布式架构,主要组件包括:

┌─────────────────────────────────────────────────────────────┐
│ Kafka 集群 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Broker1 │ │ Broker2 │ │ Broker3 │ │
│ │ P0(L) │ │ P1(L) │ │ P2(L) │ │
│ │ P1(F) │ │ P2(F) │ │ P0(F) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────┘
↑ ↑ ↑
│ │ │
┌───┴───┐ ┌───┴───┐ ┌───┴───┐
│Producer│ │Producer│ │Consumer│
└───────┘ └───────┘ │ Group │
└────────┘

KRaft 模式 vs ZooKeeper 模式

Kafka 有两种部署模式:

ZooKeeper 模式(传统模式)

ZooKeeper 负责:

  • 集群元数据管理
  • Controller 选举
  • Broker 状态监控
  • Topic 配置管理

缺点

  • 运维复杂,需要管理两套系统
  • ZooKeeper 可能成为性能瓶颈
  • 元数据同步延迟较高

KRaft 模式(新模式,Kafka 2.8+)

KRaft 模式使用 Kafka 内置的 Raft 协议管理元数据:

优势

  • 架构简化,无需 ZooKeeper
  • 元数据访问延迟降低
  • 部署和运维更简单
  • 扩展性更好

配置示例

# KRaft 模式配置
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
controller.listener.names=CONTROLLER
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
推荐

对于新项目,推荐使用 KRaft 模式。Kafka 4.0 已完全移除 ZooKeeper 支持。

消息存储

Kafka 的消息存储设计:

日志段(Log Segment)

  • 每个分区由多个日志段组成
  • 当前活跃段用于写入,其他段只读
  • 日志段达到阈值后滚动创建新段

文件结构

/kafka-logs/
└── topic-partition/
├── 00000000000000000000.log # 消息数据
├── 00000000000000000000.index # 偏移量索引
├── 00000000000000000000.timeindex # 时间索引
├── 00000000000000000005.log
├── 00000000000000000005.index
└── 00000000000000000005.timeindex

消息保留策略

# 基于时间保留(默认7天)
log.retention.hours=168

# 基于大小保留(默认-1,不限制)
log.retention.bytes=1073741824

# 日志段大小
log.segment.bytes=1073741824

# 日志压缩(相同Key保留最新值)
log.cleanup.policy=compact

Kafka 安装与配置

快速启动(单机模式)

# 下载 Kafka
wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1

# 启动 ZooKeeper(如果使用 ZooKeeper 模式)
bin/zookeeper-server-start.sh config/zookeeper.properties &

# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties &

# 或者使用 KRaft 模式
# 生成集群ID
KAFKA_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
# 格式化存储
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
# 启动
bin/kafka-server-start.sh config/kraft/server.properties

关键配置参数

Broker 配置

# 基础配置
broker.id=0 # Broker唯一标识
listeners=PLAINTEXT://:9092 # 监听地址
advertised.listeners=PLAINTEXT://host:9092 # 对外地址
num.network.threads=3 # 网络线程数
num.io.threads=8 # IO线程数

# 日志配置
log.dirs=/var/kafka-logs # 日志目录
num.partitions=3 # 默认分区数
default.replication.factor=3 # 默认副本数
log.retention.hours=168 # 消息保留时间

# 性能配置
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 副本配置
num.recovery.threads.per.data.dir=1 # 恢复线程数
replica.fetch.max.bytes=1048576 # 副本拉取最大字节数
replica.fetch.wait.max.ms=500 # 副本拉取等待时间

生产者配置

# 基础配置
bootstrap.servers=localhost:9092 # Kafka集群地址
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# 可靠性配置
acks=all # 确认机制:0, 1, all(-1)
retries=2147483647 # 重试次数(默认Integer.MAX_VALUE)
max.in.flight.requests.per.connection=5 # 连接最大未确认请求数
enable.idempotence=true # 幂等性(推荐开启)

# 批量发送配置
batch.size=16384 # 批量大小(默认16KB)
linger.ms=5 # 等待时间(Kafka 4.0默认从0改为5ms)
buffer.memory=33554432 # 缓冲区大小(默认32MB)
delivery.timeout.ms=120000 # 发送超时时间(默认2分钟)

# 压缩配置
compression.type=snappy # 压缩类型:none, gzip, snappy, lz4, zstd
批量发送优化

linger.ms 控制生产者等待多长时间以累积更多消息形成更大的批次。Kafka 4.0 将默认值从 0 改为 5ms,因为更大的批次通常能带来更好的吞吐量,而延迟影响很小。在高吞吐场景下,可以适当增大此值(如 10-50ms)。

消费者配置

# 基础配置
bootstrap.servers=localhost:9092
group.id=my-group # 消费者组ID
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消费配置
auto.offset.reset=earliest # 无Offset时的行为:earliest, latest, none
enable.auto.commit=true # 自动提交Offset
auto.commit.interval.ms=5000 # 自动提交间隔
max.poll.records=500 # 单次拉取最大记录数
max.poll.interval.ms=300000 # 拉取最大间隔(默认5分钟)

# 会话配置
session.timeout.ms=45000 # 会话超时时间(默认45秒)
heartbeat.interval.ms=3000 # 心跳间隔(建议不超过session.timeout.ms的1/3)

# 高级配置
group.protocol=classic # 消费者组协议:classic(经典)或 consumer(新版)
group.instance.id=static-consumer-1 # 静态成员ID,避免短暂离线触发再均衡

Kafka 生产者

生产者工作流程

生产者发送消息的基本流程:

  1. 序列化:将 Key 和 Value 序列化为字节数组
  2. 分区选择:确定消息发送到哪个分区
  3. 消息压缩:可选的批量压缩
  4. 批量发送:累积消息批量发送
  5. 确认接收:等待 Broker 确认

Java 生产者示例

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerDemo {
public static void main(String[] args) {
// 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 5);
props.put("enable.idempotence", true);

// 创建生产者
Producer<String, String> producer = new KafkaProducer<>(props);

// 发送消息(异步)
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key1", "value1");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("发送成功: " +
"topic=" + metadata.topic() +
", partition=" + metadata.partition() +
", offset=" + metadata.offset());
} else {
System.err.println("发送失败: " + exception.getMessage());
}
}
});

// 发送消息(同步)
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("发送成功: " + metadata.offset());
} catch (Exception e) {
e.printStackTrace();
}

// 关闭生产者
producer.close();
}
}

消息发送方式

发送并忘记(Fire-and-Forget)

// 不关心发送结果
producer.send(new ProducerRecord<>("topic", "key", "value"));

异步发送(Async)

// 回调处理发送结果
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 处理发送失败
exception.printStackTrace();
} else {
// 处理发送成功
System.out.println("Offset: " + metadata.offset());
}
});

同步发送(Sync)

// 等待发送完成
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Offset: " + metadata.offset());
} catch (Exception e) {
// 处理异常
e.printStackTrace();
}

分区策略

默认分区器

// 如果指定分区,使用指定分区
// 如果有 Key,对 Key 哈希后取模
// 如果无 Key,使用轮询(Sticky Partition Cache)
props.put("partitioner.class",
"org.apache.kafka.clients.producer.internals.DefaultPartitioner");

自定义分区器

public class CustomPartitioner implements Partitioner {
private int specialPartition;

@Override
public void configure(Map<String, ?> configs) {
// 初始化配置
specialPartition = Integer.parseInt(
configs.get("special.partition").toString());
}

@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区逻辑
if (key != null && key.toString().startsWith("special-")) {
return specialPartition;
}
// 其他消息随机分配
return ThreadLocalRandom.current().nextInt(
cluster.partitionCountForTopic(topic));
}

@Override
public void close() {}
}

// 使用自定义分区器
props.put("partitioner.class", CustomPartitioner.class.getName());
props.put("special.partition", "0");

可靠性保证

Ack 确认机制

// acks=0: 不等待确认,最快但可能丢失
props.put("acks", "0");

// acks=1: 等待 Leader 确认,平衡性能和可靠性
props.put("acks", "1");

// acks=all: 等待所有 ISR 副本确认,最可靠
props.put("acks", "all");

幂等性生产者

// 启用幂等性,防止重复消息
props.put("enable.idempotence", true);
// 等价于:
// props.put("acks", "all");
// props.put("retries", Integer.MAX_VALUE);
// props.put("max.in.flight.requests.per.connection", 5);

事务性生产者

事务性生产者用于实现跨分区的原子写入,保证"精确一次"语义。这对于需要将消息写入多个主题、或需要将消费位置和输出结果一起提交的场景非常重要。

使用场景

  • 消费者-生产者模式:从 Kafka 消费消息,处理后写入另一个主题,需要保证消费位置和输出结果的原子性
  • 跨主题写入:一条消息需要写入多个主题,要么全部成功要么全部失败
  • 流处理应用:Kafka Streams 使用事务保证精确一次处理语义

工作原理

事务性生产者使用两阶段提交协议:

  1. 阶段一:向事务协调器注册事务,发送消息到各分区
  2. 阶段二:提交或回滚事务,事务协调器通知所有分区
// 配置事务
props.put("transactional.id", "my-transactional-id"); // 必须唯一,用于标识生产者实例

Producer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务(只需执行一次)
// 这会清理该 transactional.id 之前的未完成事务
producer.initTransactions();

try {
// 开始事务
producer.beginTransaction();

// 发送多条消息到不同主题(原子性保证)
producer.send(new ProducerRecord<>("topic1", "key1", "value1"));
producer.send(new ProducerRecord<>("topic2", "key2", "value2"));

// 可以同时发送消费偏移量(用于消费-处理-生产模式)
// producer.sendOffsetsToTransaction(offsets, consumerGroupId);

// 提交事务(原子写入)
producer.commitTransaction();
} catch (Exception e) {
// 回滚事务(丢弃所有未提交的消息)
producer.abortTransaction();
}

配置要求

# 生产者配置
enable.idempotence=true # 事务要求幂等性
acks=all # 事务要求所有副本确认
transactional.id=unique-id # 必须唯一

# Broker 配置(消费者端)
isolation.level=read_committed # 只读取已提交的事务消息

注意事项

  1. transactional.id 必须唯一,同一时间只能有一个生产者实例使用
  2. 事务超时时间默认为 60 秒,长时间事务需要调整 transaction.timeout.ms
  3. 消费者需要设置 isolation.level=read_committed 才能正确过滤未提交的事务消息
  4. 事务会带来性能开销,非必要场景不要使用

Kafka 消费者

消费者工作流程

消费者消费消息的基本流程:

  1. 加入消费者组:与组协调器通信,加入消费者组
  2. 分区分配:获取分配给自己的分区
  3. 拉取消息:从分配的分区拉取消息
  4. 处理消息:处理收到的消息
  5. 提交 Offset:记录消费位置

Java 消费者示例

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
public static void main(String[] args) {
// 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");

// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题
consumer.subscribe(Collections.singletonList("my-topic"));

// 消费消息
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
System.out.printf(
"topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}

// 手动提交 Offset
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}

Offset 提交方式

自动提交

// 配置自动提交
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");

// 注意:自动提交可能导致消息丢失或重复消费

手动同步提交

// 处理完消息后同步提交
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
process(record);
}
// 同步提交,会阻塞直到提交成功
consumer.commitSync();
} catch (CommitFailedException e) {
// 提交失败处理
}

手动异步提交

// 异步提交,不阻塞
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception != null) {
System.err.println("提交失败: " + offsets);
} else {
System.out.println("提交成功: " + offsets);
}
}
});

指定 Offset 提交

// 按分区指定 Offset 提交
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 0),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(offsets);

消费者再均衡

当消费者组中的消费者发生变化时,会触发再均衡(Rebalance):

触发条件

  • 新消费者加入
  • 消费者离线或崩溃
  • 主题分区数变化

再均衡类型

Kafka 支持两种再均衡协议:

1. Eager(急切)再均衡

传统的再均衡方式,所有消费者停止消费,释放所有分区,然后重新分配:

触发再均衡 → 所有消费者停止消费 → 释放所有分区 → 重新分配 → 恢复消费

缺点:会导致短暂的消费停顿,影响吞吐量。

2. Cooperative(协作式)再均衡

Kafka 2.4+ 引入的新协议,只重新分配需要变更的分区,不影响其他分区:

触发再均衡 → 只有受影响的消费者停止消费 → 只重新分配变更的分区 → 渐进式恢复

优势:减少再均衡的影响范围,降低消费中断时间。

分区分配策略

// 可用的分配策略
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

// RangeAssignor(默认):按主题范围分配
// RoundRobinAssignor:轮询分配
// StickyAssignor:粘性分配,尽量保持原有分配
// CooperativeStickyAssignor:协作式粘性分配(推荐)

静态成员

静态成员可以避免因短暂离线(如重启)触发的再均衡:

// 配置静态成员
props.put("group.instance.id", "consumer-1-static");

// 配置较大的会话超时,允许短暂离线
props.put("session.timeout.ms", "300000"); // 5分钟
props.put("max.poll.interval.ms", "600000"); // 10分钟

工作原理

  • 静态成员离开组时,不会立即触发再均衡
  • session.timeout.ms 内重新加入,可以继续消费原有分区
  • 只有超时未返回,才会触发再均衡

适用场景

  • 需要重启更新的应用
  • 维护期间的短暂离线
  • 减少不必要的再均衡

再均衡监听器

consumer.subscribe(Collections.singletonList("topic"), 
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 分区被收回前,提交 Offset
consumer.commitSync();
System.out.println("分区被收回: " + partitions);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 分区被分配后,可以从指定位置开始消费
System.out.println("分区被分配: " + partitions);
// consumer.seekToBeginning(partitions);
// consumer.seekToEnd(partitions);
}
});

指定位置消费

// 从最早的消息开始消费
consumer.seekToBeginning(Collections.singletonList(
new TopicPartition("topic", 0)));

// 从最新的消息开始消费
consumer.seekToEnd(Collections.singletonList(
new TopicPartition("topic", 0)));

// 从指定 Offset 开始消费
consumer.seek(new TopicPartition("topic", 0), 100L);

// 从指定时间戳开始消费
long timestamp = System.currentTimeMillis() - 3600000; // 1小时前
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("topic", 0), timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
}

多线程消费

KafkaConsumer 不是线程安全的,但可以在主线程中消费,将消息分发给工作线程处理:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadConsumer {
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executor;

public MultiThreadConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "multi-thread-group");
// ... 其他配置
this.consumer = new KafkaConsumer<>(props);
this.executor = Executors.newFixedThreadPool(10);
}

public void consume() {
consumer.subscribe(Collections.singletonList("topic"));

try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));

for (final ConsumerRecord<String, String> record : records) {
// 提交任务到线程池
executor.submit(() -> {
processRecord(record);
});
}

// 等待所有任务完成后提交 Offset
// 需要自行实现任务跟踪
}
} finally {
executor.shutdown();
consumer.close();
}
}

private void processRecord(ConsumerRecord<String, String> record) {
// 处理消息
}
}

Kafka Streams

Kafka Streams 是 Kafka 提供的流处理库,可以轻松构建实时流处理应用。

核心概念

概念说明
KStream消息流,每条记录都是独立的
KTable变更日志流,记录键的最新值
GlobalKTable全局表,所有实例都有完整数据
Topology处理逻辑的有向无环图

WordCount 示例

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

// 从主题读取数据
KStream<String, String> source = builder.stream("input-topic");

// 处理数据:分割单词、计数
KTable<String, Long> counts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
.groupBy((key, word) -> word)
.count(Materialized.as("counts-store"));

// 输出到主题
counts.toStream().to("output-topic",
Produced.with(Serdes.String(), Serdes.Long()));

// 创建并启动 Kafka Streams
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);

CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});

try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}

常用操作

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input");

// 过滤
KStream<String, String> filtered = stream.filter((key, value) -> value.length() > 5);

// 映射
KStream<String, String> mapped = stream.mapValues(value -> value.toUpperCase());

// 分组聚合
KTable<String, Long> aggregated = stream
.groupBy((key, value) -> value)
.count();

// 窗口聚合
KTable<Windowed<String>, Long> windowed = stream
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();

// Join
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
KStream<String, String> joined = stream1.join(stream2,
(value1, value2) -> value1 + "-" + value2,
JoinWindows.of(Duration.ofMinutes(5)));

Kafka Connect

Kafka Connect 是 Kafka 提供的数据导入导出工具,可以轻松连接各种数据系统。

运行 Connect

# 分布式模式
bin/connect-distributed.sh config/connect-distributed.properties

# 单机模式
bin/connect-standalone.sh config/connect-standalone.properties

文件源连接器配置

{
"name": "file-source",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/var/log/app.log",
"topic": "log-topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}

JDBC 源连接器配置

{
"name": "jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydb",
"connection.user": "root",
"connection.password": "password",
"topic.prefix": "mysql-",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "users, orders",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}

管理 Connect

# 列出连接器
curl http://localhost:8083/connectors

# 创建连接器
curl -X POST -H "Content-Type: application/json" \
--data @connector.json \
http://localhost:8083/connectors

# 查看连接器状态
curl http://localhost:8083/connectors/file-source/status

# 删除连接器
curl -X DELETE http://localhost:8083/connectors/file-source

Kafka 运维管理

主题管理

# 创建主题
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2

# 查看主题列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看主题详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic

# 修改主题配置
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=86400000

# 增加分区(只能增加,不能减少)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 6

# 删除主题
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic

消费者组管理

# 查看消费者组列表
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 查看消费者组详情
kafka-consumer-groups.sh --describe \
--bootstrap-server localhost:9092 \
--group my-group

# 查看消费者组成员
kafka-consumer-groups.sh --members \
--bootstrap-server localhost:9092 \
--group my-group

# 重置 Offset
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server localhost:9092 \
--group my-group \
--topic my-topic \
--to-earliest \
--execute

# 删除消费者组
kafka-consumer-groups.sh --delete \
--bootstrap-server localhost:9092 \
--group my-group

性能测试

# 生产者性能测试
kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000 \
--record-size 1000 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092

# 消费者性能测试
kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000 \
--bootstrap-server localhost:9092

集群监控

关键指标

指标类别具体指标说明
BrokerUnderReplicatedPartitions未完全同步的分区数
OfflinePartitionsCount离线分区数
ActiveControllerCount活跃控制器数量
TopicMessagesInPerSec每秒消息数
BytesInPerSec每秒写入字节数
BytesOutPerSec每秒读取字节数
ConsumerConsumerLag消费延迟
ConsumerFetchRate拉取速率

监控工具

  • Kafka Manager / CMAK:集群管理工具
  • Prometheus + Grafana:指标监控
  • Burrow:消费者延迟监控
  • Kafka Exporter:Prometheus 导出器

最佳实践

Topic 命名规范

<领域>.<业务>.<事件类型>

示例:
user.activity.login
order.payment.completed
system.log.error

分区数选择

分区数 = min(目标吞吐量 / 单分区最大吞吐量, 总分区数上限)

考虑因素:
- 单分区的顺序性保证
- 消费者的并行度
- Broker 的文件句柄数量

消息 Key 设计

// 选择合适的 Key 保证分区有序性
// 例如:订单相关消息使用订单ID作为Key
String key = orderId;

// 需要严格顺序的场景
String key = userId + "_" + sessionId;

// 时间窗口数据的Key设计
String key = userId + "_" + (timestamp / windowSize);

错误处理

// 生产者错误处理
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// 可重试错误:网络问题、Leader选举等
retryQueue.add(record);
} else {
// 不可重试错误:序列化失败、权限问题等
deadLetterQueue.add(record);
}
}
});

// 消费者错误处理
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
process(record);
} catch (Exception e) {
// 记录错误,跳过此消息
log.error("处理失败: " + record, e);
// 发送到死信队列
sendToDeadLetterQueue(record);
}
}
consumer.commitSync();
} catch (WakeupException e) {
// 正常关闭
} catch (Exception e) {
// 其他错误
}

性能优化

生产者优化

// 批量发送
props.put("batch.size", 32768);
props.put("linger.ms", 10);

// 压缩
props.put("compression.type", "lz4");

// 缓冲区
props.put("buffer.memory", 67108864);

消费者优化

// 批量拉取
props.put("max.poll.records", 1000);
props.put("fetch.min.bytes", 1048576);
props.put("fetch.max.wait.ms", 500);

Broker 优化

# 增加网络和IO线程
num.network.threads=8
num.io.threads=16

# 增加日志刷新批次大小
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# 调整JVM参数
KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"

小结

本章介绍了 Kafka 的核心概念和使用方法:

  1. 核心概念:消息、主题、分区、Broker、消费者组、Offset
  2. 架构模式:KRaft 模式(Kafka 4.0+ 默认,已移除 ZooKeeper 依赖)
  3. 生产者:发送方式、分区策略、幂等性、事务保证
  4. 消费者:Offset 管理、再均衡机制、静态成员、协作式再均衡
  5. Kafka Streams:流处理 API
  6. Kafka Connect:数据导入导出
  7. 运维管理:主题管理、监控、性能优化

Kafka 版本演进要点

版本重要特性
Kafka 0.11引入幂等性生产者、事务支持
Kafka 2.1引入 ZStandard 压缩
Kafka 2.3引入增量再均衡
Kafka 2.4协作式再均衡、静态成员
Kafka 2.8KRaft 模式早期访问
Kafka 3.0KRaft 模式生产就绪
Kafka 4.0完全移除 ZooKeeper,KRaft 成为默认,新消费者组协议 GA

Kafka 4.0 重要变更详解

Kafka 4.0 是一个里程碑版本,包含多项重大变更:

1. ZooKeeper 完全移除

从 Kafka 4.0 开始,ZooKeeper 模式被完全移除,KRaft 成为唯一的元数据管理方式。这意味着:

  • 新部署必须使用 KRaft 模式
  • 从旧版本升级需要先迁移到 KRaft 模式
  • 简化了运维复杂度,不再需要管理两套系统

2. Java 版本要求变更

组件Kafka 3.xKafka 4.0
Kafka ClientsJava 8+Java 11+
Kafka StreamsJava 8+Java 11+
Kafka BrokerJava 8+Java 17+
Kafka ConnectJava 8+Java 17+

升级注意:升级前请确保客户端版本在 2.1 或以上,否则会出现兼容性问题。

3. 新消费者组协议(KIP-848)

Kafka 4.0 正式发布了新一代消费者组协议,解决了传统协议的"停止-世界"再均衡问题:

# 启用新消费者组协议(客户端配置)
group.protocol=consumer

新协议优势

  • 消费者组再均衡不再需要停止所有消费者
  • 大幅降低再均衡延迟
  • 提高大集群的稳定性
  • 简化客户端实现

4. 生产者 linger.ms 默认值变更

linger.ms 默认值从 0 改为 5ms。这是因为更大的批次通常能带来更好的吞吐量,而延迟影响很小。对于对延迟敏感的场景,可以手动调低此值。

5. 事务协议增强(KIP-890)

第二阶段的事务服务端防护已完成,减少了生产者故障时"僵尸事务"的可能性。

6. 合格领导者副本(KIP-966,预览功能)

KIP-966 引入了合格领导者副本(Eligible Leader Replicas,ELR)作为预览功能,解决了传统 ISR 选举可能丢失数据的问题:

问题背景

在传统的 ISR(In-Sync Replicas)机制中,当 Leader 故障时,会选择 ISR 中的任意副本成为新 Leader。但 ISR 中的副本可能落后于高水位(High Watermark),这意味着:

  • 某些已提交的消息可能在新 Leader 上不存在
  • 故障恢复后可能丢失数据
  • 需要等待所有 ISR 副本同步才能继续服务

ELR 解决方案

ELR 是 ISR 的子集,保证拥有完整的高水位数据。当 Leader 故障时:

  • 优先从 ELR 中选举新 Leader
  • 确保 New Leader 拥有所有已提交的消息
  • 防止数据丢失
传统 ISR 选举:
ISR = [Replica1, Replica2, Replica3]
Leader 故障 → 从 ISR 中选举新 Leader
问题:ISR 中的副本可能落后于高水位,选举后可能丢失数据

ELR 选举:
ISR = [Replica1, Replica2, Replica3]
ELR = [Replica1, Replica2] // 保证有完整数据的副本
Leader 故障 → 优先从 ELR 中选举
优势:确保新 Leader 拥有完整数据,防止数据丢失

配置示例

# Broker 配置
# 启用 ELR 功能(预览功能)
elr.enabled=true

# ELR 最小副本数
elr.min.replicas=2

# 副本进入 ELR 的条件:必须完全同步到高水位

适用场景

  • 金融交易系统:对数据一致性要求极高
  • 订单处理系统:不能容忍数据丢失
  • 审计日志系统:需要保证数据完整性

注意事项

  • ELR 功能目前处于预览阶段,不建议生产环境使用
  • ELR 会增加 Leader 选举的延迟(需要等待更多副本同步)
  • 需要权衡可用性和一致性

7. 基于时间的 Offset 重置(KIP-1106)

传统上,消费者只能从最早、最新或指定 Offset 开始消费。KIP-1106 引入了基于时间的 Offset 重置策略:

# 新增的 offset 重置选项
# 从指定时间开始消费(支持多种格式)
auto.offset.reset=earliest
auto.offset.reset=latest
auto.offset.reset=none

# 新增:使用 seekTo 时间戳
# 在代码中使用
consumer.seekToBeginning(partitions); // 从最早开始
consumer.seekToEnd(partitions); // 从最新开始

// 从指定时间开始(新功能)
long timestamp = System.currentTimeMillis() - 3600000; // 1小时前
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("topic", 0), timestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);

8. kafka-groups.sh 新工具

Kafka 4.0 引入了新的 kafka-groups.sh 工具,用于统一管理所有类型的消费者组:

# 列出所有类型的组(消费者组、共享组等)
kafka-groups.sh --bootstrap-server localhost:9092 --list

# 查看组详情(自动识别组类型)
kafka-groups.sh --bootstrap-server localhost:9092 \
--group my-group --describe

# 查看组成员
kafka-groups.sh --bootstrap-server localhost:9092 \
--group my-group --members --verbose

与旧工具的区别

工具说明
kafka-consumer-groups.sh仅管理传统消费者组
kafka-share-groups.sh仅管理共享组
kafka-groups.sh统一管理所有类型的组(推荐使用)

9. 客户端指标推送增强(KIP-1076)

KIP-1076 扩展了 KIP-714,允许应用程序在客户端指标中添加自定义指标:

// 注册自定义指标
Map<String, String> customMetrics = new HashMap<>();
customMetrics.put("app.processing.time.ms", String.valueOf(processingTime));
customMetrics.put("app.queue.size", String.valueOf(queueSize));

// 通过 Kafka 客户端 API 推送
// 这些指标会随客户端指标一起发送到 Broker

10. 移除的 API 和配置

Kafka 4.0 移除了大量已弃用的 API:

  • 旧的消息格式 v0 和 v1
  • poll(long) 方法(请使用 poll(Duration)
  • alterConfigs Admin API(请使用 incrementalAlterConfigs
  • MirrorMaker 1(MM1)(请使用 MM2)
  • Log4j 1.x(迁移到 Log4j 2.x)

学习建议

  1. 从基础开始:先理解消息、主题、分区、消费者组等核心概念
  2. 实践操作:使用命令行工具进行主题管理、消息收发
  3. 理解可靠性:掌握 acks、幂等性、事务的使用场景
  4. 优化性能:根据业务需求调整批量大小、压缩算法等参数
  5. 监控运维:关注 Consumer Lag、分区均衡等关键指标

Kafka 是现代大数据架构的核心组件,掌握 Kafka 对于构建实时数据系统至关重要。在实际项目中,Kafka 常与 Spark、Flink、HBase 等组件配合使用,构建完整的大数据处理平台。

参考资源