Kafka 主题与分区
主题(Topic)和分区(Partition)是 Kafka 最核心的概念,理解它们对于高效使用 Kafka 至关重要。
主题概述
什么是主题?
主题是 Kafka 中存储消息的逻辑容器,类似于文件系统中的文件夹。每条消息都属于某个主题,主题将相关的消息组织在一起。
从图中可以看出,生产者将消息发送到主题,消费者从主题订阅消息。主题可以有多个生产者和多个消费者,它们之间完全解耦。
主题的特性
- 多生产者:多个生产者可以向同一主题发送消息,互不干扰
- 多消费者:多个消费者组可以独立消费同一主题,互不影响
- 持久化:消息持久化到磁盘,可配置保留策略(按时间或大小)
- 日志分段:每个分区由多个日志段(Log Segment)组成,便于管理和清理
主题命名规范
主题命名需要遵循以下规则:
- 只能包含字母、数字、下划线(
_)、连字符(-)和点(.) - 不能为空,长度限制为 249 个字符
- 建议使用有意义的名称,如
user-events、order-payments
避免使用 __consumer_offsets、__transaction_state 等以双下划线开头的名称,这些是 Kafka 内部使用的主题。
分区(Partition)
为什么需要分区?
分区是 Kafka 实现并行处理和高可扩展性的关键:
- 并行消费:多个消费者可以并行消费不同分区,提高吞吐量
- 负载均衡:消息分布在多个分区和 Broker 上,实现负载均衡
- 高吞吐量:分区允许 Kafka 线性扩展,增加分区数可提高并发度
- 顺序保证:同一分区内的消息有序,这是 Kafka 重要的语义保证
分区的工作原理
当生产者发送消息时,Kafka 根据分区策略决定消息发送到哪个分区。
分区策略
1. 轮询策略(Round Robin)
当消息没有 Key 时,Kafka 使用轮询策略将消息均匀分布到各个分区:
// 消息没有 Key,默认使用轮询
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("orders", null, "Message " + i));
}
// 结果: Partition 0: 0,3,6,9; Partition 1: 1,4,7; Partition 2: 2,5,8
轮询策略的优点是消息均匀分布,最大化并行度;缺点是无法保证消息顺序。
2. 键哈希策略(Key Hashing)
当消息有 Key 时,Kafka 使用 Key 的哈希值决定分区:
// 消息带有 Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", // 主题
"user_123", // Key - 同一用户的订单会在同一分区
"Order data"
);
// 计算分区: Math.abs(Utils.murmur2(keyBytes)) % numPartitions
键哈希策略保证相同 Key 的消息总是发送到同一分区,从而保证这些消息的顺序性。
- 需要保证同一 Key 的消息有序:使用 Key Hashing
- 不需要保证顺序:可以不使用 Key 或使用
null - 需要完全均匀分布:使用轮询策略
3. 粘性分区器(Sticky Partitioner)
Kafka 2.4 引入了粘性分区器,它会尽可能将消息发送到同一个分区,直到该分区的批次已满:
// 使用粘性分区器
props.put("partitioner.class", "org.apache.kafka.clients.producer.UniformStickyPartitioner");
粘性分区器的优点是减少请求次数,提高吞吐量;适合高吞吐量场景。
4. 自定义分区器
可以实现自定义分区器来满足特殊需求:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题的所有分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 自定义分区逻辑
if (keyBytes == null) {
// 没有 Key,使用随机分区
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}
// VIP 用户分配到固定分区
String keyString = new String(keyBytes);
if (keyString.startsWith("VIP:")) {
return 0; // VIP 用户固定到分区 0
}
// 其他用户使用 Key Hash
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
分区内消息顺序
Kafka 只保证同一分区内的消息顺序。这意味着:
在分区 0 中,消息按照 Msg1 → Msg2 → Msg3 的顺序被消费,这是 Kafka 的重要保证。但如果 Msg4 和 Msg1 发送到不同分区,它们的相对顺序是不确定的。
管理主题
创建主题
# 基本创建
kafka-topics.sh --create \
--topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# 完整参数创建
kafka-topics.sh --create \
--topic orders \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config max.message.bytes=1048576
参数说明:
| 参数 | 说明 |
|---|---|
--partitions | 分区数量,影响并行度和吞吐量 |
--replication-factor | 副本数量,影响可用性 |
--config | 主题级别配置,如保留时间、消息大小限制 |
查看主题
# 列出所有主题
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看主题详情
kafka-topics.sh --describe \
--topic orders \
--bootstrap-server localhost:9092
输出示例:
Topic: orders TopicId: abc123... PartitionCount: 3 ReplicationFactor: 1
Configs: retention.ms=604800000,max.message.bytes=1048576
Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Partition: 2 Leader: 1 Replicas: 1 Isr: 1
修改主题
# 增加分区(注意:分区只能增加,不能减少)
kafka-topics.sh --alter \
--topic orders \
--partitions 9 \
--bootstrap-server localhost:9092
# 修改配置
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--add-config retention.ms=86400000 \
--bootstrap-server localhost:9092
# 删除配置
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--delete-config retention.ms \
--bootstrap-server localhost:9092
分区数量只能增加,不能减少。减少分区会导致数据丢失和消息路由混乱。如果必须减少分区,需要创建新主题并迁移数据。
删除主题
# 删除主题(需在 broker 配置中启用 delete.topic.enable=true)
kafka-topics.sh --delete \
--topic old-topic \
--bootstrap-server localhost:9092
分区策略设计
如何确定分区数量?
分区数量是 Kafka 最重要的配置之一,需要综合考虑以下因素:
考虑因素
- 吞吐量需求:目标吞吐量除以单分区吞吐量
- 消费者数量:分区数应大于等于消费者数(最佳等于消费者数)
- Broker 数量:分区数应该是 Broker 数的倍数,便于均匀分布
- 磁盘容量:分区数乘分区大小应小于等于磁盘容量
计算公式
分区数 = max(消费者数, 目标吞吐量 / 单分区吞吐量)
示例:
- 目标吞吐量:100,000 msg/s
- 单分区吞吐量:10,000 msg/s
- 消费者数:8
分区数 = max(8, 100000/10000) = max(8, 10) = 10
分区数量的权衡
| 分区数量 | 优点 | 缺点 |
|---|---|---|
| 过少 | 管理简单 | 限制并行度,吞吐量受限 |
| 适中 | 平衡性能和管理 | 需要合理规划 |
| 过多 | 高并行度 | 增加 Zookeeper/KRaft 负载,恢复时间变长 |
建议:从较小的分区数开始,根据实际需求逐步调整。
分区分配策略
Kafka 自动分配分区到 Broker,遵循以下原则:
副本分配原则:
- 副本分散在不同的 Broker,避免单点故障
- 分区 Leader 均衡分布,均衡读写负载
- 考虑机架感知(Rack Awareness),跨机架分布副本
消息保留策略
Kafka 提供灵活的消息保留策略,可以按时间或大小保留消息。
基于时间的保留
# 设置消息保留 7 天
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--add-config retention.ms=604800000 \
--bootstrap-server localhost:9092
常用配置:
retention.ms:保留时间(毫秒),默认 7 天retention.minutes:保留时间(分钟)retention.hours:保留时间(小时),默认 168 小时(7 天)
基于大小的保留
# 设置主题最大大小 10GB
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--add-config retention.bytes=10737418240 \
--bootstrap-server localhost:9092
常用配置:
retention.bytes:每个分区最大字节数,-1 表示不限制retention.bytes作用于分区级别,主题总大小 = 分区数 × retention.bytes
日志压缩策略
日志压缩保留每个 Key 的最新值,适合作为变更数据源:
# 启用日志压缩
kafka-configs.sh --alter \
--entity-type topics \
--entity-name user-status \
--add-config cleanup.policy=compact \
--bootstrap-server localhost:9092
压缩配置:
cleanup.policy=compact:启用压缩min.cleanable.dirty.ratio:触发压缩的脏数据比例delete.retention.ms:标记删除后保留时间
保留策略对比
| 策略 | 说明 | 适用场景 |
|---|---|---|
delete | 基于时间/大小删除 | 普通日志、消息队列 |
compact | 保留每个 Key 最新值 | 变更数据源(CDC)、配置存储、用户状态 |
compact,delete | 组合策略 | 既需要压缩也需要过期删除 |
日志段管理
日志段结构
每个分区由多个日志段(Log Segment)组成:
Partition: orders-0/
├── 00000000000000000000.log # 活跃段,正在写入
├── 00000000000000000000.index # 偏移量索引
├── 00000000000000000000.timeindex # 时间戳索引
├── 00000000000000001000.log # 已完成的段
├── 00000000000000001000.index
├── 00000000000000001000.timeindex
└── ...
文件说明:
.log:实际消息数据,顺序追加写入.index:稀疏索引,加速偏移量查找.timeindex:时间戳索引,支持按时间查找
日志段配置
# 日志段文件大小(默认 1GB)
log.segment.bytes=1073741824
# 日志段文件检查间隔(默认 5 分钟)
log.segment.check.interval.ms=300000
# 基于时间的日志段滚动(默认 7 天)
log.roll.hours=168
# 日志压缩相关
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.delete.retention.ms=86400000
索引机制
Kafka 使用稀疏索引提高查找效率:
- 索引不会为每条消息建立条目,而是每隔一定字节数建立索引项
- 查找时,先通过索引定位到大致位置,然后顺序扫描
- 这种设计在空间和查询效率之间取得了平衡
最佳实践
1. 合理规划分区数
- 不要过多:每个分区占用内存和文件句柄,过多会影响性能
- 不要过少:分区少会影响并行度和吞吐量
- 建议:从较小的分区数开始,根据需求调整
2. 选择合适的 Key
// 需要消息有序:使用有业务意义的 Key
producer.send(new ProducerRecord<>("orders", userId, orderData));
// 不需要有序:不使用 Key 或使用随机 Key
producer.send(new ProducerRecord<>("logs", null, logData));
3. 监控分区平衡
# 检查分区分布
kafka-topics.sh --describe \
--topic orders \
--bootstrap-server localhost:9092
# 如果 Leader 分布不均,执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topics \
--election-type preferred
4. 使用压缩
# 启用生产者压缩
kafka-configs.sh --alter \
--entity-type topics \
--entity-name large-topic \
--add-config compression.type=producer \
--bootstrap-server localhost:9092
5. 预估分区大小
单个分区大小 = 日均消息量 × 消息平均大小 × 保留天数 / 分区数
确保单个分区大小在合理范围内(一般不超过 50GB),便于故障恢复。
Java API 操作示例
创建主题
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.*;
public class TopicManagement {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
// 创建主题配置
NewTopic topic = new NewTopic("orders", 6, (short) 1);
topic.configs(Map.of(
"retention.ms", "604800000",
"compression.type", "producer"
));
// 创建主题
CreateTopicsResult result = admin.createTopics(Collections.singleton(topic));
// 等待创建完成
result.all().get(30, TimeUnit.SECONDS);
System.out.println("Topic created successfully");
}
}
}
列出所有主题
try (AdminClient admin = AdminClient.create(props)) {
ListTopicsResult result = admin.listTopics();
Set<String> names = result.names().get();
System.out.println("Topics: " + names);
}
查看主题详情
try (AdminClient admin = AdminClient.create(props)) {
DescribeTopicsResult result = admin.describeTopics(Collections.singleton("orders"));
Map<String, TopicDescription> descriptions = result.all().get();
for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
System.out.println("Topic: " + entry.getKey());
for (TopicPartitionInfo partition : entry.getValue().partitions()) {
System.out.println(" Partition: " + partition.partition());
System.out.println(" Leader: " + partition.leader());
System.out.println(" Replicas: " + partition.replicas());
System.out.println(" ISR: " + partition.isr());
}
}
}
增加分区
try (AdminClient admin = AdminClient.create(props)) {
Map<String, NewPartitions> newPartitions = Map.of(
"orders", NewPartitions.increaseTo(9)
);
CreatePartitionsResult result = admin.createPartitions(newPartitions);
result.all().get(30, TimeUnit.SECONDS);
System.out.println("Partitions increased successfully");
}
小结
- 主题是 Kafka 存储消息的逻辑容器,类似文件系统的文件夹
- 分区是实现并行处理和高可用的关键,同一分区内消息有序
- 分区策略影响消息顺序和负载均衡,需根据业务选择
- 合理规划分区数量很重要,影响吞吐量和运维复杂度
- 消息保留策略需要根据业务需求选择,支持按时间、大小或压缩
下一步
现在让我们学习 生产者 的使用,了解如何向 Kafka 发送消息。