跳到主要内容

Kafka 速查表

本章提供 Kafka 常用命令和配置的快速参考。

命令行工具

主题管理

# List every topic in the cluster
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list

# Create a topic with 3 partitions and replication factor 1
# (a replication factor of 1 only suits a single-broker dev setup)
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 1

# Show the details (partitions, leaders, replicas, ISR) of one topic
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic my-topic

# Change the partition count (Kafka can only increase it, never reduce it)
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --alter \
  --topic my-topic \
  --partitions 6

# Delete a topic
kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --delete \
  --topic my-topic

生产者

# Interactive console producer: every line typed at the prompt is sent as one record
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic

# Console producer that also parses a record key from each line
# (key and value are separated by ",")
kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --property parse.key=true \
  --property key.separator=,

# Then type messages at the ">" prompt, e.g.:
# > key,value
# > user1,{"name":"张三"}

消费者

# Consume the topic starting from the earliest available offset
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning

# Consume only records produced after the consumer starts (the default: latest)
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic

# Consume a single partition starting at a given offset
# (--offset is only accepted together with --partition)
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partition 0 \
  --offset 100

# Consume as a member of a consumer group; --from-beginning only takes
# effect while the group has no committed offsets yet
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --group my-group \
  --from-beginning

# Print each record's key before its value, separated by " - "
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning \
  --property print.key=true \
  --property key.separator=" - "

消费者组

# List all consumer groups
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --list

# Describe one group: per-partition committed offset, log-end offset and lag
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-group

# The resets below only succeed while the group has no active members.
# Swap --execute for --dry-run to preview the resulting offsets first.

# Reset the group's offsets on my-topic to the earliest offset
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --topic my-topic \
  --reset-offsets \
  --to-earliest \
  --execute

# Reset the group's offsets on my-topic to the latest offset
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --topic my-topic \
  --reset-offsets \
  --to-latest \
  --execute

# Reset the group's offsets on my-topic to an explicit offset
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --topic my-topic \
  --reset-offsets \
  --to-offset 1000 \
  --execute

副本管理

# Preferred-replica leader election for one partition
kafka-leader-election.sh \
  --bootstrap-server localhost:9092 \
  --election-type preferred \
  --topic my-topic \
  --partition 0

# Preferred-replica leader election for every eligible partition of every topic.
# The flag is --all-topic-partitions; kafka-leader-election.sh has no --all-topics option.
kafka-leader-election.sh \
  --bootstrap-server localhost:9092 \
  --election-type preferred \
  --all-topic-partitions

# Unclean leader election (use with care): an out-of-sync replica may become
# leader, so records that only the previous leader had are lost.
kafka-leader-election.sh \
  --bootstrap-server localhost:9092 \
  --election-type unclean \
  --topic my-topic \
  --partition 0

ACL 管理

# Grant the producer role on all topics to principal User:producer.
# --producer expands to WRITE, DESCRIBE and CREATE on the topic.
# Principals are passed as --allow-principal User:<name>;
# kafka-acls.sh has no --user option.
kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:producer \
  --producer \
  --topic "*"

# Grant the consumer role to principal User:consumer.
# --consumer expands to READ/DESCRIBE on the topic and READ on the group.
kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --add \
  --allow-principal User:consumer \
  --consumer \
  --topic "*" \
  --group "*"

# List the ACLs of a topic. By default only ACLs on the literal resource
# name "my-topic" are shown; add --resource-pattern-type match to also see
# wildcard ("*") and prefixed ACLs that apply to it.
kafka-acls.sh \
  --bootstrap-server localhost:9092 \
  --list \
  --topic my-topic

Producer 配置

Java Producer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// Serializers must be fully qualified class names (or Class objects);
// a bare "StringSerializer" cannot be loaded and the producer fails to start.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// acks: the default is "all" since Kafka 3.0 (it was "1" in earlier versions)
props.put("acks", "all");

// Retries (the total time spent retrying is also capped by delivery.timeout.ms)
props.put("retries", 3);
props.put("retry.backoff.ms", 100);

// Batching: a batch is sent once batch.size bytes accumulate or linger.ms elapses
props.put("batch.size", 16384);
props.put("linger.ms", 10);

// Memory for records waiting to be sent (33554432 bytes = 32 MiB)
props.put("buffer.memory", 33554432);

// Compression codec
props.put("compression.type", "lz4");

// Timeouts: delivery.timeout.ms must be >= linger.ms + request.timeout.ms
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000);

// Idempotence requires acks=all, retries > 0 and
// max.in.flight.requests.per.connection <= 5
props.put("enable.idempotence", "true");
props.put("max.in.flight.requests.per.connection", 5);

Producer<String, String> producer = new KafkaProducer<>(props);

Consumer 配置

Java Consumer

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
// Deserializers must be fully qualified class names (or Class objects);
// a bare "StringDeserializer" cannot be loaded and the consumer fails to start.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Auto-commit: offsets returned by poll() are committed every auto.commit.interval.ms
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 5000);

// Fetch tuning
props.put("fetch.min.bytes", 1);
props.put("fetch.max.wait.ms", 500);
props.put("max.poll.records", 500);

// Liveness: heartbeat.interval.ms must be below session.timeout.ms (typically <= 1/3 of it);
// the consumer is removed from the group if poll() is not called within max.poll.interval.ms
props.put("heartbeat.interval.ms", 3000);
props.put("session.timeout.ms", 45000);
props.put("max.poll.interval.ms", 300000);

// Starting position used only when the group has no committed offset for a partition
props.put("auto.offset.reset", "earliest");

// try-with-resources closes the consumer (leaving the group cleanly)
// when the loop exits through an exception
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
    }
}

Broker 配置

# Basic settings
# Unique broker id (ZooKeeper mode; KRaft mode identifies nodes with node.id)
broker.id=0
# Address the broker binds to
listeners=PLAINTEXT://0.0.0.0:9092
# Address handed out to clients; it must be reachable from them
# (localhost only works for clients on the same machine)
advertised.listeners=PLAINTEXT://localhost:9092
# Directory where partition data is stored
log.dirs=/var/lib/kafka/logs

# Partitions and replication
# Defaults for topics created without explicit values (including auto-created topics)
num.partitions=3
default.replication.factor=1
# Minimum in-sync replicas required for acks=all writes; 1 only suits a single broker
min.insync.replicas=1

# Network and I/O threads
num.network.threads=3
num.io.threads=8

# Log retention
# Keep data for 168 hours (7 days)
log.retention.hours=168
# Roll a new segment file every 1073741824 bytes (1 GiB)
log.segment.bytes=1073741824

# Replication
# A follower that has not caught up within this time is dropped from the ISR
replica.lag.time.max.ms=30000

# Compression
# "producer" keeps whatever codec the producer used
compression.type=producer

Docker Compose

单节点 Kafka

# "version" is ignored by Docker Compose v2; kept for older docker-compose releases.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise localhost so clients on the host can reach the broker through the
      # published port, as every "--bootstrap-server localhost:9092" command here does.
      # "kafka:9092" only resolves inside the compose network. Other containers
      # would need an additional listener.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single broker: the internal offsets topic cannot have more than one replica
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

KRaft 模式(无需 Zookeeper)

# "version" is ignored by Docker Compose v2; kept for older docker-compose releases.
version: '3.8'

services:
  kafka:
    image: apache/kafka:3.7.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      # Combined mode: this single node acts as both broker and controller
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # The controller quorum consists of this node only (id 1, CONTROLLER listener)
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      # Single node: internal topics must use replication factor / min ISR of 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      # No volume is mounted, so this data is lost when the container is removed
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

常用查询

查看消息数量

# Approximate number of records in a topic:
#   sum over partitions of (latest offset - earliest offset).
# Summing only the latest offsets (--time -1) over-counts as soon as retention
# has deleted old segments. The result is still approximate for compacted topics
# and transactional topics, because transaction markers also occupy offsets.
# kafka-get-offsets.sh (Kafka 3.0+) prints "topic:partition:offset" lines. It is
# the stable entry point; recent releases no longer ship kafka.tools.GetOffsetShell.
latest=$(kafka-get-offsets.sh --bootstrap-server localhost:9092 \
  --topic my-topic --time -1 | awk -F: '{sum += $3} END {print sum + 0}')
earliest=$(kafka-get-offsets.sh --bootstrap-server localhost:9092 \
  --topic my-topic --time -2 | awk -F: '{sum += $3} END {print sum + 0}')
printf '%s\n' "$((latest - earliest))"

查看消费延迟

# Largest per-partition lag (in records) of a consumer group.
# In the --describe output the columns are
#   GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
# so LAG is column 6. The last column ($NF) is CLIENT-ID, not LAG.
# The numeric test skips the header, blank lines and partitions that have no
# committed offset (their LAG is shown as "-"). Prints 0 if nothing matches.
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --describe \
  | awk '$6 ~ /^[0-9]+$/ && $6 + 0 > max + 0 {max = $6} END {print max + 0}'

导出/导入偏移量

# Export the group's committed offsets as CSV (topic,partition,offset).
# kafka-consumer-groups.sh has no standalone --export/--import actions:
# --export is an output mode of --reset-offsets. --to-current combined with
# --dry-run reports the current offsets without changing anything.
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --reset-offsets \
  --to-current \
  --all-topics \
  --export \
  --dry-run \
  > offsets.csv

# Import: reset the group to the offsets listed in the CSV file.
# The group must have no active members while this runs.
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --reset-offsets \
  --from-file offsets.csv \
  --execute

监控指标

Broker 关键指标

| 指标 | JMX 名称 | 说明 |
|---|---|---|
| MessagesInPerSec | kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec | 每秒入站消息数 |
| BytesInPerSec | kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | 每秒入站字节数 |
| UnderReplicatedPartitions | kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | 副本不足（ISR 少于副本数）的分区数 |
| OfflinePartitionsCount | kafka.controller:type=KafkaController,name=OfflinePartitionsCount | 没有可用 Leader 的离线分区数 |

Producer 关键指标

| 指标 | 说明 |
|---|---|
| record-send-rate | 每秒发送的消息数 |
| request-latency-avg | 平均请求延迟（ms） |
| record-queue-time-avg | 消息批次在发送缓冲区中的平均排队时间（ms） |

Consumer 关键指标

| 指标 | 说明 |
|---|---|
| records-consumed-rate | 每秒消费的消息数 |
| records-lag-max | 各分区中最大的消费滞后（Lag，单位为消息条数而非时间） |
| fetch-latency-avg | 平均拉取请求延迟（ms） |

快速故障排查

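| 问题 | 排查命令 |
|---|---|
| 分区不可用 | `kafka-topics.sh --describe --unavailable-partitions --bootstrap-server localhost:9092` |
| 消费延迟 | `kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092` |
| 副本不同步 | `kafka-topics.sh --describe --under-replicated-partitions --bootstrap-server localhost:9092`，或检查 JMX 指标 UnderReplicatedPartitions |
| 磁盘空间 | `df -h` + `du -sh /var/lib/kafka` |
| 网络问题 | `telnet localhost 9092`（或 `nc -vz localhost 9092`） |
| 内存问题 | `jhsdb jmap --heap --pid <PID>`（JDK 9+ 已移除 `jmap -heap`；JDK 8 使用 `jmap -heap <PID>`） |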

端口说明

| 端口 | 说明 |
|---|---|
| 9092 | Kafka Broker |
| 2181 | ZooKeeper |
| 9093 | KRaft Controller |
| 8083 | Kafka Connect REST（默认端口） |
| 9999 | JMX 监控（常用约定，需自行配置） |
| 9404 | Prometheus Exporter（常用约定） |