跳到主要内容

Kafka 生产者

生产者(Producer)负责将消息发送到 Kafka 主题。本章将详细介绍 Kafka 生产者的工作原理、配置选项和使用方法。

生产者概述

工作原理

┌─────────────────────────────────────────────────────────────┐
│ 生产者工作原理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Producer │ │
│ │ │ │
│ │ ┌──────────┐ │ ┌─────────────┐ ┌─────────────┐ │
│ │ │ Buffer │──────▶│ Sender │───▶│ Brokers │ │
│ │ │ Queue │ │ │ Thread │ │ │ │
│ │ └──────────┘ │ └─────────────┘ └─────────────┘ │
│ │ │ │ │ │
│ │ │ ▼ ▼ │
│ │ │ ┌─────────────┐ ┌─────────────┐ │
│ │ │ │ Metadata │ │ Meta │ │
│ │ │ │ Request │ │ Response │ │
│ └──────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ 流程说明: │
│ 1. 消息进入缓冲区 │
│ 2. Sender 线程从缓冲区获取消息 │
│ 3. 获取主题元数据(分区 Leader 位置) │
│ 4. 发送请求到 Broker │
│ 5. 等待响应 │
│ │
└─────────────────────────────────────────────────────────────┘

核心组件

  1. 分区器(Partitioner):决定消息发送到哪个分区
  2. 缓冲区(Buffer):批量发送消息的缓冲区
  3. Sender 线程:后台发送消息的线程
  4. 元数据更新器:定期更新主题元数据

快速开始

添加 Maven 依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>

最简单的生产者

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
public static void main(String[] args) {
// 1. 配置生产者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 2. 创建生产者
try (Producer<String, String> producer = new KafkaProducer<>(props)) {

// 3. 创建消息
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic", // 主题
"key", // 键(可选,用于分区)
"Hello Kafka!" // 值
);

// 4. 发送消息(同步等待)
producer.send(record).get();

System.out.println("Message sent successfully!");
} catch (Exception e) {
e.printStackTrace();
}
}
}

异步发送

// 异步发送,带回调
producer.send(record, (metadata, exception) -> {
if (exception == null) {
// 发送成功
System.out.println("Sent: " + record.value() +
" to partition: " + metadata.partition() +
" at offset: " + metadata.offset());
} else {
// 发送失败
exception.printStackTrace();
}
});

// 不需要等待,继续发送其他消息
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>("topic", "message " + i));
}

生产者配置详解

必需配置

配置项说明示例
bootstrap.serversBroker 地址列表localhost:9092
key.serializer键的序列化器StringSerializer
value.serializer值的序列化器StringSerializer

性能相关配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");

// 缓冲区大小(默认 32MB)
// 越大批量发送效率越高,但占用内存越多
props.put("buffer.memory", 33554432);

// 批量发送大小(默认 16KB)
// 达到此大小会立即发送,不管是否到等待时间
props.put("batch.size", 16384);

// 等待时间(默认 0ms)
// 配合 batch.size 使用,延迟发送以积累更多消息
props.put("linger.ms", 10);

// 压缩类型(默认 none)
// 可选: none, gzip, snappy, lz4, zstd
props.put("compression.type", "lz4");

// 最大请求大小(默认 1MB)
props.put("max.request.size", 1048576);

可靠性配置

// Ack 确认机制(默认 1)
// 0: 不等待确认(最高吞吐,最低可靠)
// 1: Leader 确认即返回
// -1/all: 所有副本确认(最高可靠,最低吞吐)
props.put("acks", "all");

// 重试次数(默认 Integer.MAX_VALUE)
props.put("retries", 3);

// 重试间隔(默认 100ms)
props.put("retry.backoff.ms", 100);

// 客户端 ID
props.put("client.id", "my-producer");

超时配置

// 请求超时(默认 30s)
props.put("request.timeout.ms", 30000);

// 交付超时(默认 120s)
// 消息发送的最大总时间,超过则失败
props.put("delivery.timeout.ms", 120000);

序列化器

