跳到主要内容

Kafka 监控与运维

本章将详细介绍 Kafka 的监控指标、性能调优和运维实践。

监控概述

为什么要监控 Kafka?

┌─────────────────────────────────────────────────────────────┐
│ Kafka 监控重要性 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 性能优化 │
│ - 识别瓶颈 │
│ - 优化配置 │
│ │
│ 2. 问题诊断 │
│ - 快速定位问题 │
│ - 减少故障恢复时间 │
│ │
│ 3. 容量规划 │
│ - 预测资源需求 │
│ - 规划扩展 │
│ │
│ 4. 告警通知 │
│ - 异常告警 │
│ - 预防故障 │
│ │
└─────────────────────────────────────────────────────────────┘

JMX 监控

启用 JMX

# 方式1:环境变量
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

# 方式2:启动脚本修改
bin/kafka-server-start.sh

# 添加
export JMX_PORT=9999

JMX 指标

# 使用 jmxterm 查看指标
java -jar jmxterm-1.0.0.jar -l localhost:9999

# 查看 Broker 指标
> bean kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=RequestsPerSec

关键监控指标

Broker 指标

指标名称说明正常范围
MessagesInPerSec每秒消息数根据业务
BytesInPerSec每秒入站字节< 网络带宽
BytesOutPerSec每秒出站字节< 网络带宽
RequestsPerSec每秒请求数< 负载
UnderReplicatedPartitions复制不足分区0
OfflinePartitionsCount离线分区数0
ActiveControllerCount活跃控制器数1
LeaderElectionRateLeader 选举频率≈ 0

Producer 指标

指标名称说明正常范围
record-send-rate发送速率符合预期
request-latency-avg平均延迟< 100ms
record-queue-time-avg排队时间< 10ms
error-rate错误率≈ 0
retry-rate重试率< 5%

Consumer 指标

指标名称说明正常范围
records-consumed-rate消费速率符合预期
fetch-rate拉取频率正常
commit-latency-avg提交延迟< 10ms
records-lag-max最大消费延迟接近 0
fetch-latency-avg拉取延迟< 100ms

Consumer Lag

┌─────────────────────────────────────────────────────────────┐
│ Consumer Lag 示意图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 分区: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ [0][1][2][3][4][5][6][7][8][9][10][11][12] │ │
│ │ ↑ ↑ │ │
│ │ Producer Consumer │ │
│ │ (写入中) (消费中) │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Lag = 8 │ │
│ │ (落后 8 条消息) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Lag = Producer Offset - Consumer Offset │
│ │
│ 监控 Lag 很重要: │
│ - Lag 过大说明消费者处理不及时 │
│ - 可能需要增加消费者数量 │
│ - 可能需要优化处理逻辑 │
│ │
└─────────────────────────────────────────────────────────────┘

查看 Lag

# 使用 kafka-consumer-groups 命令
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe

# 输出示例
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
my-group orders 0 1000 1200 200
my-group orders 1 800 1000 200
my-group orders 2 950 1000 50

Prometheus + Grafana

配置 JMX Exporter

# docker-compose.yml
services:
jmx-exporter:
image: bitnami/jmx-exporter:latest
ports:
- "9404:9404"
volumes:
- ./jmx-config.yml:/config.yml
environment:
- JAVA_OPTS=-Xmx128M -XX:+UseG1GC -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false

JMX 配置 jmx-config.yml:

---
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>Count'
name: kafka_server_broker_topic_metrics_$1_total
type: GAUGE

- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>OneMinuteRate'
name: kafka_server_broker_topic_metrics_$1_rate
type: GAUGE

- pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.*)><>(records-lag-max|records-consumed-rate|fetch-latency-avg)'
name: kafka_consumer_$2
labels:
client_id: "$1"
type: GAUGE

Grafana Dashboard

