Kafka 监控与运维
本章将详细介绍 Kafka 的监控指标、性能调优和运维实践。
监控概述
为什么要监控 Kafka?
JMX 监控
启用 JMX
# 方式1:环境变量
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
# 方式2:启动脚本修改 bin/kafka-server-start.sh
# 添加
export JMX_PORT=9999
JMX 指标
# 使用 jmxterm 查看指标
java -jar jmxterm-1.0.0.jar -l localhost:9999
# 查看 Broker 指标
> bean kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=RequestsPerSec
关键监控指标
Broker 指标
| 指标名称 | 说明 | 正常范围 |
|---|---|---|
MessagesInPerSec | 每秒消息数 | 根据业务 |
BytesInPerSec | 每秒入站字节 | < 网络带宽 |
BytesOutPerSec | 每秒出站字节 | < 网络带宽 |
RequestsPerSec | 每秒请求数 | < 负载 |
UnderReplicatedPartitions | 复制不足分区 | 0 |
OfflinePartitionsCount | 离线分区数 | 0 |
ActiveControllerCount | 活跃控制器数 | 1 |
LeaderElectionRate | Leader 选举频率 | 接近 0 |
Producer 指标
| 指标名称 | 说明 | 正常范围 |
|---|---|---|
record-send-rate | 发送速率 | 符合预期 |
request-latency-avg | 平均延迟 | < 100ms |
record-queue-time-avg | 排队时间 | < 10ms |
error-rate | 错误率 | 接近 0 |
retry-rate | 重试率 | < 5% |
Consumer 指标
| 指标名称 | 说明 | 正常范围 |
|---|---|---|
records-consumed-rate | 消费速率 | 符合预期 |
fetch-rate | 拉取频率 | 正常 |
commit-latency-avg | 提交延迟 | < 10ms |
records-lag-max | 最大消费延迟 | 接近 0 |
fetch-latency-avg | 拉取延迟 | < 100ms |
Consumer Lag
Lag = Producer Offset - Consumer Offset = 12 - 4 = 8
监控 Lag 很重要:
- Lag 过大说明消费者处理不及时
- 可能需要增加消费者数量
- 可能需要优化处理逻辑
查看 Lag
# 使用 kafka-consumer-groups 命令
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe
# 输出示例
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
my-group orders 0 1000 1200 200
my-group orders 1 800 1000 200
my-group orders 2 950 1000 50
Prometheus + Grafana
配置 JMX Exporter
# docker-compose.yml
services:
jmx-exporter:
image: bitnami/jmx-exporter:latest
ports:
- "9404:9404"
volumes:
- ./jmx-config.yml:/config.yml
environment:
- JAVA_OPTS=-Xmx128M -XX:+UseG1GC -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
JMX 配置 jmx-config.yml:
---
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>Count'
name: kafka_server_broker_topic_metrics_$1_total
type: GAUGE
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>OneMinuteRate'
name: kafka_server_broker_topic_metrics_$1_rate
type: GAUGE
- pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.*)><>(records-lag-max|records-consumed-rate|fetch-latency-avg)'
name: kafka_consumer_$2
labels:
client_id: "$1"
type: GAUGE
Grafana Dashboard
{
"dashboard": {
"title": "Kafka Overview",
"panels": [
{
"title": "Messages In Per Sec",
"targets": [
{
"expr": "sum(rate(kafka_server_broker_topic_metrics_messages_in_total[5m]))"
}
]
},
{
"title": "Consumer Lag",
"targets": [
{
"expr": "sum(kafka_consumer_records_lag_max)"
}
]
}
]
}
}
性能调优
性能调优是 Kafka 运维的核心工作之一。本节将从 Broker、Producer、Consumer 三个维度详细介绍调优策略。
Broker 调优
Broker 调优需要从硬件选择、JVM 配置、操作系统参数和 Kafka 配置四个层面考虑。
硬件选择
Kafka 是 I/O 密集型应用,硬件选择对性能影响显著:
| 硬件 | 推荐配置 | 说明 |
|---|---|---|
| 磁盘 | SSD 或 NVMe | 顺序写入性能好,避免机械硬盘 |
| 内存 | 32GB+ | 大部分用于 Page Cache |
| CPU | 多核 | 压缩、加密计算密集 |
| 网络 | 万兆网卡 | 高吞吐场景必备 |
JVM 配置
# 推荐的 JVM 配置
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
# G1 垃圾收集器(JDK 11+ 推荐)
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-XX:MaxInlineSize=8"
# 启动 Kafka
bin/kafka-server-start.sh config/kraft/server.properties
JVM 调优要点:
- 堆内存大小:不要超过 6GB,过大反而导致 GC 问题
- 使用 G1 GC:适合大堆内存,控制停顿时间
- 避免 CMS:JDK 11+ 推荐使用 G1
操作系统调优
# 增加文件描述符限制
ulimit -n 1000000
# 优化网络参数
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.core.netdev_max_backlog=65535
# 禁用 Swap(重要)
sysctl -w vm.swappiness=0
# 调整脏页比例
sysctl -w vm.dirty_ratio=80
sysctl -w vm.dirty_background_ratio=5
# 文件系统(推荐 XFS)
# 挂载选项:noatime,nodiratime
Broker 配置调优
# 网络和线程配置
# num.network.threads: 处理网络请求的线程数
# 建议设置为 CPU 核数
num.network.threads=8
# num.io.threads: 处理磁盘 I/O 的线程数
# 建议设置为磁盘数的 2 倍
num.io.threads=16
# Socket 缓冲区大小
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
# 日志配置
# 日志段大小,影响索引粒度和文件句柄数
log.segment.bytes=1073741824
# 日志保留时间
log.retention.hours=168
# 日志刷新间隔(通常依赖 OS Page Cache)
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000
# 后台任务线程数
background.threads=10
# 副本 Fetcher 线程数
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
# 日志目录(多磁盘分散 I/O)
log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,/disk3/kafka-logs
Producer 调优
Producer 配置直接影响消息发送的吞吐量和延迟。
高吞吐配置
适用于日志收集、大数据分析等场景:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
// 批量发送配置
// 增大批次大小,减少请求次数
props.put("batch.size", 65536); // 64KB
// 增加等待时间,积累更多消息
props.put("linger.ms", 20);
// 增大缓冲区
props.put("buffer.memory", 67108864); // 64MB
// 启用压缩(推荐 lz4 或 zstd)
props.put("compression.type", "zstd");
// 降低确认级别(牺牲可靠性换性能)
props.put("acks", "1");
// 并发请求数
props.put("max.in.flight.requests.per.connection", 10);
低延迟配置
适用于实时交易、在线业务等场景:
Properties props = new Properties();
// 立即发送,不等待
props.put("linger.ms", 0);
// 小批次
props.put("batch.size", 16384);
// 单次确认
props.put("acks", "1");
// 减少缓冲区
props.put("buffer.memory", 33554432); // 32MB
高可靠配置
适用于金融交易、订单处理等场景:
Properties props = new Properties();
// 所有副本确认
props.put("acks", "all");
// 启用幂等性
props.put("enable.idempotence", "true");
// 无限重试
props.put("retries", Integer.MAX_VALUE);
// 限制并发请求数(保证顺序)
props.put("max.in.flight.requests.per.connection", 5);
// 使用压缩减少数据量
props.put("compression.type", "lz4");
压缩算法选择
| 压缩算法 | CPU 消耗 | 压缩率 | 适用场景 |
|---|---|---|---|
| none | 无 | 0% | 网络带宽充足 |
| gzip | 高 | 高 | CPU 充足、带宽受限 |
| snappy | 低 | 中等 | 平衡 CPU 和带宽 |
| lz4 | 很低 | 中等 | 推荐默认选择 |
| zstd | 中等 | 高 | Kafka 2.1+ 推荐 |
Consumer 调优
Consumer 配置影响消费能力和 Rebalance 行为。
高吞吐配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
// 增大每次拉取数量
props.put("max.poll.records", 1000);
// 增大拉取字节数
props.put("fetch.max.bytes", 104857600); // 100MB
props.put("max.partition.fetch.bytes", 1048576);
// 等待更多数据
props.put("fetch.min.bytes", 102400); // 100KB
props.put("fetch.max.wait.ms", 1000);
// 禁用自动提交
props.put("enable.auto.commit", "false");
减少 Rebalance
Rebalance 是 Consumer Group 重新分配分区的过程,会导致消费暂停:
// 增大会话超时时间
props.put("session.timeout.ms", 60000);
// 心跳间隔(建议 < session.timeout.ms / 3)
props.put("heartbeat.interval.ms", 20000);
// 增大最大拉取间隔(两次 poll 之间的最大间隔)
props.put("max.poll.interval.ms", 600000);
// 减少每次拉取数量,加快处理
props.put("max.poll.records", 100);
// 使用协作粘性分配器(Kafka 2.4+)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
Rebalance 优化策略:
- 确保处理时间 < max.poll.interval.ms
- 使用多线程处理:主线程只负责 poll,业务处理交给工作线程
- 监控 Rebalance 事件:通过 ConsumerRebalanceListener 监听
多线程消费模式
public class MultiThreadConsumer {
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executor;
private final Map<TopicPartition, WorkerThread> workers = new ConcurrentHashMap<>();
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
WorkerThread worker = workers.computeIfAbsent(partition,
p -> new WorkerThread(p, this::commitOffset));
worker.submit(partitionRecords);
}
}
} finally {
executor.shutdown();
consumer.close();
}
}
private void commitOffset(TopicPartition partition, long offset) {
// 异步提交特定分区的 offset
consumer.commitAsync(
Collections.singletonMap(partition, new OffsetAndMetadata(offset)),
null
);
}
}
容量规划
分区数量计算
分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者数量)
示例:
- 目标吞吐量:100 MB/s
- 单分区吞吐量:10 MB/s
- 消费者数量:8
分区数 = max(100 / 10, 8) = max(10, 8) = 10
存储容量计算
总存储 = 每日消息量 × 保留天数 × 副本数 × 压缩比 × 安全系数
示例:
- 每日消息量:100 GB
- 保留天数:7
- 副本数:3
- 压缩比:0.5(50% 压缩)
- 安全系数:1.2
总存储 = 100 × 7 × 3 × 0.5 × 1.2 = 1260 GB
Broker 数量计算
Broker 数 = ceil(总存储 / 单 Broker 存储, 目标吞吐 / 单 Broker 吞吐)
考虑因素:
- 每个 Broker 的磁盘容量
- 每个 Broker 的网络带宽
- 容错需求(N-1 个 Broker 故障)
日志管理
Broker 日志
# 配置日志目录
log.dirs=/var/lib/kafka/logs
# 日志级别
# config/log4j.properties
log4j.rootLogger=INFO, stdout
# 控制器日志
log4j.logger.kafka.controller=DEBUG
# 日志清理
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
日志分析
# 分析日志大小
du -sh /var/lib/kafka/logs/*
# 清理旧日志
kafka-log-delete-script.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic \
--older-than 604800000
运维命令
日常运维
# 查看主题列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 查看主题详情
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# 查看消费进度
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
# 均衡 Leader
kafka-leader-election.sh --all-topics --election-type preferred --bootstrap-server localhost:9092
故障恢复
# 1. 检查 Broker 状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# 2. 检查分区状态
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# 3. 修复 UnderReplicated 分区
# 检查网络和磁盘
# 重启问题 Broker
# 4. 手动选举 Leader
kafka-leader-election.sh --topic my-topic --partition 0 --election-type unclean --bootstrap-server localhost:9092
# 5. 清理 ISR
# 减少 min.insync.replicas 临时恢复
告警规则
关键告警
| 告警项 | 条件 | 级别 |
|---|---|---|
| Broker Down | 任何 Broker 不可用 | Critical |
| UnderReplicated | UnderReplicated > 0 | Warning |
| Offline Partition | OfflinePartition > 0 | Critical |
| Consumer Lag | Lag > 10000 | Warning |
| Disk Usage | Disk > 80% | Warning |
| Controller Change | 频繁切换 | Warning |
容量规划
计算公式
分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者数)
Broker数 = ceil(每日数据量 * 保留天数 / 单Broker存储)
副本数 = 业务重要性决定(2-3)
示例
目标:
- 吞吐量: 100,000 msg/s
- 单分区吞吐: 10,000 msg/s
- 消费者: 10
- 数据保留: 7 天
- 消息大小: 1 KB
- 副本数: 3
计算:
- 分区数 = max(100000/10000, 10) = max(10, 10) = 10
- 每日数据量 = 100,000 * 86400 * 1KB = 8.64 GB
- 总存储 = 8.64 * 7 * 3 = 181 GB
- Broker数 = ceil(181 / 500GB) = 1
小结
- 监控 Kafka 需要关注多个层面的指标
- Consumer Lag 是关键性能指标
- Prometheus + Grafana 是常用监控方案
- 合理的调优可以显著提升性能
- 容量规划要综合考虑多个因素
下一步
现在让我们创建一个 速查表 方便日常查阅。