跳到主要内容

Kafka 主题与分区

主题(Topic)和分区(Partition)是 Kafka 最核心的概念,理解它们对于高效使用 Kafka 至关重要。

主题概述

什么是主题?

主题是 Kafka 中存储消息的逻辑容器,类似于文件系统中的文件夹。每条消息都属于某个主题,主题将相关的消息组织在一起。

从图中可以看出,生产者将消息发送到主题,消费者从主题订阅消息。主题可以有多个生产者和多个消费者,它们之间完全解耦。

主题的特性

  1. 多生产者:多个生产者可以向同一主题发送消息,互不干扰
  2. 多消费者:多个消费者组可以独立消费同一主题,互不影响
  3. 持久化:消息持久化到磁盘,可配置保留策略(按时间或大小)
  4. 日志分段:每个分区由多个日志段(Log Segment)组成,便于管理和清理

主题命名规范

主题命名需要遵循以下规则:

  • 只能包含字母、数字、下划线(_)、连字符(-)和点(.
  • 不能为空,长度限制为 249 个字符
  • 建议使用有意义的名称,如 user-eventsorder-payments
注意

避免使用 __consumer_offsets__transaction_state 等以双下划线开头的名称,这些是 Kafka 内部使用的主题。

分区(Partition)

为什么需要分区?

分区是 Kafka 实现并行处理和高可扩展性的关键:

  1. 并行消费:多个消费者可以并行消费不同分区,提高吞吐量
  2. 负载均衡:消息分布在多个分区和 Broker 上,实现负载均衡
  3. 高吞吐量:分区允许 Kafka 线性扩展,增加分区数可提高并发度
  4. 顺序保证:同一分区内的消息有序,这是 Kafka 重要的语义保证

分区的工作原理

当生产者发送消息时,Kafka 根据分区策略决定消息发送到哪个分区。

分区策略

1. 轮询策略(Round Robin)

当消息没有 Key 时,Kafka 使用轮询策略将消息均匀分布到各个分区:

// 消息没有 Key,默认使用轮询
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("orders", null, "Message " + i));
}
// 结果: Partition 0: 0,3,6,9; Partition 1: 1,4,7; Partition 2: 2,5,8

轮询策略的优点是消息均匀分布,最大化并行度;缺点是无法保证消息顺序。

2. 键哈希策略(Key Hashing)

当消息有 Key 时,Kafka 使用 Key 的哈希值决定分区:

// 消息带有 Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", // 主题
"user_123", // Key - 同一用户的订单会在同一分区
"Order data"
);

// 计算分区: Math.abs(Utils.murmur2(keyBytes)) % numPartitions

键哈希策略保证相同 Key 的消息总是发送到同一分区,从而保证这些消息的顺序性。

使用场景
  • 需要保证同一 Key 的消息有序:使用 Key Hashing
  • 不需要保证顺序:可以不使用 Key 或使用 null
  • 需要完全均匀分布:使用轮询策略

3. 粘性分区器(Sticky Partitioner)

Kafka 2.4 引入了粘性分区器,它会尽可能将消息发送到同一个分区,直到该分区的批次已满:

// 使用粘性分区器
props.put("partitioner.class", "org.apache.kafka.clients.producer.UniformStickyPartitioner");

粘性分区器的优点是减少请求次数,提高吞吐量;适合高吞吐量场景。

4. 自定义分区器

可以实现自定义分区器来满足特殊需求:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题的所有分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

// 自定义分区逻辑
if (keyBytes == null) {
// 没有 Key,使用随机分区
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}

// VIP 用户分配到固定分区
String keyString = new String(keyBytes);
if (keyString.startsWith("VIP:")) {
return 0; // VIP 用户固定到分区 0
}

// 其他用户使用 Key Hash
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}

分区内消息顺序

Kafka 只保证同一分区内的消息顺序。这意味着:

在分区 0 中,消息按照 Msg1 → Msg2 → Msg3 的顺序被消费,这是 Kafka 的重要保证。但如果 Msg4 和 Msg1 发送到不同分区,它们的相对顺序是不确定的。

管理主题

创建主题

# 基本创建
kafka-topics.sh --create \
--topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1

# 完整参数创建
kafka-topics.sh --create \
--topic orders \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config max.message.bytes=1048576

参数说明

