跳到主要内容

Kafka 消费者

消费者(Consumer)负责从 Kafka 主题订阅和消费消息。本章将详细介绍 Kafka 消费者的使用方法、消费者组机制和偏移量管理。

消费者概述

工作原理

┌─────────────────────────────────────────────────────────────┐
│ 消费者工作原理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Consumer │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Poll Loop │───▶│ Process │ │ │
│ │ │ 拉取消息 │ │ 处理消息 │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Fetcher │───▶│ Commit │ │ │
│ │ │ 获取数据 │ │ 提交偏移量 │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Brokers │ │
│ │ Topic: orders │ │
│ │ Partition 0,1,2 │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

核心概念

  1. Poll 机制:消费者主动从 Broker 拉取消息
  2. 消费者组:多个消费者组成组,共同消费主题
  3. 偏移量提交:记录消费位置,支持消息重放

快速开始

添加 Maven 依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>

最简单的消费者

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
public static void main(String[] args) {
// 1. 配置消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");

// 2. 创建消费者
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {

// 3. 订阅主题
consumer.subscribe(Collections.singletonList("my-topic"));

// 4. 拉取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, String> record : records) {
System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
}
}
}

消费者配置详解

必需配置

配置项说明示例
bootstrap.serversBroker 地址列表localhost:9092
group.id消费者组 IDmy-group
key.deserializer键的反序列化器StringDeserializer
value.deserializer值的反序列化器StringDeserializer

消费者组配置

// 消费者组 ID(必需)
props.put("group.id", "my-consumer-group");

// 自动提交偏移量(默认 true)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 5000);

// 手动提交偏移量
props.put("enable.auto.commit", "false");

拉取配置

// 每次拉取的最大字节数(默认 50MB)
props.put("max.poll.records", 500);

// 拉取超时(默认 500ms)
props.put("poll.timeout.ms", 500);

// 最大拉取字节数(默认 50MB)
props.put("fetch.max.bytes", 52428800);

// 最小拉取字节数(默认 1B)
props.put("fetch.min.bytes", 1);

// 等待最大时间(默认 500ms)
props.put("fetch.max.wait.ms", 500);

心跳和会话配置

// 心跳间隔(默认 3s)
// 必须在 session.timeout.ms 之前
props.put("heartbeat.interval.ms", 3000);

// 会话超时(默认 45s)
// 超过此时间无响应,消费者被认为死亡
props.put("session.timeout.ms", 45000);

// 最大拉取间隔(默认 5min)
// 超过此时间未 poll,消费者被踢出组
props.put("max.poll.interval.ms", 300000);

偏移量配置

// 自动重置偏移量策略(默认 earliest)
// earliest: 从最早消息开始
// latest: 从最新消息开始
// none: 没有找到偏移量则抛异常
props.put("auto.offset.reset", "earliest");

// 是否允许自动提交(默认 true)
props.put("enable.auto.commit", "true");

// 自动提交间隔(默认 5s)
props.put("auto.commit.interval.ms", 5000);

// 偏移量同步提交超时(默认 5s)
props.put("offset.sync.timeout.ms", 5000);

消费者组

消费者组原理

┌─────────────────────────────────────────────────────────────┐
│ 消费者组消费模型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Topic: orders (3 partitions) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ P0 │ │ P1 │ │ P2 │ │
│ │ [0,1,2] │ │ [3,4,5] │ │ [6,7,8] │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────┼───────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Consumer Group: order-group │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ │ │
│ │ │Consumer1│ │Consumer2│ │ │
│ │ │ 消费 │ │ 消费 │ │ │
│ │ │ P0,P1 │ │ P2 │ │ │
│ │ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────┘ │
│ │
│ 规则: │
│ - 每个分区只能被组内一个消费者消费 │
│ - 消费者数量 <= 分区数量(多余消费者空闲) │
│ - 新消费者加入会触发 Rebalance │
│ │
└─────────────────────────────────────────────────────────────┘

分区分配策略

// 分配策略(默认 RangeAssignor)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor");
// 或者
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// 或者多个策略
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.RangeAssignor," +
"org.apache.kafka.clients.consumer.RoundRobinAssignor");

分配策略对比