内置序列化器

// 字符串序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 字节数组序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

// 整数序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

// Long 序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");

自定义序列化器

import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class UserSerializer implements Serializer<User> {
private ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {}

@Override
public byte[] serialize(String topic, User data) {
try {
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize User", e);
}
}

@Override
public void close() {}
}

使用 JSON

// 添加依赖
// <dependency>
// <groupId>com.fasterxml.jackson.core</groupId>
// <artifactId>jackson-databind</artifactId>
// </dependency>

// 使用 JSON 序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 发送 JSON
User user = new User(1, "张三", "[email protected]");
String json = new ObjectMapper().writeValueAsString(user);
producer.send(new ProducerRecord<>("users", user.getId().toString(), json));

分区器

默认分区器

Kafka 提供三种内置分区器:

  1. DefaultPartitioner:默认分区器
  2. RoundRobinPartitioner:轮询分区器
  3. UniformStickyPartitioner:统一粘性分区器
// 使用粘性分区(推荐,用于高吞吐场景)
props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");

// 或者使用轮询
props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");

自定义分区器

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class UserIdPartitioner implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

if (keyBytes == null) {
// 没有 Key,使用随机分区
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}

// 提取用户 ID
String keyStr = new String(keyBytes);

// VIP 用户固定到第一个分区(保证 VIP 消息优先处理)
if (keyStr.startsWith("VIP:")) {
return 0;
}

// 普通用户使用哈希分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}

消息发送模式

1. 发送并忘记(Fire and Forget)

// 不等待结果,消息可能丢失
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("topic", "message " + i));
}

2. 同步发送

// 同步等待结果
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(); // 阻塞

// 或者带超时
try {
metadata = future.get(10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// 处理超时
}

3. 异步发送(推荐)

// 带回调的异步发送
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 记录错误
exception.printStackTrace();
} else {
// 处理成功结果
System.out.println("Partition: " + metadata.partition() +
", Offset: " + metadata.offset());
}
});

// 批量异步发送
List<ProducerRecord<String, String>> records = createRecords();
Callback callback = (metadata, ex) -> {
if (ex != null) {
ex.printStackTrace();
}
};
records.forEach(r -> producer.send(r, callback));

批量发送优化

批量发送原理

┌─────────────────────────────────────────────────────────────┐
│ 批量发送机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ProducerRecord ──┐ │
│ ProducerRecord ──┼──> Buffer ──> Batch ──> Sender ──> Broker │
│ ProducerRecord ──┘ │ │
│ │ │
│ batch.size=16KB │
│ linger.ms=10 │
│ │
│ 发送条件(满足任一即发送): │
│ 1. 批次达到 batch.size │
│ 2. 等待时间达到 linger.ms │
│ 3. 缓冲区满 │
│ │
└─────────────────────────────────────────────────────────────┘

配置示例

// 高吞吐配置
Properties highThroughputProps = new Properties();
highThroughputProps.put("bootstrap.servers", "localhost:9092");
highThroughputProps.put("key.serializer", "StringSerializer");
highThroughputProps.put("value.serializer", "StringSerializer");

// 增大缓冲区
highThroughputProps.put("buffer.memory", 67108864); // 64MB

// 增大批量大小
highThroughputProps.put("batch.size", 32768); // 32KB

// 等待更长以积累更多消息
highThroughputProps.put("linger.ms", 50); // 50ms

// 使用压缩
highThroughputProps.put("compression.type", "lz4");

// 高可靠配置
Properties highReliabilityProps = new Properties();
highReliabilityProps.put("bootstrap.servers", "localhost:9092");
highReliabilityProps.put("key.serializer", "StringSerializer");
highReliabilityProps.put("value.serializer", "StringSerializer");

// 等待所有副本确认
highReliabilityProps.put("acks", "all");

// 启用幂等性
highReliabilityProps.put("enable.idempotence", "true");

// 设置幂等性相关参数
highReliabilityProps.put("max.in.flight.requests.per.connection", 5);
highReliabilityProps.put("retries", 3);

