Kafka 主题与分区
主题(Topic)和分区(Partition)是 Kafka 最核心的概念,理解它们对于高效使用 Kafka 至关重要。
主题概述
什么是主题?
主题是 Kafka 中存储消息的逻辑容器,类似于文件系统中的文件夹。每条消息都属于某个主题。
┌─────────────────────────────────────────────────────────────┐
│ Kafka 主题结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Cluster │ │
│ │ │ │
│ │ Topic: orders │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Partition 0 │ │ │
│ │ │ [0] Order #001 │ │ │
│ │ │ [3] Order #002 │ │ │
│ │ │ [6] Order #003 │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Partition 1 │ │ │
│ │ │ [1] Order #004 │ │ │
│ │ │ [4] Order #005 │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Partition 2 │ │ │
│ │ │ [2] Order #006 │ │ │
│ │ │ [5] Order #007 │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
主题的特性
- 多生产者:多个生产者可以向同一主题发送消息
- 多消费者:多个消费者组可以独立消费同一主题
- 持久化:消息持久化到磁盘,可配置保留策略
- 日志分段:每个分区由多个日志段(Log Segment)组成
分区(Partition)
为什么需要分区?
分区是 Kafka 实现并行处理和高可扩展性的关键:
- 并行消费:多个消费者可以并行消费不同分区
- 负载均衡:消息分布在多个分区,实现负载均衡
- 高吞吐量:分区允许 Kafka 线性扩展吞吐量
分区的工作原理
┌─────────────────────────────────────────────────────────────�
│ 分区消息分布机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer发送消息时根据分区策略决定消息发送到哪个分区: │
│ │
│ ┌─────────────┐ │
│ │ Message │ │
│ │ Key=null │────> 轮询策略 Round Robin │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ │
│ │ Message │ │
│ │ Key=user1 │────> Hash(key) % numPartitions │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
分区策略
1. 轮询策略(Round Robin)
当消息没有 Key 时,Kafka 使用轮询策略将消息均匀分布到各个分区:
// 消息没有 Key,默认使用轮询
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("orders", null, "Message " + i));
}
// 结果: Partition 0: 0,3,6; Partition 1: 1,4,7; Partition 2: 2,5,8
2. 键哈希策略(Key Hashing)
当消息有 Key 时,Kafka 使用 Key 的哈希值决定分区:
// 消息带有 Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", // 主题
"user_123", // Key - 同一用户的订单会在同一分区
"Order data"
);
// 计算分区: Math.abs(Utils.toPositive(Utils.murmur2(keyBytes))) % numPartitions
使用场景
- 需要保证同一 Key 的消息有序:使用 Key Hashing
- 不需要保证顺序:可以不使用 Key 或使用 null
3. 自定义分区策略
可以实现自定义分区器:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题的所有分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 自定义分区逻辑
if (keyBytes == null) {
// 没有 Key,使用轮询
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}
// VIP 用户分配到固定分区
String keyString = new String(keyBytes);
if (keyString.startsWith("VIP:")) {
return 0; // VIP 用户固定到分区 0
}
// 其他用户使用 Key Hash
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
管理主题
创建主题
# 基本创建
kafka-topics.sh --create \
--topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# 完整参数创建
kafka-topics.sh --create \
--topic orders \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config max.message.bytes=1048576
查看主题
# 列出所有主题
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看主题详情
kafka-topics.sh --describe \
--topic orders \
--bootstrap-server localhost:9092
输出示例:
Topic: orders TopicId: abc123... PartitionCount: 3 ReplicationFactor: 1
Configs: retention.ms=604800000,max.message.bytes=1048576
Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Partition: 2 Leader: 1 Replicas: 1 Isr: 1
修改主题
# 增加分区(注意:分区只能增加,不能减少)
kafka-topics.sh --alter \
--topic orders \
--partitions 9 \
--bootstrap-server localhost:9092
# 修改配置
kafka-topics.sh --alter \
--topic orders \
--config retention.ms=86400000 \
--bootstrap-server localhost:9092
# 删除配置
kafka-topics.sh --alter \
--topic orders \
--delete-config retention.ms \
--bootstrap-server localhost:9092
删除主题
# 删除主题(需启用删除功能)
kafka-topics.sh --delete \
--topic old-topic \
--bootstrap-server localhost:9092
分区策略设计
如何确定分区数量?
分区数量是 Kafka 最重要的配置之一,需要综合考虑:
考虑因素
- 吞吐量需求:目标吞吐量除以单分区吞吐量
- 消费者数量:分区数大于等于消费者数(最佳等于消费者数)
- Broker 数量:分区数应该是 Broker 数的倍数
- 磁盘容量:分区数乘分区大小小于等于磁盘容量
计算公式
分区数 = max(消费者数, 目标吞吐量 / 单分区吞吐量)
示例:
- 目标吞吐量:100,000 msg/s
- 单分区吞吐量:10,000 msg/s
- 消费者数:8
分区数 = max(8, 100000/10000) = max(8, 10) = 10
分区分配策略
┌─────────────────────────────────────────────────────────────┐
│ 分区分配策略示意图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ P0, P3 │ │ P1, P4 │ │ P2, P5 │ │
│ │ (Leader)│ │ (Leader)│ │ (Leader)│ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 副本分配原则: │
│ 1. 副本分散在不同的 Broker │
│ 2. 分区 Leader 均衡分布 │
│ 3. 考虑机架感知(rack awareness) │
│ │
└─────────────────────────────────────────────────────────────┘
消息保留策略
基于时间
# 设置消息保留 7 天
kafka-topics.sh --alter \
--topic orders \
--config retention.ms=604800000 \
--bootstrap-server localhost:9092
基于大小
# 设置主题最大大小 10GB
kafka-topics.sh --alter \
--topic orders \
--config retention.bytes=10737418240 \
--bootstrap-server localhost:9092
压缩策略
# 启用日志压缩(只保留每个 Key 的最新值)
kafka-topics.sh --alter \
--topic user-status \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--bootstrap-server localhost:9092
保留策略对比
| 策略 | 说明 | 适用场景 |
|---|---|---|
delete | 基于时间/大小删除 | 普通日志、消息队列 |
compact | 保留每个 Key 最新值 | 变更数据源(CDC)、配置存储 |
日志段管理
日志段结构
┌─────────────────────────────────────────────────────────────┐
│ 分区日志段结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Partition: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Log Segment 0 (active) │ │
│ │ ├── 00000000000000000000.index │ │
│ │ ├── 00000000000000000000.log │ │
│ │ └── 00000000000000000000.timeindex │ │
│ └─────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Log Segment 1 │ │
│ │ ├── 00000000000000001000.index │ │
│ │ ├── 00000000000000001000.log │ │
│ │ └── 00000000000000001000.timeindex │ │
│ └─────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Log Segment 2 │ │
│ │ ├── 00000000000000002000.index │ │
│ │ ├── 00000000000000002000.log │ │
│ │ └── 00000000000000002000.timeindex │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 文件说明: │
│ - .log: 实际消息数据 │
│ - .index: 消息偏移量索引 │
│ - .timeindex: 时间戳索引 │
│ │
└─────────────────────────────────────────────────────────────┘
日志段配置
# 日志段文件大小(默认 1GB)
log.segment.bytes=1073741824
# 日志段文件检查间隔(默认 5 分钟)
log.segment.check.interval.ms=300000
# 日志段文件最小年龄(用于压缩, 默认 1 分钟)
log.segment.delete.delay.ms=60000
最佳实践
1. 合理规划分区数
- 不要过多:每个分区占用资源,过多会影响性能
- 不要过少:分区少会影响并行度和吞吐量
- 建议:从较小的分区数开始,根据需求调整
2. 选择合适的 Key
- 需要消息有序:使用有业务意义的 Key
- 不需要有序:不使用 Key 或使用随机 Key
3. 监控分区平衡
# 检查分区平衡状态
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--partition 0 \
--topic orders \
--election-type preferred
4. 使用压缩
# 启用压缩
kafka-topics.sh --alter \
--topic large-topic \
--config compression.type=producer \
--bootstrap-server localhost:9092
Java API 操作示例
创建主题
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.*;
public class TopicManagement {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
// 创建主题配置
NewTopic topic = new NewTopic("orders", 6, (short) 1);
topic.configs(Map.of(
"retention.ms", "604800000",
"compression.type", "producer"
));
// 创建主题
CreateTopicsResult result = admin.createTopics(Collections.singleton(topic));
// 等待创建完成
result.all().get(30, TimeUnit.SECONDS);
System.out.println("Topic created successfully");
}
}
}
列出所有主题
try (AdminClient admin = AdminClient.create(props)) {
ListTopicsResult result = admin.listTopics();
Set<String> names = result.names().get();
System.out.println("Topics: " + names);
}
查看主题详情
try (AdminClient admin = AdminClient.create(props)) {
DescribeTopicsResult result = admin.describeTopics(Collections.singleton("orders"));
Map<String, TopicDescription> descriptions = result.all().get();
for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
System.out.println("Topic: " + entry.getKey());
for (PartitionInfo partition : entry.getValue().partitions()) {
System.out.println(" Partition: " + partition.partition());
System.out.println(" Leader: " + partition.leader());
System.out.println(" Replicas: " + Arrays.toString(partition.replicas()));
}
}
}
小结
- 主题是 Kafka 存储消息的逻辑容器
- 分区是实现并行处理和高可用的关键
- 分区策略影响消息顺序和负载均衡
- 合理规划分区数量很重要
- 消息保留策略需要根据业务需求选择
下一步
现在让我们学习 生产者 的使用,了解如何向 Kafka 发送消息。