跳到主要内容

Kafka 速查表

本章提供 Kafka 常用命令、配置和 API 的快速参考,方便日常开发和运维查阅。

命令行工具

主题管理

# 列出所有主题
kafka-topics.sh --list --bootstrap-server localhost:9092

# 创建主题
kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=604800000

# 查看主题详情
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

# 查看所有主题详情
kafka-topics.sh --describe --bootstrap-server localhost:9092

# 查看 Under-Replicated 分区
kafka-topics.sh --describe --under-replicated-partitions --bootstrap-server localhost:9092

# 查看离线分区
kafka-topics.sh --describe --unavailable-partitions --bootstrap-server localhost:9092

# 修改主题配置
kafka-configs.sh --alter \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=86400000 \
--bootstrap-server localhost:9092

# 增加分区(只能增加)
kafka-topics.sh --alter --topic my-topic --partitions 6 --bootstrap-server localhost:9092

# 删除主题
kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

生产者

# 控制台生产者
kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092

# 带 Key 的生产者
kafka-console-producer.sh \
--topic my-topic \
--property parse.key=true \
--property key.separator=, \
--bootstrap-server localhost:9092

# 带 ACL 的生产者
kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 \
--producer-property security.protocol=SASL_SSL \
--producer-property sasl.mechanism=SCRAM-SHA-512 \
--producer-property sasl.jaas.config='org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pass";'

# 发送文件内容
kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092 < messages.txt

消费者

# 从最早开始消费
kafka-console-consumer.sh \
--topic my-topic \
--from-beginning \
--bootstrap-server localhost:9092

# 从最新开始消费
kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092

# 消费者组
kafka-console-consumer.sh \
--topic my-topic \
--group my-group \
--bootstrap-server localhost:9092

# 显示 Key 和 Value
kafka-console-consumer.sh \
--topic my-topic \
--from-beginning \
--property print.key=true \
--property key.separator=, \
--property print.timestamp=true \
--property print.partition=true \
--property print.offset=true \
--bootstrap-server localhost:9092

# 指定分区消费
kafka-console-consumer.sh \
--topic my-topic \
--partition 0 \
--offset 100 \
--bootstrap-server localhost:9092

# 最大消费数量
kafka-console-consumer.sh \
--topic my-topic \
--from-beginning \
--max-messages 10 \
--bootstrap-server localhost:9092

消费者组管理

# 列出消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 查看消费者组详情
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-group

# 查看消费者组状态
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-group \
--state

# 重置消费者组偏移量 - 从最早开始
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--reset-offsets \
--to-earliest \
--execute \
--topic my-topic

# 重置消费者组偏移量 - 从最新开始
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--reset-offsets \
--to-latest \
--execute \
--topic my-topic

# 重置消费者组偏移量 - 指定偏移量
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--reset-offsets \
--to-offset 100 \
--execute \
--topic my-topic

# 重置消费者组偏移量 - 按时间
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--reset-offsets \
--to-datetime 2024-01-01T00:00:00.000 \
--execute \
--topic my-topic

# 删除消费者组
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--delete \
--group my-group

# 删除消费者组 Offset
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--delete-offsets \
--group my-group \
--topic my-topic

Broker 和集群管理

# 查看 Broker 信息
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 查看集群 ID
kafka-cluster.sh cluster-id --bootstrap-server localhost:9092

# 查看日志目录
kafka-log-dirs.sh --describe \
--bootstrap-server localhost:9092 \
--broker-list 0,1,2

# 查看特定 Broker 的日志目录
kafka-log-dirs.sh --describe \
--bootstrap-server localhost:9092 \
--broker-list 0 \
--topic-list my-topic

# Leader 选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--all-topic-partitions \
--election-type preferred

# 分区重分配 - 生成方案
kafka-reassign-partitions.sh --generate \
--topics-to-move-json-file topics.json \
--broker-list "0,1,2" \
--bootstrap-server localhost:9092

# 分区重分配 - 执行
kafka-reassign-partitions.sh --execute \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