策略说明特点
RangeAssignor范围分配按主题逐个分配,可能不均衡
RoundRobinAssignor轮询分配所有主题轮询,分配更均衡
StickyAssignor粘性分配保持现有分配,减少重平衡影响
CooperativeStickyAssignor协作粘性支持增量重平衡,减少抖动

偏移量管理

什么是偏移量?

┌─────────────────────────────────────────────────────────────┐
│ 偏移量示意图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Partition 0: │
│ ┌───┬───┬───┬───┬───┬───┬───┬───┬───┐ │
│ │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ ... │
│ └───┴───┴───┴───┴───┴───┴───┴───┴───┘ │
│ ▲ │
│ │ │
│ Committed Offset = 3 │
│ (下一次从 offset=3 开始消费) │
│ │
└─────────────────────────────────────────────────────────────┘

自动提交

// 启用自动提交(默认启用)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 5000);

// 自动提交原理
// 每隔 5 秒,提交最后一次 poll 返回的消息的最大偏移量
// 缺点:可能丢失已处理但未提交的消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

// 处理消息
for (ConsumerRecord<String, String> record : records) {
process(record); // 处理消息
}
// 自动提交当前已处理的最大偏移量
}

手动提交

// 禁用自动提交
props.put("enable.auto.commit", "false");

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, String> record : records) {
process(record);
}

// 方式1:同步提交(阻塞)
consumer.commitSync();

// 方式2:异步提交(非阻塞)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
exception.printStackTrace();
}
});

// 方式3:提交特定偏移量
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
}
consumer.commitSync(offsets);
}

提交策略选择

方式优点缺点适用场景
自动提交简单可能重复或丢失不敏感场景
同步提交可靠阻塞低吞吐、可靠性要求高
异步提交不阻塞可能失败高吞吐场景
组合提交平衡稍复杂推荐方案

推荐:组合提交

// 禁用自动提交
props.put("enable.auto.commit", "false");

try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, String> record : records) {
process(record);
}

// 异步提交,失败时记录
consumer.commitAsync((offsets, ex) -> {
if (ex != null) {
log.error("Commit failed", ex);
}
});
}
} finally {
// 最后同步提交确保不丢失
try {
consumer.commitSync();
} finally {
consumer.close();
}
}

消息处理

基本处理

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

// 方式1:遍历每条消息
for (ConsumerRecord<String, String> record : records) {
System.out.println("Topic: " + record.topic());
System.out.println("Partition: " + record.partition());
System.out.println("Offset: " + record.offset());
System.out.println("Key: " + record.key());
System.out.println("Value: " + record.value());
System.out.println("Timestamp: " + record.timestamp());
}

// 方式2:按主题分区处理
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);

// 处理该分区的消息
for (ConsumerRecord<String, String> record : partitionRecords) {
process(record);
}

// 提交该分区的偏移量
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(
tp, new OffsetAndMetadata(lastOffset + 1)
));
}
}

处理 JSON 消息

import com.fasterxml.jackson.databind.ObjectMapper;

ObjectMapper objectMapper = new ObjectMapper();

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, String> record : records) {
try {
// 反序列化 JSON
Order order = objectMapper.readValue(record.value(), Order.class);

// 处理订单
processOrder(order);

} catch (Exception e) {
// 处理解析错误
log.error("Failed to parse message", e);
}
}
}

消费者 Rebalance

什么是 Rebalance?

Rebalance 是消费者组内分区所有权重新分配的过程。

┌─────────────────────────────────────────────────────────────┐
│ Rebalance 过程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 触发条件: │
│ - 消费者加入或离开组 │
│ - 主题增加或减少分区 │
│ - 订阅主题变更 │
│ │
│ Rebalance 过程: │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │JoinGroup│────▶│SyncGroup │────▶│Consuming│ │
│ │ 阶段 │ │ 阶段 │ │ 阶段 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Coordinator 协调 │ │
│ │ - 收集所有消费者的分区偏好 │ │
│ │ - 计算新的分区分配方案 │ │
│ │ - 分发给所有消费者 │ │
│ └─────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

Rebalance 监听器

