Kafka 监控与运维
本章将详细介绍 Kafka 的监控指标、性能调优和运维实践。
监控概述
为什么要监控 Kafka?
┌─────────────────────────────────────────────────────────────┐
│ Kafka 监控重要性 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 性能优化 │
│ - 识别瓶颈 │
│ - 优化配置 │
│ │
│ 2. 问题诊断 │
│ - 快速定位问题 │
│ - 减少故障恢复时间 │
│ │
│ 3. 容量规划 │
│ - 预测资源需求 │
│ - 规划扩展 │
│ │
│ 4. 告警通知 │
│ - 异常告警 │
│ - 预防故障 │
│ │
└─────────────────────────────────────────────────────────────┘
JMX 监控
启用 JMX
# 方式1:环境变量
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
# 方式2:启动脚本修改
bin/kafka-server-start.sh
# 添加
export JMX_PORT=9999
JMX 指标
# 使用 jmxterm 查看指标
java -jar jmxterm-1.0.0.jar -l localhost:9999
# 查看 Broker 指标
> bean kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=RequestsPerSec
关键监控指标
Broker 指标
| 指标名称 | 说明 | 正常范围 |
|---|---|---|
MessagesInPerSec | 每秒消息数 | 根据业务 |
BytesInPerSec | 每秒入站字节 | < 网络带宽 |
BytesOutPerSec | 每秒出站字节 | < 网络带宽 |
RequestsPerSec | 每秒请求数 | < 负载 |
UnderReplicatedPartitions | 复制不足分区 | 0 |
OfflinePartitionsCount | 离线分区数 | 0 |
ActiveControllerCount | 活跃控制器数 | 1 |
LeaderElectionRate | Leader 选举频率 | ≈ 0 |
Producer 指标
| 指标名称 | 说明 | 正常范围 |
|---|---|---|
record-send-rate | 发送速率 | 符合预期 |
request-latency-avg | 平均延迟 | < 100ms |
record-queue-time-avg | 排队时间 | < 10ms |
error-rate | 错误率 | ≈ 0 |
retry-rate | 重试率 | < 5% |
Consumer 指标
| 指标名称 | 说明 | 正常范围 |
|---|---|---|
records-consumed-rate | 消费速率 | 符合预期 |
fetch-rate | 拉取频率 | 正常 |
commit-latency-avg | 提交延迟 | < 10ms |
records-lag-max | 最大消费延迟 | 接近 0 |
fetch-latency-avg | 拉取延迟 | < 100ms |
Consumer Lag
┌─────────────────────────────────────────────────────────────┐
│ Consumer Lag 示意图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 分区: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ [0][1][2][3][4][5][6][7][8][9][10][11][12] │ │
│ │ ↑ ↑ │ │
│ │ Producer Consumer │ │
│ │ (写入中) (消费中) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Lag = 8 │ │
│ │ (落后 8 条消息) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Lag = Producer Offset - Consumer Offset │
│ │
│ 监控 Lag 很重要: │
│ - Lag 过大说明消费者处理不及时 │
│ - 可能需要增加消费者数量 │
│ - 可能需要优化处理逻辑 │
│ │
└─────────────────────────────────────────────────────────────┘
查看 Lag
# 使用 kafka-consumer-groups 命令
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe
# 输出示例
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
my-group orders 0 1000 1200 200
my-group orders 1 800 1000 200
my-group orders 2 950 1000 50
Prometheus + Grafana
配置 JMX Exporter
# docker-compose.yml
services:
jmx-exporter:
image: bitnami/jmx-exporter:latest
ports:
- "9404:9404"
volumes:
- ./jmx-config.yml:/config.yml
environment:
- JAVA_OPTS=-Xmx128M -XX:+UseG1GC -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
JMX 配置 jmx-config.yml:
---
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>Count'
name: kafka_server_broker_topic_metrics_$1_total
type: GAUGE
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>OneMinuteRate'
name: kafka_server_broker_topic_metrics_$1_rate
type: GAUGE
- pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.*)><>(records-lag-max|records-consumed-rate|fetch-latency-avg)'
name: kafka_consumer_$2
labels:
client_id: "$1"
type: GAUGE
Grafana Dashboard
{
"dashboard": {
"title": "Kafka Overview",
"panels": [
{
"title": "Messages In Per Sec",
"targets": [
{
"expr": "sum(rate(kafka_server_broker_topic_metrics_messages_in_total[5m]))"
}
]
},
{
"title": "Consumer Lag",
"targets": [
{
"expr": "sum(kafka_consumer_records_lag_max)"
}
]
}
]
}
}
性能调优
Broker 调优
# 网络和线程
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# 批量处理
batch.size=32768
linger.ms=10
# 副本
replica.socket.timeout.ms=30000
replica.fetch.max.bytes=1048576
# 日志
log.segment.bytes=1073741824
log.retention.hours=168
Producer 调优
// 高吞吐配置
props.put("batch.size", 32768);
props.put("linger.ms", 10);
props.put("buffer.memory", 67108864);
props.put("compression.type", "lz4");
props.put("acks", "1");
// 低延迟配置
props.put("linger.ms", 0);
props.put("acks", "1");
// 高可靠配置
props.put("acks", "all");
props.put("retries", 3);
props.put("enable.idempotence", true);
Consumer 调优
// 高吞吐配置
props.put("fetch.min.bytes", 1);
props.put("fetch.max.wait.ms", 500);
props.put("max.poll.records", 500);
props.put("enable.auto.commit", false);
// 减少 Rebalance
props.put("session.timeout.ms", 60000);
props.put("heartbeat.interval.ms", 3000);
props.put("max.poll.interval.ms", 300000);
日志管理
Broker 日志
# 配置日志目录
log.dirs=/var/lib/kafka/logs
# 日志级别
# config/log4j.properties
log4j.rootLogger=INFO, stdout
# 控制器日志
log4j.logger.kafka.controller=DEBUG
# 日志清理
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824
日志分析
# 分析日志大小
du -sh /var/lib/kafka/logs/*
# 清理旧日志
kafka-log-delete-script.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic \
--older-than 604800000
运维命令
日常运维
# 查看主题列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 查看主题详情
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# 查看消费进度
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
# 均衡 Leader
kafka-leader-election.sh --all-topics --election-type preferred --bootstrap-server localhost:9092
故障恢复
# 1. 检查 Broker 状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# 2. 检查分区状态
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092
# 3. 修复 UnderReplicated 分区
# 检查网络和磁盘
# 重启问题 Broker
# 4. 手动选举 Leader
kafka-leader-election.sh --topic my-topic --partition 0 --election-type unclean --bootstrap-server localhost:9092
# 5. 清理 ISR
# 减少 min.insync.replicas 临时恢复
告警规则
关键告警
| 告警项 | 条件 | 级别 |
|---|---|---|
| Broker Down | 任何 Broker 不可用 | Critical |
| UnderReplicated | UnderReplicated > 0 | Warning |
| Offline Partition | OfflinePartition > 0 | Critical |
| Consumer Lag | Lag > 10000 | Warning |
| Disk Usage | Disk > 80% | Warning |
| Controller Change | 频繁切换 | Warning |
容量规划
计算公式
分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者数)
Broker数 = ceil(每日数据量 * 保留天数 / 单Broker存储)
副本数 = 业务重要性决定 (2-3)
示例
目标:
- 吞吐量: 100,000 msg/s
- 单分区吞吐: 10,000 msg/s
- 消费者: 10
- 数据保留: 7 天
- 消息大小: 1 KB
- 副本数: 3
计算:
- 分区数 = max(100000/10000, 10) = max(10, 10) = 10
- 每日数据量 = 100,000 * 86400 * 1KB = 8.64 GB
- 总存储 = 8.64 * 7 * 3 = 181 GB
- Broker数 = ceil(181 / 500GB) = 1
小结
- 监控 Kafka 需要关注多个层面的指标
- Consumer Lag 是关键性能指标
- Prometheus + Grafana 是常用监控方案
- 合理的调优可以显著提升性能
- 容量规划要综合考虑多个因素
下一步
现在让我们创建一个 速查表 方便日常查阅。