Kafka 速查表
本章提供 Kafka 常用命令和配置的快速参考。
命令行工具
主题管理
# 列出所有主题
kafka-topics.sh --list --bootstrap-server localhost:9092
# 创建主题
kafka-topics.sh --create \
--topic my-topic \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# 查看主题详情
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# 修改分区数
kafka-topics.sh --alter --topic my-topic --partitions 6 --bootstrap-server localhost:9092
# 删除主题
kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
生产者
# 控制台生产者
kafka-console-producer.sh \
--topic my-topic \
--bootstrap-server localhost:9092
# 带 Key 的生产者
kafka-console-producer.sh \
--topic my-topic \
--property parse.key=true \
--property key.separator=, \
--bootstrap-server localhost:9092
# 发送消息
# > key,value
# > user1,{"name":"张三"}
消费者
# 从最早开始消费
kafka-console-consumer.sh \
--topic my-topic \
--from-beginning \
--bootstrap-server localhost:9092
# 从最新开始消费
kafka-console-consumer.sh \
--topic my-topic \
--bootstrap-server localhost:9092
# 消费特定分区
kafka-console-consumer.sh \
--topic my-topic \
--partition 0 \
--offset 100 \
--bootstrap-server localhost:9092
# 带消费者组消费
kafka-console-consumer.sh \
--topic my-topic \
--group my-group \
--from-beginning \
--bootstrap-server localhost:9092
# 带 Key 显示
kafka-console-consumer.sh \
--topic my-topic \
--from-beginning \
--property print.key=true \
--property key.separator=" - " \
--bootstrap-server localhost:9092
消费者组
# 列出消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 查看消费组详情
kafka-consumer-groups.sh \
--group my-group \
--describe \
--bootstrap-server localhost:9092
# 重置偏移量到最早
kafka-consumer-groups.sh \
--group my-group \
--reset-offsets \
--to-earliest \
--topic my-topic \
--execute \
--bootstrap-server localhost:9092
# 重置偏移量到最新
kafka-consumer-groups.sh \
--group my-group \
--reset-offsets \
--to-latest \
--topic my-topic \
--execute \
--bootstrap-server localhost:9092
# 重置偏移量到指定位置
kafka-consumer-groups.sh \
--group my-group \
--reset-offsets \
--to-offset 1000 \
--topic my-topic \
--execute \
--bootstrap-server localhost:9092
副本管理
# 首选副本选举
kafka-leader-election.sh \
--topic my-topic \
--partition 0 \
--election-type preferred \
--bootstrap-server localhost:9092
# 所有主题首选副本选举
kafka-leader-election.sh \
--all-topics \
--election-type preferred \
--bootstrap-server localhost:9092
# 不清洁选举(慎用)
kafka-leader-election.sh \
--topic my-topic \
--partition 0 \
--election-type unclean \
--bootstrap-server localhost:9092
ACL 管理
# 添加生产者 ACL
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--producer \
--topic "*" \
--user producer
# 添加消费者 ACL
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--consumer \
--topic "*" \
--group "*" \
--user consumer
# 查看 ACL
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--list \
--topic my-topic
Producer 配置
Java Producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
// acks (默认 1)
props.put("acks", "all");
// 重试
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// 批量发送
props.put("batch.size", 16384);
props.put("linger.ms", 10);
// 缓冲区
props.put("buffer.memory", 33554432);
// 压缩
props.put("compression.type", "lz4");
// 超时
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000);
// 幂等性
props.put("enable.idempotence", "true");
props.put("max.in.flight.requests.per.connection", 5);
Producer<String, String> producer = new KafkaProducer<>(props);
Consumer 配置
Java Consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "StringDeserializer");
props.put("value.deserializer", "StringDeserializer");
// 自动提交
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 5000);
// 拉取配置
props.put("fetch.min.bytes", 1);
props.put("fetch.max.wait.ms", 500);
props.put("max.poll.records", 500);
// 心跳和会话
props.put("heartbeat.interval.ms", 3000);
props.put("session.timeout.ms", 45000);
props.put("max.poll.interval.ms", 300000);
// 偏移量
props.put("auto.offset.reset", "earliest");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
Broker 配置
# 基本配置
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/var/lib/kafka/logs
# 分区和副本
num.partitions=3
default.replication.factor=1
min.insync.replicas=1
# 网络和线程
num.network.threads=3
num.io.threads=8
# 日志
log.retention.hours=168
log.segment.bytes=1073741824
# 副本
replica.lag.time.max.ms=30000
# 压缩
compression.type=producer
Docker Compose
单节点 Kafka
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KRaft 模式(无需 Zookeeper)
version: '3.8'
services:
kafka:
image: apache/kafka:3.7.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
常用查询
查看消息数量
# 统计主题消息数
kafka-run-class.sh kafka.tools.GetOffsetShell \
--topic my-topic \
--time -1 \
--bootstrap-server localhost:9092 | \
awk -F: '{sum += $3} END {print sum}'
查看消费延迟
# 查看最大延迟
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe | awk '{print $NF}' | sort -rn | head -1
导出/导入偏移量
# 导出偏移量
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--export \
> offsets.txt
# 导入偏移量
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--import \
< offsets.txt
监控指标
Broker 关键指标
| 指标 | JMX 名称 | 说明 |
|---|---|---|
| MessagesInPerSec | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | 每秒消息数 |
| BytesInPerSec | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | 每秒入站字节 |
| UnderReplicatedPartitions | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | 复制不足分区 |
| OfflinePartitions | kafka.controller:type=KafkaController,name=OfflinePartitionsCount | 离线分区 |
Producer 关键指标
| 指标 | 说明 |
|---|---|
| record-send-rate | 发送速率 |
| request-latency-avg | 平均延迟 |
| record-queue-time-avg | 排队时间 |
Consumer 关键指标
| 指标 | 说明 |
|---|---|
| records-consumed-rate | 消费速率 |
| records-lag-max | 最大延迟 |
| fetch-latency-avg | 拉取延迟 |
快速故障排查
| 问题 | 排查命令 |
|---|---|
| 分区不可用 | kafka-topics.sh --describe |
| 消费延迟 | kafka-consumer-groups.sh --describe |
| 副本不同步 | 检查 UnderReplicatedPartitions |
| 磁盘空间 | df -h + du -sh /var/lib/kafka |
| 网络问题 | telnet localhost 9092 |
| 内存问题 | jmap -heap |
端口说明
| 端口 | 说明 |
|---|---|
| 9092 | Kafka Broker |
| 2181 | Zookeeper |
| 9093 | KRaft Controller |
| 8080 | Kafka Connect REST |
| 9999 | JMX 监控 |
| 9404 | Prometheus Exporter |