// 设置 Rebalance 监听器
consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 分区被剥夺前调用
// 提交当前处理的偏移量
System.out.println("Partitions revoked: " + partitions);

// 同步提交偏移量
consumer.commitSync(currentOffsets);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 分区分配完成后调用
System.out.println("Partitions assigned: " + partitions);
}
});

减少 Rebalance 影响

// 增加心跳间隔
props.put("heartbeat.interval.ms", 3000);

// 增加会话超时
props.put("session.timeout.ms", 60000);

// 增加最大拉取间隔(防止处理时间过长导致 Rebalance)
props.put("max.poll.interval.ms", 300000);

// 减少每次拉取数量
props.put("max.poll.records", 500);

// 使用粘性分配策略
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

反序列化器

内置反序列化器

// 字符串反序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 字节数组反序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

// 整数反序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");

自定义反序列化器

import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class UserDeserializer implements Deserializer<User> {
private ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public User deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data, User.class);
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize User", e);
}
}

@Override
public void close() {}
}

独立消费者

什么是独立消费者?

不加入消费者组,订阅特定分区消费:

// 不设置 group.id(或者设置不同的 ID)
props.put("group.id", ""); // 独立消费者

// 订阅特定分区
consumer.assign(Arrays.asList(
new TopicPartition("my-topic", 0),
new TopicPartition("my-topic", 2)
));

// 从特定偏移量开始消费
consumer.seek(new TopicPartition("my-topic", 0), 100);
consumer.seekToBeginning(Arrays.asList(
new TopicPartition("my-topic", 0)
));
consumer.seekToEnd(Arrays.asList(
new TopicPartition("my-topic", 0)
));

偏移量查找

// 从指定偏移量开始消费
TopicPartition partition = new TopicPartition("my-topic", 0);
consumer.assign(Arrays.asList(partition));

// 方式1:指定偏移量
consumer.seek(partition, 100);

// 方式2:从头开始
consumer.seekToBeginning(Collections.singleton(partition));

// 方式3:从最新开始
consumer.seekToEnd(Collections.singleton(partition));

// 方式4:查找特定时间戳的消息
List<PartitionInfo> partitionInfos = consumer.partitionsFor("my-topic");
List<TopicPartition> partitions = partitionInfos.stream()
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());

Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
.collect(Collectors.toMap(p -> p, p -> System.currentTimeMillis() - 3600000));

Map<TopicPartition, OffsetAndTimestamp> offsets =
consumer.offsetsForTimes(timestampsToSearch);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
if (entry.getValue() != null) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
}

消费者多线程

单线程消费者

// 每个线程一个消费者(推荐)
public class KafkaConsumerRunner implements Runnable {
private final KafkaConsumer<String, String> consumer;

public void run() {
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
process(records);
}
} finally {
consumer.close();
}
}
}

// 启动多个消费者线程
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executor.submit(new KafkaConsumerRunner());
}

线程安全

Kafka Consumer 不是线程安全的,每个线程需要独立的 Consumer 实例。

监控指标

关键指标

// 获取消费者指标
Map<MetricName, ? extends Metric> metrics = consumer.metrics();

// 常用指标
double pollRate = metrics.get(new MetricName("poll-rate", "consumer-metrics")).value();
double fetchRate = metrics.get(new MetricName("fetch-rate", "consumer-metrics")).value();
double commitRate = metrics.get(new MetricName("commit-rate", "consumer-metrics")).value();
double recordProcessTime = metrics.get(new MetricName("record-processing-time-avg", "consumer-metrics")).value();

最佳实践

  1. 使用消费者组:实现负载均衡和容错
  2. 手动提交偏移量:更好的控制消息处理
  3. 处理 Rebalance:合理设置监听器和超时
  4. 监控消费延迟:关注 consumer-lag 指标
  5. 优雅关闭:使用 try-with-resources 或显式 close

小结

  1. 消费者通过 Poll 机制从 Kafka 拉取消息
  2. 消费者组实现负载均衡,分区只能被组内一个消费者消费
  3. 偏移量记录消费位置,支持消息重放
  4. 手动提交比自动提交更可控
  5. Rebalance 会影响消费,需要优化减少其影响

下一步

接下来让我们学习 Broker 和集群 的机制。