{
"dashboard": {
"title": "Kafka Overview",
"panels": [
{
"title": "Messages In Per Sec",
"targets": [
{
"expr": "sum(rate(kafka_server_broker_topic_metrics_messages_in_total[5m]))"
}
]
},
{
"title": "Consumer Lag",
"targets": [
{
"expr": "sum(kafka_consumer_records_lag_max)"
}
]
}
]
}
}

性能调优

Broker 调优

# 网络和线程
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576

# 批量处理
batch.size=32768
linger.ms=10

# 副本
replica.socket.timeout.ms=30000
replica.fetch.max.bytes=1048576

# 日志
log.segment.bytes=1073741824
log.retention.hours=168

Producer 调优

// 高吞吐配置
props.put("batch.size", 32768);
props.put("linger.ms", 10);
props.put("buffer.memory", 67108864);
props.put("compression.type", "lz4");
props.put("acks", "1");

// 低延迟配置
props.put("linger.ms", 0);
props.put("acks", "1");

// 高可靠配置
props.put("acks", "all");
props.put("retries", 3);
props.put("enable.idempotence", true);

Consumer 调优

// 高吞吐配置
props.put("fetch.min.bytes", 1);
props.put("fetch.max.wait.ms", 500);
props.put("max.poll.records", 500);
props.put("enable.auto.commit", false);

// 减少 Rebalance
props.put("session.timeout.ms", 60000);
props.put("heartbeat.interval.ms", 3000);
props.put("max.poll.interval.ms", 300000);

日志管理

Broker 日志

# 配置日志目录
log.dirs=/var/lib/kafka/logs

# 日志级别
# config/log4j.properties
log4j.rootLogger=INFO, stdout

# 控制器日志
log4j.logger.kafka.controller=DEBUG

# 日志清理
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824

日志分析

# 分析日志大小
du -sh /var/lib/kafka/logs/*

# 清理旧日志
kafka-log-delete-script.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic \
--older-than 604800000

运维命令

日常运维

# 查看主题列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 查看主题详情
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

# 查看消费进度
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092

# 均衡 Leader
kafka-leader-election.sh --all-topics --election-type preferred --bootstrap-server localhost:9092

故障恢复

# 1. 检查 Broker 状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 2. 检查分区状态
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

# 3. 修复 UnderReplicated 分区
# 检查网络和磁盘
# 重启问题 Broker

# 4. 手动选举 Leader
kafka-leader-election.sh --topic my-topic --partition 0 --election-type unclean --bootstrap-server localhost:9092

# 5. 清理 ISR
# 减少 min.insync.replicas 临时恢复

告警规则

关键告警

告警项条件级别
Broker Down任何 Broker 不可用Critical
UnderReplicatedUnderReplicated > 0Warning
Offline PartitionOfflinePartition > 0Critical
Consumer LagLag > 10000Warning
Disk UsageDisk > 80%Warning
Controller Change频繁切换Warning

容量规划

计算公式

分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者数)

Broker数 = ceil(每日数据量 * 保留天数 / 单Broker存储)

副本数 = 业务重要性决定 (2-3)

示例

目标:
- 吞吐量: 100,000 msg/s
- 单分区吞吐: 10,000 msg/s
- 消费者: 10
- 数据保留: 7 天
- 消息大小: 1 KB
- 副本数: 3

计算:
- 分区数 = max(100000/10000, 10) = max(10, 10) = 10
- 每日数据量 = 100,000 * 86400 * 1KB = 8.64 GB
- 总存储 = 8.64 * 7 * 3 = 181 GB
- Broker数 = ceil(181 / 500GB) = 1

小结

  1. 监控 Kafka 需要关注多个层面的指标
  2. Consumer Lag 是关键性能指标
  3. Prometheus + Grafana 是常用监控方案
  4. 合理的调优可以显著提升性能
  5. 容量规划要综合考虑多个因素

下一步

现在让我们创建一个 速查表 方便日常查阅。