参数说明
--partitions分区数量,影响并行度和吞吐量
--replication-factor副本数量,影响可用性
--config主题级别配置,如保留时间、消息大小限制

查看主题

# 列出所有主题
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看主题详情
kafka-topics.sh --describe \
--topic orders \
--bootstrap-server localhost:9092

输出示例:

Topic: orders    TopicId: abc123...    PartitionCount: 3    ReplicationFactor: 1
Configs: retention.ms=604800000,max.message.bytes=1048576
Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Partition: 2 Leader: 1 Replicas: 1 Isr: 1

修改主题

# 增加分区(注意:分区只能增加,不能减少)
kafka-topics.sh --alter \
--topic orders \
--partitions 9 \
--bootstrap-server localhost:9092

# 修改配置
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--add-config retention.ms=86400000 \
--bootstrap-server localhost:9092

# 删除配置
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--delete-config retention.ms \
--bootstrap-server localhost:9092
注意

分区数量只能增加,不能减少。减少分区会导致数据丢失和消息路由混乱。如果必须减少分区,需要创建新主题并迁移数据。

删除主题

# 删除主题(需在 broker 配置中启用 delete.topic.enable=true)
kafka-topics.sh --delete \
--topic old-topic \
--bootstrap-server localhost:9092

分区策略设计

如何确定分区数量?

分区数量是 Kafka 最重要的配置之一,需要综合考虑以下因素:

考虑因素

  1. 吞吐量需求:目标吞吐量除以单分区吞吐量
  2. 消费者数量:分区数应大于等于消费者数(最佳等于消费者数)
  3. Broker 数量:分区数应该是 Broker 数的倍数,便于均匀分布
  4. 磁盘容量:分区数乘分区大小应小于等于磁盘容量

计算公式

分区数 = max(消费者数, 目标吞吐量 / 单分区吞吐量)

示例:

  • 目标吞吐量:100,000 msg/s
  • 单分区吞吐量:10,000 msg/s
  • 消费者数:8
分区数 = max(8, 100000/10000) = max(8, 10) = 10

分区数量的权衡

分区数量优点缺点
过少管理简单限制并行度,吞吐量受限
适中平衡性能和管理需要合理规划
过多高并行度增加 Zookeeper/KRaft 负载,恢复时间变长

建议:从较小的分区数开始,根据实际需求逐步调整。

分区分配策略

Kafka 自动分配分区到 Broker,遵循以下原则:

副本分配原则

  1. 副本分散在不同的 Broker,避免单点故障
  2. 分区 Leader 均衡分布,均衡读写负载
  3. 考虑机架感知(Rack Awareness),跨机架分布副本

消息保留策略

Kafka 提供灵活的消息保留策略,可以按时间或大小保留消息。

基于时间的保留

# 设置消息保留 7 天
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--add-config retention.ms=604800000 \
--bootstrap-server localhost:9092

常用配置:

  • retention.ms:保留时间(毫秒),默认 7 天
  • retention.minutes:保留时间(分钟)
  • retention.hours:保留时间(小时),默认 168 小时(7 天)

基于大小的保留

# 设置主题最大大小 10GB
kafka-configs.sh --alter \
--entity-type topics \
--entity-name orders \
--add-config retention.bytes=10737418240 \
--bootstrap-server localhost:9092

常用配置:

  • retention.bytes:每个分区最大字节数,-1 表示不限制
  • retention.bytes 作用于分区级别,主题总大小 = 分区数 × retention.bytes

日志压缩策略

日志压缩保留每个 Key 的最新值,适合作为变更数据源:

# 启用日志压缩
kafka-configs.sh --alter \
--entity-type topics \
--entity-name user-status \
--add-config cleanup.policy=compact \
--bootstrap-server localhost:9092

压缩配置:

  • cleanup.policy=compact:启用压缩
  • min.cleanable.dirty.ratio:触发压缩的脏数据比例
  • delete.retention.ms:标记删除后保留时间

保留策略对比

策略说明适用场景
delete基于时间/大小删除普通日志、消息队列
compact保留每个 Key 最新值变更数据源(CDC)、配置存储、用户状态
compact,delete组合策略既需要压缩也需要过期删除

日志段管理

日志段结构

每个分区由多个日志段(Log Segment)组成:

