Kafka 生产者
生产者(Producer)负责将消息发送到 Kafka 主题。本章将详细介绍 Kafka 生产者的工作原理、配置选项和使用方法。
生产者概述
工作原理
┌─────────────────────────────────────────────────────────────┐
│ 生产者工作原理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Producer │ │
│ │ │ │
│ │ ┌──────────┐ │ ┌─────────────┐ ┌─────────────┐ │
│ │ │ Buffer │──────▶│ Sender │───▶│ Brokers │ │
│ │ │ Queue │ │ │ Thread │ │ │ │
│ │ └──────────┘ │ └─────────────┘ └─────────────┘ │
│ │ │ │ │ │
│ │ │ ▼ ▼ │
│ │ │ ┌─────────────┐ ┌─────────────┐ │
│ │ │ │ Metadata │ │ Meta │ │
│ │ │ │ Request │ │ Response │ │
│ └──────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ 流程说明: │
│ 1. 消息进入缓冲区 │
│ 2. Sender 线程从缓冲区获取消息 │
│ 3. 获取主题元数据(分区 Leader 位置) │
│ 4. 发送请求到 Broker │
│ 5. 等待响应 │
│ │
└─────────────────────────────────────────────────────────────┘
核心组件
- 分区器(Partitioner):决定消息发送到哪个分区
- 缓冲区(Buffer):批量发送消息的缓冲区
- Sender 线程:后台发送消息的线程
- 元数据更新器:定期更新主题元数据
快速开始
添加 Maven 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
最简单的生产者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// 1. 配置生产者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建生产者
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
// 3. 创建消息
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic", // 主题
"key", // 键(可选,用于分区)
"Hello Kafka!" // 值
);
// 4. 发送消息(同步等待)
producer.send(record).get();
System.out.println("Message sent successfully!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
异步发送
// 异步发送,带回调
producer.send(record, (metadata, exception) -> {
if (exception == null) {
// 发送成功
System.out.println("Sent: " + record.value() +
" to partition: " + metadata.partition() +
" at offset: " + metadata.offset());
} else {
// 发送失败
exception.printStackTrace();
}
});
// 不需要等待,继续发送其他消息
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>("topic", "message " + i));
}
生产者配置详解
必需配置
| 配置项 | 说明 | 示例 |
|---|---|---|
bootstrap.servers | Broker 地址列表 | localhost:9092 |
key.serializer | 键的序列化器 | StringSerializer |
value.serializer | 值的序列化器 | StringSerializer |
性能相关配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
// 缓冲区大小(默认 32MB)
// 越大批量发送效率越高,但占用内存越多
props.put("buffer.memory", 33554432);
// 批量发送大小(默认 16KB)
// 达到此大小会立即发送,不管是否到等待时间
props.put("batch.size", 16384);
// 等待时间(默认 0ms)
// 配合 batch.size 使用,延迟发送以积累更多消息
props.put("linger.ms", 10);
// 压缩类型(默认 none)
// 可选: none, gzip, snappy, lz4, zstd
props.put("compression.type", "lz4");
// 最大请求大小(默认 1MB)
props.put("max.request.size", 1048576);
可靠性配置
// Ack 确认机制(默认 1)
// 0: 不等待确认(最高吞吐,最低可靠)
// 1: Leader 确认即返回
// -1/all: 所有副本确认(最高可靠,最低吞吐)
props.put("acks", "all");
// 重试次数(默认 Integer.MAX_VALUE)
props.put("retries", 3);
// 重试间隔(默认 100ms)
props.put("retry.backoff.ms", 100);
// 客户端 ID
props.put("client.id", "my-producer");
超时配置
// 请求超时(默认 30s)
props.put("request.timeout.ms", 30000);
// 交付超时(默认 120s)
// 消息发送的最大总时间,超过则失败
props.put("delivery.timeout.ms", 120000);
序列化器
内置序列化器
// 字符串序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 字节数组序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// 整数序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
// Long 序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");
自定义序列化器
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public class UserSerializer implements Serializer<User> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, User data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize User", e);
}
}
@Override
public void close() {}
}
使用 JSON
// 添加依赖
// <dependency>
// <groupId>com.fasterxml.jackson.core</groupId>
// <artifactId>jackson-databind</artifactId>
// </dependency>
// 使用 JSON 序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 发送 JSON
User user = new User(1, "张三", "[email protected]");
String json = new ObjectMapper().writeValueAsString(user);
producer.send(new ProducerRecord<>("users", user.getId().toString(), json));
分区器
默认分区器
Kafka 提供三种内置分区器:
- DefaultPartitioner:默认分区器
- RoundRobinPartitioner:轮询分区器
- UniformStickyPartitioner:统一粘性分区器
// 使用粘性分区(推荐,用于高吞吐场景)
props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
// 或者使用轮询
props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");
自定义分区器
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class UserIdPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 没有 Key,使用随机分区
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}
// 提取用户 ID
String keyStr = new String(keyBytes);
// VIP 用户固定到第一个分区(保证 VIP 消息优先处理)
if (keyStr.startsWith("VIP:")) {
return 0;
}
// 普通用户使用哈希分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
消息发送模式
1. 发送并忘记(Fire and Forget)
// 不等待结果,消息可能丢失
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("topic", "message " + i));
}
2. 同步发送
// 同步等待结果
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(); // 阻塞
// 或者带超时
try {
metadata = future.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 处理超时
}
3. 异步发送(推荐)
// 带回调的异步发送
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 记录错误
exception.printStackTrace();
} else {
// 处理成功结果
System.out.println("Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
}
});
// 批量异步发送
List<ProducerRecord<String, String>> records = createRecords();
Callback callback = (metadata, ex) -> {
if (ex != null) {
ex.printStackTrace();
}
};
records.forEach(r -> producer.send(r, callback));
批量发送优化
批量发送原理
┌─────────────────────────────────────────────────────────────┐
│ 批量发送机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ProducerRecord ──┐ │
│ ProducerRecord ──┼──> Buffer ──> Batch ──> Sender ──> Broker │
│ ProducerRecord ──┘ │ │
│ │ │
│ batch.size=16KB │
│ linger.ms=10 │
│ │
│ 发送条件(满足任一即发送): │
│ 1. 批次达到 batch.size │
│ 2. 等待时间达到 linger.ms │
│ 3. 缓冲区满 │
│ │
└─────────────────────────────────────────────────────────────┘
配置示例
// 高吞吐配置
Properties highThroughputProps = new Properties();
highThroughputProps.put("bootstrap.servers", "localhost:9092");
highThroughputProps.put("key.serializer", "StringSerializer");
highThroughputProps.put("value.serializer", "StringSerializer");
// 增大缓冲区
highThroughputProps.put("buffer.memory", 67108864); // 64MB
// 增大批量大小
highThroughputProps.put("batch.size", 32768); // 32KB
// 等待更长以积累更多消息
highThroughputProps.put("linger.ms", 50); // 50ms
// 使用压缩
highThroughputProps.put("compression.type", "lz4");
// 高可靠配置
Properties highReliabilityProps = new Properties();
highReliabilityProps.put("bootstrap.servers", "localhost:9092");
highReliabilityProps.put("key.serializer", "StringSerializer");
highReliabilityProps.put("value.serializer", "StringSerializer");
// 等待所有副本确认
highReliabilityProps.put("acks", "all");
// 启用幂等性
highReliabilityProps.put("enable.idempotence", "true");
// 设置幂等性相关参数
highReliabilityProps.put("max.in.flight.requests.per.connection", 5);
highReliabilityProps.put("retries", 3);
幂等性
什么是幂等性?
幂等性确保消息只会被精确一次处理,即使生产者重试也不会产生重复消息。
┌─────────────────────────────────────────────────────────────┐
│ 幂等性原理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 无幂等性: │
│ Producer ──[Msg1]──> Broker ──[Msg1]──> 成功 │
│ │ │
│ └────[Msg1 (重试)]──> Broker ──[Msg1]──> 重复! │
│ │
│ 有幂等性: │
│ Producer ──[Msg1, PID=1, Seq=0]──> Broker │
│ │ │
│ └────[Msg1, PID=1, Seq=0]──> Broker ──[ACK]──> 去重 │
│ │
│ PID: Producer ID │
│ Seq: 消息序列号 │
│ │
└─────────────────────────────────────────────────────────────┘
启用幂等性
props.put("enable.idempotence", "true");
// 推荐配置(幂等性自动设置)
props.put("max.in.flight.requests.per.connection", 5);
props.put("retries", 3);
props.put("acks", "all");
幂等性条件
enable.idempotence = trueacks = allretries > 0max.in.flight.requests.per.connection <= 5
事务
什么是事务?
Kafka 事务确保一批消息要么全部成功,要么全部失败。
// 配置事务
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
props.put("transactional.id", "my-transactional-id"); // 事务 ID
props.put("enable.idempotence", "true");
Producer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务
producer.initTransactions();
try {
// 开始事务
producer.beginTransaction();
// 发送多条消息
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("topic1", "key", "value1-" + i));
producer.send(new ProducerRecord<>("topic2", "key", "value2-" + i));
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
// 回滚事务
producer.abortTransaction();
e.printStackTrace();
}
使用场景
- 多主题消息原子性写入
- 消费者-生产者事务(exactly-once semantics)
- 数据管道 Exactly-Once 处理
错误处理
可重试错误
// 配置重试
props.put("retries", 3);
props.put("retry.backoff.ms", 1000);
producer.send(record, (metadata, exception) -> {
// 判断是否可重试
if (exception instanceof RetriableException) {
// 可以重试,例如:Broker 不可用
System.out.println("Retriable error: " + exception.getMessage());
} else {
// 不可重试,例如:消息太大
System.out.println("Non-retriable error: " + exception.getMessage());
}
});
常见错误类型
| 错误类型 | 说明 | 是否可重试 |
|---|---|---|
NotEnoughReplicasException | 副本不足 | 是 |
LeaderNotAvailableException | Leader 不可用 | 是 |
OffsetOutOfRangeException | 偏移量越界 | 否 |
MessageSizeTooLargeException | 消息太大 | 否 |
UnknownTopicOrPartitionException | 主题/分区不存在 | 是 |
监控指标
关键指标
// 获取生产者指标
Map<MetricName, ? extends Metric> metrics = producer.metrics();
// 常用指标
double sendRate = metrics.get(new MetricName("record-send-rate", "producer-metrics")).value();
double requestLatency = metrics.get(new MetricName("request-latency-avg", "producer-metrics")).value();
long recordQueueTime = (long) metrics.get(new MetricName("record-queue-time-avg", "producer-metrics")).value();
监控项
| 指标 | 说明 |
|---|---|
record-send-rate | 消息发送速率 |
request-latency-avg | 平均请求延迟 |
record-queue-time-avg | 平均排队时间 |
outgoing-byte-rate | 字节发送速率 |
error-rate | 错误率 |
最佳实践
- 使用异步发送:提高吞吐量
- 批量发送:配置
linger.ms和batch.size - 启用压缩:减少网络传输
- 启用幂等性:保证消息不重复
- 合理设置重试:处理临时错误
- 监控关键指标:及时发现问题
小结
- 生产者负责将消息发送到 Kafka 主题
- 同步发送适合需要确认的场景,异步发送适合高吞吐场景
- 批量发送和压缩可以显著提高性能
- 幂等性确保消息不重复
- 事务保证多消息原子性
下一步
接下来让我们学习 消费者 的使用,了解如何从 Kafka 消费消息。