跳到主要内容

Kafka 主题与分区

主题(Topic)和分区(Partition)是 Kafka 最核心的概念,理解它们对于高效使用 Kafka 至关重要。

主题概述

什么是主题?

主题是 Kafka 中存储消息的逻辑容器,类似于文件系统中的文件夹。每条消息都属于某个主题。

┌─────────────────────────────────────────────────────────────┐
│ Kafka 主题结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Cluster │ │
│ │ │ │
│ │ Topic: orders │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Partition 0 │ │ │
│ │ │ [0] Order #001 │ │ │
│ │ │ [3] Order #002 │ │ │
│ │ │ [6] Order #003 │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Partition 1 │ │ │
│ │ │ [1] Order #004 │ │ │
│ │ │ [4] Order #005 │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Partition 2 │ │ │
│ │ │ [2] Order #006 │ │ │
│ │ │ [5] Order #007 │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

主题的特性

  1. 多生产者:多个生产者可以向同一主题发送消息
  2. 多消费者:多个消费者组可以独立消费同一主题
  3. 持久化:消息持久化到磁盘,可配置保留策略
  4. 日志分段:每个分区由多个日志段(Log Segment)组成

分区(Partition)

为什么需要分区?

分区是 Kafka 实现并行处理和高可扩展性的关键:

  1. 并行消费:多个消费者可以并行消费不同分区
  2. 负载均衡:消息分布在多个分区,实现负载均衡
  3. 高吞吐量:分区允许 Kafka 线性扩展吞吐量

分区的工作原理

┌─────────────────────────────────────────────────────────────�
│ 分区消息分布机制 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer发送消息时根据分区策略决定消息发送到哪个分区: │
│ │
│ ┌─────────────┐ │
│ │ Message │ │
│ │ Key=null │────> 轮询策略 Round Robin │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ │
│ │ Message │ │
│ │ Key=user1 │────> Hash(key) % numPartitions │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

分区策略

1. 轮询策略(Round Robin)

当消息没有 Key 时,Kafka 使用轮询策略将消息均匀分布到各个分区:

// 消息没有 Key,默认使用轮询
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("orders", null, "Message " + i));
}
// 结果: Partition 0: 0,3,6; Partition 1: 1,4,7; Partition 2: 2,5,8

2. 键哈希策略(Key Hashing)

当消息有 Key 时,Kafka 使用 Key 的哈希值决定分区:

// 消息带有 Key
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", // 主题
"user_123", // Key - 同一用户的订单会在同一分区
"Order data"
);

// 计算分区: Math.abs(Utils.toPositive(Utils.murmur2(keyBytes))) % numPartitions
使用场景
  • 需要保证同一 Key 的消息有序:使用 Key Hashing
  • 不需要保证顺序:可以不使用 Key 或使用 null

3. 自定义分区策略

可以实现自定义分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题的所有分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

// 自定义分区逻辑
if (keyBytes == null) {
// 没有 Key,使用轮询
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}

// VIP 用户分配到固定分区
String keyString = new String(keyBytes);
if (keyString.startsWith("VIP:")) {
return 0; // VIP 用户固定到分区 0
}

// 其他用户使用 Key Hash
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
}

管理主题

创建主题

# 基本创建
kafka-topics.sh --create \
--topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1

# 完整参数创建
kafka-topics.sh --create \
--topic orders \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config max.message.bytes=1048576

查看主题

# 列出所有主题
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看主题详情
kafka-topics.sh --describe \
--topic orders \
--bootstrap-server localhost:9092

输出示例:

Topic: orders    TopicId: abc123...    PartitionCount: 3    ReplicationFactor: 1
Configs: retention.ms=604800000,max.message.bytes=1048576
Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Partition: 2 Leader: 1 Replicas: 1 Isr: 1

修改主题

# 增加分区(注意:分区只能增加,不能减少)
kafka-topics.sh --alter \
--topic orders \
--partitions 9 \
--bootstrap-server localhost:9092

# 修改配置
kafka-topics.sh --alter \
--topic orders \
--config retention.ms=86400000 \
--bootstrap-server localhost:9092

# 删除配置
kafka-topics.sh --alter \
--topic orders \
--delete-config retention.ms \
--bootstrap-server localhost:9092

删除主题

# 删除主题(需启用删除功能)
kafka-topics.sh --delete \
--topic old-topic \
--bootstrap-server localhost:9092

分区策略设计

如何确定分区数量?

分区数量是 Kafka 最重要的配置之一,需要综合考虑:

考虑因素

  1. 吞吐量需求:目标吞吐量除以单分区吞吐量
  2. 消费者数量:分区数大于等于消费者数(最佳等于消费者数)
  3. Broker 数量:分区数应该是 Broker 数的倍数
  4. 磁盘容量:分区数乘分区大小小于等于磁盘容量

计算公式

分区数 = max(消费者数, 目标吞吐量 / 单分区吞吐量)