幂等性

什么是幂等性?

幂等性确保消息只会被精确一次处理,即使生产者重试也不会产生重复消息。

┌─────────────────────────────────────────────────────────────┐
│ 幂等性原理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 无幂等性: │
│ Producer ──[Msg1]──> Broker ──[Msg1]──> 成功 │
│ │ │
│ └────[Msg1 (重试)]──> Broker ──[Msg1]──> 重复! │
│ │
│ 有幂等性: │
│ Producer ──[Msg1, PID=1, Seq=0]──> Broker │
│ │ │
│ └────[Msg1, PID=1, Seq=0]──> Broker ──[ACK]──> 去重 │
│ │
│ PID: Producer ID │
│ Seq: 消息序列号 │
│ │
└─────────────────────────────────────────────────────────────┘

启用幂等性

props.put("enable.idempotence", "true");

// 推荐配置(幂等性自动设置)
props.put("max.in.flight.requests.per.connection", 5);
props.put("retries", 3);
props.put("acks", "all");

幂等性条件

  • enable.idempotence = true
  • acks = all
  • retries > 0
  • max.in.flight.requests.per.connection <= 5

事务

什么是事务?

Kafka 事务确保一批消息要么全部成功,要么全部失败。

// 配置事务
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
props.put("transactional.id", "my-transactional-id"); // 事务 ID
props.put("enable.idempotence", "true");

Producer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务
producer.initTransactions();

try {
// 开始事务
producer.beginTransaction();

// 发送多条消息
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("topic1", "key", "value1-" + i));
producer.send(new ProducerRecord<>("topic2", "key", "value2-" + i));
}

// 提交事务
producer.commitTransaction();

} catch (Exception e) {
// 回滚事务
producer.abortTransaction();
e.printStackTrace();
}

使用场景

  • 多主题消息原子性写入
  • 消费者-生产者事务(exactly-once semantics)
  • 数据管道 Exactly-Once 处理

错误处理

可重试错误

// 配置重试
props.put("retries", 3);
props.put("retry.backoff.ms", 1000);

producer.send(record, (metadata, exception) -> {
// 判断是否可重试
if (exception instanceof RetriableException) {
// 可以重试,例如:Broker 不可用
System.out.println("Retriable error: " + exception.getMessage());
} else {
// 不可重试,例如:消息太大
System.out.println("Non-retriable error: " + exception.getMessage());
}
});

常见错误类型

错误类型说明是否可重试
NotEnoughReplicasException副本不足
LeaderNotAvailableExceptionLeader 不可用
OffsetOutOfRangeException偏移量越界
MessageSizeTooLargeException消息太大
UnknownTopicOrPartitionException主题/分区不存在

监控指标

关键指标

// 获取生产者指标
Map<MetricName, ? extends Metric> metrics = producer.metrics();

// 常用指标
double sendRate = metrics.get(new MetricName("record-send-rate", "producer-metrics")).value();
double requestLatency = metrics.get(new MetricName("request-latency-avg", "producer-metrics")).value();
long recordQueueTime = (long) metrics.get(new MetricName("record-queue-time-avg", "producer-metrics")).value();

监控项

指标说明
record-send-rate消息发送速率
request-latency-avg平均请求延迟
record-queue-time-avg平均排队时间
outgoing-byte-rate字节发送速率
error-rate错误率

最佳实践

  1. 使用异步发送:提高吞吐量
  2. 批量发送:配置 linger.msbatch.size
  3. 启用压缩:减少网络传输
  4. 启用幂等性:保证消息不重复
  5. 合理设置重试:处理临时错误
  6. 监控关键指标:及时发现问题

小结

  1. 生产者负责将消息发送到 Kafka 主题
  2. 同步发送适合需要确认的场景,异步发送适合高吞吐场景
  3. 批量发送压缩可以显著提高性能
  4. 幂等性确保消息不重复
  5. 事务保证多消息原子性

下一步

接下来让我们学习 消费者 的使用,了解如何从 Kafka 消费消息。