Partition: orders-0/
├── 00000000000000000000.log # 活跃段,正在写入
├── 00000000000000000000.index # 偏移量索引
├── 00000000000000000000.timeindex # 时间戳索引
├── 00000000000000001000.log # 已完成的段
├── 00000000000000001000.index
├── 00000000000000001000.timeindex
└── ...

文件说明

  • .log:实际消息数据,顺序追加写入
  • .index:稀疏索引,加速偏移量查找
  • .timeindex:时间戳索引,支持按时间查找

日志段配置

# 日志段文件大小(默认 1GB)
log.segment.bytes=1073741824

# 日志段文件检查间隔(默认 5 分钟)
log.segment.check.interval.ms=300000

# 基于时间的日志段滚动(默认 7 天)
log.roll.hours=168

# 日志压缩相关
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.delete.retention.ms=86400000

索引机制

Kafka 使用稀疏索引提高查找效率:

  • 索引不会为每条消息建立条目,而是每隔一定字节数建立索引项
  • 查找时,先通过索引定位到大致位置,然后顺序扫描
  • 这种设计在空间和查询效率之间取得了平衡

最佳实践

1. 合理规划分区数

  • 不要过多:每个分区占用内存和文件句柄,过多会影响性能
  • 不要过少:分区少会影响并行度和吞吐量
  • 建议:从较小的分区数开始,根据需求调整

2. 选择合适的 Key

// 需要消息有序:使用有业务意义的 Key
producer.send(new ProducerRecord<>("orders", userId, orderData));

// 不需要有序:不使用 Key 或使用随机 Key
producer.send(new ProducerRecord<>("logs", null, logData));

3. 监控分区平衡

# 检查分区分布
kafka-topics.sh --describe \
--topic orders \
--bootstrap-server localhost:9092

# 如果 Leader 分布不均,执行首选副本选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topics \
--election-type preferred

4. 使用压缩

# 启用生产者压缩
kafka-configs.sh --alter \
--entity-type topics \
--entity-name large-topic \
--add-config compression.type=producer \
--bootstrap-server localhost:9092

5. 预估分区大小

单个分区大小 = 日均消息量 × 消息平均大小 × 保留天数 / 分区数

确保单个分区大小在合理范围内(一般不超过 50GB),便于故障恢复。

Java API 操作示例

创建主题

import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.*;

public class TopicManagement {

public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {

// 创建主题配置
NewTopic topic = new NewTopic("orders", 6, (short) 1);
topic.configs(Map.of(
"retention.ms", "604800000",
"compression.type", "producer"
));

// 创建主题
CreateTopicsResult result = admin.createTopics(Collections.singleton(topic));

// 等待创建完成
result.all().get(30, TimeUnit.SECONDS);
System.out.println("Topic created successfully");
}
}
}

列出所有主题

try (AdminClient admin = AdminClient.create(props)) {
ListTopicsResult result = admin.listTopics();
Set<String> names = result.names().get();
System.out.println("Topics: " + names);
}

查看主题详情

try (AdminClient admin = AdminClient.create(props)) {
DescribeTopicsResult result = admin.describeTopics(Collections.singleton("orders"));
Map<String, TopicDescription> descriptions = result.all().get();

for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
System.out.println("Topic: " + entry.getKey());
for (TopicPartitionInfo partition : entry.getValue().partitions()) {
System.out.println(" Partition: " + partition.partition());
System.out.println(" Leader: " + partition.leader());
System.out.println(" Replicas: " + partition.replicas());
System.out.println(" ISR: " + partition.isr());
}
}
}

增加分区

try (AdminClient admin = AdminClient.create(props)) {
Map<String, NewPartitions> newPartitions = Map.of(
"orders", NewPartitions.increaseTo(9)
);

CreatePartitionsResult result = admin.createPartitions(newPartitions);
result.all().get(30, TimeUnit.SECONDS);
System.out.println("Partitions increased successfully");
}

小结

  1. 主题是 Kafka 存储消息的逻辑容器,类似文件系统的文件夹
  2. 分区是实现并行处理和高可用的关键,同一分区内消息有序
  3. 分区策略影响消息顺序和负载均衡,需根据业务选择
  4. 合理规划分区数量很重要,影响吞吐量和运维复杂度
  5. 消息保留策略需要根据业务需求选择,支持按时间、大小或压缩

下一步

现在让我们学习 生产者 的使用,了解如何向 Kafka 发送消息。