示例:

  • 目标吞吐量:100,000 msg/s
  • 单分区吞吐量:10,000 msg/s
  • 消费者数:8
分区数 = max(8, 100000/10000) = max(8, 10) = 10

分区分配策略

┌─────────────────────────────────────────────────────────────┐
│ 分区分配策略示意图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Broker 1 Broker 2 Broker 3 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ P0, P3 │ │ P1, P4 │ │ P2, P5 │ │
│ │ (Leader)│ │ (Leader)│ │ (Leader)│ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 副本分配原则: │
│ 1. 副本分散在不同的 Broker │
│ 2. 分区 Leader 均衡分布 │
│ 3. 考虑机架感知(rack awareness) │
│ │
└─────────────────────────────────────────────────────────────┘

消息保留策略

基于时间

# 设置消息保留 7 天
kafka-topics.sh --alter \
--topic orders \
--config retention.ms=604800000 \
--bootstrap-server localhost:9092

基于大小

# 设置主题最大大小 10GB
kafka-topics.sh --alter \
--topic orders \
--config retention.bytes=10737418240 \
--bootstrap-server localhost:9092

压缩策略

# 启用日志压缩(只保留每个 Key 的最新值)
kafka-topics.sh --alter \
--topic user-status \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--bootstrap-server localhost:9092

保留策略对比

策略说明适用场景
delete基于时间/大小删除普通日志、消息队列
compact保留每个 Key 最新值变更数据源(CDC)、配置存储

日志段管理

日志段结构

┌─────────────────────────────────────────────────────────────┐
│ 分区日志段结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Partition: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Log Segment 0 (active) │ │
│ │ ├── 00000000000000000000.index │ │
│ │ ├── 00000000000000000000.log │ │
│ │ └── 00000000000000000000.timeindex │ │
│ └─────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Log Segment 1 │ │
│ │ ├── 00000000000000001000.index │ │
│ │ ├── 00000000000000001000.log │ │
│ │ └── 00000000000000001000.timeindex │ │
│ └─────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Log Segment 2 │ │
│ │ ├── 00000000000000002000.index │ │
│ │ ├── 00000000000000002000.log │ │
│ │ └── 00000000000000002000.timeindex │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 文件说明: │
│ - .log: 实际消息数据 │
│ - .index: 消息偏移量索引 │
│ - .timeindex: 时间戳索引 │
│ │
└─────────────────────────────────────────────────────────────┘

日志段配置

# 日志段文件大小(默认 1GB)
log.segment.bytes=1073741824

# 日志段文件检查间隔(默认 5 分钟)
log.segment.check.interval.ms=300000

# 日志段文件最小年龄(用于压缩, 默认 1 分钟)
log.segment.delete.delay.ms=60000

最佳实践

1. 合理规划分区数

  • 不要过多:每个分区占用资源,过多会影响性能
  • 不要过少:分区少会影响并行度和吞吐量
  • 建议:从较小的分区数开始,根据需求调整

2. 选择合适的 Key

  • 需要消息有序:使用有业务意义的 Key
  • 不需要有序:不使用 Key 或使用随机 Key

3. 监控分区平衡

# 检查分区平衡状态
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--partition 0 \
--topic orders \
--election-type preferred

4. 使用压缩

# 启用压缩
kafka-topics.sh --alter \
--topic large-topic \
--config compression.type=producer \
--bootstrap-server localhost:9092

Java API 操作示例

创建主题

import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.*;

public class TopicManagement {

public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {

// 创建主题配置
NewTopic topic = new NewTopic("orders", 6, (short) 1);
topic.configs(Map.of(
"retention.ms", "604800000",
"compression.type", "producer"
));

// 创建主题
CreateTopicsResult result = admin.createTopics(Collections.singleton(topic));

// 等待创建完成
result.all().get(30, TimeUnit.SECONDS);
System.out.println("Topic created successfully");
}
}
}

列出所有主题

try (AdminClient admin = AdminClient.create(props)) {
ListTopicsResult result = admin.listTopics();
Set<String> names = result.names().get();
System.out.println("Topics: " + names);
}

查看主题详情

try (AdminClient admin = AdminClient.create(props)) {
DescribeTopicsResult result = admin.describeTopics(Collections.singleton("orders"));
Map<String, TopicDescription> descriptions = result.all().get();

for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
System.out.println("Topic: " + entry.getKey());
for (PartitionInfo partition : entry.getValue().partitions()) {
System.out.println(" Partition: " + partition.partition());
System.out.println(" Leader: " + partition.leader());
System.out.println(" Replicas: " + Arrays.toString(partition.replicas()));
}
}
}

小结

  1. 主题是 Kafka 存储消息的逻辑容器
  2. 分区是实现并行处理和高可用的关键
  3. 分区策略影响消息顺序和负载均衡
  4. 合理规划分区数量很重要
  5. 消息保留策略需要根据业务需求选择

下一步

现在让我们学习 生产者 的使用,了解如何向 Kafka 发送消息。