# 分区重分配 - 验证
kafka-reassign-partitions.sh --verify \
--reassignment-json-file reassign.json \
--bootstrap-server localhost:9092

ACL 安全管理

# 列出所有 ACL
kafka-acls.sh --bootstrap-server localhost:9092 --list

# 添加生产者权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--allow-principal User:producer \
--producer \
--topic "*"

# 添加消费者权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--allow-principal User:consumer \
--consumer \
--topic "*" \
--group "*"

# 添加主题读写权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--allow-principal User:alice \
--operation Read \
--operation Write \
--topic orders

# 禁止访问
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--deny-principal User:bob \
--operation All \
--topic sensitive-data

# 删除 ACL
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--remove \
--allow-principal User:alice \
--operation Read \
--topic orders

配置管理

# 查看 Broker 配置
kafka-configs.sh --describe \
--entity-type brokers \
--entity-name 0 \
--bootstrap-server localhost:9092

# 动态修改 Broker 配置
kafka-configs.sh --alter \
--entity-type brokers \
--entity-name 0 \
--add-config log.retention.hours=48 \
--bootstrap-server localhost:9092

# 查看主题配置
kafka-configs.sh --describe \
--entity-type topics \
--entity-name my-topic \
--bootstrap-server localhost:9092

# 查看客户端配额
kafka-configs.sh --describe \
--entity-type clients \
--entity-name my-client \
--bootstrap-server localhost:9092

常用配置

生产者配置

# 基础配置
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

# 可靠性配置
acks=all # 等待所有 ISR 副本确认
retries=3 # 发送失败重试次数
enable.idempotence=true # 启用幂等性
max.in.flight.requests.per.connection=5 # 并发请求数

# 性能配置
batch.size=16384 # 批次大小(字节)
linger.ms=5 # 发送延迟等待
compression.type=lz4 # 压缩类型:none, gzip, snappy, lz4, zstd
buffer.memory=33554432 # 缓冲区大小(32MB)
max.request.size=1048576 # 最大请求大小

# 超时配置
request.timeout.ms=30000 # 请求超时
delivery.timeout.ms=120000 # 交付超时

# 事务配置
transactional.id=my-transaction-id
transaction.timeout.ms=60000

消费者配置

# 基础配置
bootstrap.servers=localhost:9092
group.id=my-consumer-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 偏移量管理
auto.offset.reset=earliest # 无偏移量时:earliest, latest, none
enable.auto.commit=true # 自动提交偏移量
auto.commit.interval.ms=5000 # 自动提交间隔

# 拉取配置
max.poll.records=500 # 每次拉取的最大记录数
fetch.min.bytes=1 # 最小拉取字节数
fetch.max.wait.ms=500 # 最大等待时间
fetch.max.bytes=52428800 # 最大拉取字节数

# 心跳和会话
heartbeat.interval.ms=3000 # 心跳间隔
session.timeout.ms=45000 # 会话超时
max.poll.interval.ms=300000 # 最大拉取间隔

# 分区分配策略
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# 事务隔离
isolation.level=read_committed # 只读已提交消息

Broker 配置

# 基础配置
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
num.network.threads=3
num.io.threads=8

# 日志配置
log.retention.hours=168 # 日志保留时间(7天)
log.retention.bytes=-1 # 日志保留大小(-1不限制)
log.segment.bytes=1073741824 # 日志段大小(1GB)
log.cleanup.policy=delete # 清理策略:delete, compact
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# 复制配置
default.replication.factor=3
min.insync.replicas=2
num.replica.fetchers=2
replica.lag.time.max.ms=30000

# 性能配置
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=3

# 事务配置
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.num.partitions=1

# KRaft 配置(Kafka 3.x+)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
controller.listener.names=CONTROLLER

主题配置

# 消息保留
retention.ms=604800000 # 保留时间(7天)
retention.bytes=1073741824 # 保留大小(1GB)

# 日志压缩
cleanup.policy=compact # 压缩策略
min.cleanable.dirty.ratio=0.5
delete.retention.ms=86400000

# 消息大小
max.message.bytes=1048576 # 最大消息大小
segment.bytes=1073741824 # 段大小

# 复制配置
min.insync.replicas=2 # 最小 ISR

# 其他
compression.type=producer # 压缩类型

Java API 速查

生产者 API

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

// 创建生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("enable.idempotence", "true");

Producer<String, String> producer = new KafkaProducer<>(props);

// 同步发送
producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();

// 异步发送
producer.send(new ProducerRecord<>("my-topic", "key", "value"),
(metadata, exception) -> {
if (exception == null) {
System.out.println("Sent to partition " + metadata.partition());
}
});

// 事务发送
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic1", "key", "value1"));
producer.send(new ProducerRecord<>("topic2", "key", "value2"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}

producer.close();

消费者 API

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

// 创建消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题
consumer.subscribe(Arrays.asList("my-topic"));

// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}

// 同步提交
consumer.commitSync();

// 异步提交
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception);
}
});
}

// 手动分配分区
consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)));

// 从指定偏移量开始
consumer.seek(new TopicPartition("my-topic", 0), 100);

// 从头开始
consumer.seekToBeginning(Arrays.asList(new TopicPartition("my-topic", 0)));

// 从最新开始
consumer.seekToEnd(Arrays.asList(new TopicPartition("my-topic", 0)));

AdminClient API

import org.apache.kafka.clients.admin.*;
import java.util.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {

// 创建主题
NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
admin.createTopics(Collections.singleton(topic)).all().get();

// 列出主题
Set<String> topics = admin.listTopics().names().get();

// 描述主题
DescribeTopicsResult result = admin.describeTopics(Collections.singleton("my-topic"));

// 删除主题
admin.deleteTopics(Collections.singleton("my-topic")).all().get();

// 创建分区
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put("my-topic", NewPartitions.increaseTo(6));
admin.createPartitions(newPartitions).all().get();

// 查看 Consumer Group
ListConsumerGroupsResult groups = admin.listConsumerGroups();
}

监控指标

Broker 指标

JMX MBean指标说明
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec每秒入站消息数监控吞吐量
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec每秒入站字节数网络带宽监控
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec每秒出站字节数网络带宽监控
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions复制不足分区数应为 0
kafka.server:type=ReplicaManager,name=OfflinePartitionsCount离线分区数应为 0
kafka.controller:type=KafkaController,name=ActiveControllerCount活跃控制器数应为 1
kafka.controller:type=KafkaController,name=LeaderElectionRateLeader 选举频率应接近 0
kafka.server:type=ReplicaManager,name=IsrShrinksPerSecISR 收缩速率应接近 0

Producer 指标

指标说明
record-send-rate发送速率
request-latency-avg平均请求延迟
record-queue-time-avg平均排队时间
error-rate错误率
retry-rate重试率
compression-rate压缩率

Consumer 指标

指标说明
records-consumed-rate消费速率
fetch-rate拉取频率
commit-latency-avg提交延迟
records-lag-max最大消费延迟
fetch-latency-avg拉取延迟

常见问题排查

生产者问题

问题可能原因解决方案
发送超时网络问题、Broker 繁忙增大 request.timeout.ms
消息丢失acks=0acks=1使用 acks=all
重复消息重试导致启用 enable.idempotence
吞吐量低批次太小增大 batch.sizelinger.ms

消费者问题

问题可能原因解决方案
频繁 Rebalance处理时间过长增大 max.poll.interval.ms
消费延迟大消费能力不足增加消费者数量
重复消费提交失败使用手动提交
消息丢失自动提交使用手动提交

Broker 问题

问题可能原因解决方案
Under-ReplicatedFollower 滞后检查网络、磁盘
离线分区Leader 不可用检查 Broker 状态
磁盘满数据量大调整保留策略
高延迟GC 问题调整 JVM 参数

Docker 快速启动

version: '3.8'
services:
kafka:
image: apache/kafka:3.7.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

小结

本速查表涵盖了 Kafka 日常开发和运维中最常用的命令、配置和 API。建议将本页面加入书签,需要时快速查阅。