跳到主要内容

Kafka 监控与运维

本章将详细介绍 Kafka 的监控指标、性能调优和运维实践。

监控概述

为什么要监控 Kafka?

JMX 监控

启用 JMX

# 方式1:环境变量
export JMX_PORT=9999
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

# 方式2:启动脚本修改 bin/kafka-server-start.sh

# 添加
export JMX_PORT=9999

JMX 指标

# 使用 jmxterm 查看指标
java -jar jmxterm-1.0.0.jar -l localhost:9999

# 查看 Broker 指标
> bean kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
> bean kafka.server:type=BrokerTopicMetrics,name=RequestsPerSec

关键监控指标

Broker 指标

指标名称说明正常范围
MessagesInPerSec每秒消息数根据业务
BytesInPerSec每秒入站字节< 网络带宽
BytesOutPerSec每秒出站字节< 网络带宽
RequestsPerSec每秒请求数< 负载
UnderReplicatedPartitions复制不足分区0
OfflinePartitionsCount离线分区数0
ActiveControllerCount活跃控制器数1
LeaderElectionRateLeader 选举频率接近 0

Producer 指标

指标名称说明正常范围
record-send-rate发送速率符合预期
request-latency-avg平均延迟< 100ms
record-queue-time-avg排队时间< 10ms
error-rate错误率接近 0
retry-rate重试率< 5%

Consumer 指标

指标名称说明正常范围
records-consumed-rate消费速率符合预期
fetch-rate拉取频率正常
commit-latency-avg提交延迟< 10ms
records-lag-max最大消费延迟接近 0
fetch-latency-avg拉取延迟< 100ms

Consumer Lag

Lag = Producer Offset - Consumer Offset = 12 - 4 = 8

监控 Lag 很重要

  • Lag 过大说明消费者处理不及时
  • 可能需要增加消费者数量
  • 可能需要优化处理逻辑

查看 Lag

# 使用 kafka-consumer-groups 命令
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe

# 输出示例
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
my-group orders 0 1000 1200 200
my-group orders 1 800 1000 200
my-group orders 2 950 1000 50

Prometheus + Grafana

配置 JMX Exporter

# docker-compose.yml
services:
jmx-exporter:
image: bitnami/jmx-exporter:latest
ports:
- "9404:9404"
volumes:
- ./jmx-config.yml:/config.yml
environment:
- JAVA_OPTS=-Xmx128M -XX:+UseG1GC -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false

JMX 配置 jmx-config.yml:

---
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>Count'
name: kafka_server_broker_topic_metrics_$1_total
type: GAUGE

- pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.*)><>OneMinuteRate'
name: kafka_server_broker_topic_metrics_$1_rate
type: GAUGE

- pattern: 'kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.*)><>(records-lag-max|records-consumed-rate|fetch-latency-avg)'
name: kafka_consumer_$2
labels:
client_id: "$1"
type: GAUGE

Grafana Dashboard

{
"dashboard": {
"title": "Kafka Overview",
"panels": [
{
"title": "Messages In Per Sec",
"targets": [
{
"expr": "sum(rate(kafka_server_broker_topic_metrics_messages_in_total[5m]))"
}
]
},
{
"title": "Consumer Lag",
"targets": [
{
"expr": "sum(kafka_consumer_records_lag_max)"
}
]
}
]
}
}

性能调优

性能调优是 Kafka 运维的核心工作之一。本节将从 Broker、Producer、Consumer 三个维度详细介绍调优策略。

Broker 调优

Broker 调优需要从硬件选择、JVM 配置、操作系统参数和 Kafka 配置四个层面考虑。

硬件选择

Kafka 是 I/O 密集型应用,硬件选择对性能影响显著:

硬件推荐配置说明
磁盘SSD 或 NVMe顺序写入性能好,避免机械硬盘
内存32GB+大部分用于 Page Cache
CPU多核压缩、加密计算密集
网络万兆网卡高吞吐场景必备

JVM 配置

# 推荐的 JVM 配置
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"

# G1 垃圾收集器(JDK 11+ 推荐)
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-XX:MaxInlineSize=8"

# 启动 Kafka
bin/kafka-server-start.sh config/kraft/server.properties

JVM 调优要点

  • 堆内存大小:不要超过 6GB,过大反而导致 GC 问题
  • 使用 G1 GC:适合大堆内存,控制停顿时间
  • 避免 CMS:JDK 11+ 推荐使用 G1

操作系统调优

# 增加文件描述符限制
ulimit -n 1000000

# 优化网络参数
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535
sysctl -w net.core.netdev_max_backlog=65535

# 禁用 Swap(重要)
sysctl -w vm.swappiness=0

# 调整脏页比例
sysctl -w vm.dirty_ratio=80
sysctl -w vm.dirty_background_ratio=5

# 文件系统(推荐 XFS)
# 挂载选项:noatime,nodiratime

Broker 配置调优

# 网络和线程配置
# num.network.threads: 处理网络请求的线程数
# 建议设置为 CPU 核数
num.network.threads=8

# num.io.threads: 处理磁盘 I/O 的线程数
# 建议设置为磁盘数的 2 倍
num.io.threads=16

# Socket 缓冲区大小
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

# 日志配置
# 日志段大小,影响索引粒度和文件句柄数
log.segment.bytes=1073741824

# 日志保留时间
log.retention.hours=168

# 日志刷新间隔(通常依赖 OS Page Cache)
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000

# 后台任务线程数
background.threads=10

# 副本 Fetcher 线程数
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500

# 日志目录(多磁盘分散 I/O)
log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,/disk3/kafka-logs

Producer 调优

Producer 配置直接影响消息发送的吞吐量和延迟。

高吞吐配置

适用于日志收集、大数据分析等场景:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");

// 批量发送配置
// 增大批次大小,减少请求次数
props.put("batch.size", 65536); // 64KB

// 增加等待时间,积累更多消息
props.put("linger.ms", 20);

// 增大缓冲区
props.put("buffer.memory", 67108864); // 64MB

// 启用压缩(推荐 lz4 或 zstd)
props.put("compression.type", "zstd");

// 降低确认级别(牺牲可靠性换性能)
props.put("acks", "1");

// 并发请求数
props.put("max.in.flight.requests.per.connection", 10);

低延迟配置

适用于实时交易、在线业务等场景:

Properties props = new Properties();

// 立即发送,不等待
props.put("linger.ms", 0);

// 小批次
props.put("batch.size", 16384);

// 单次确认
props.put("acks", "1");

// 减少缓冲区
props.put("buffer.memory", 33554432); // 32MB

高可靠配置

适用于金融交易、订单处理等场景:

Properties props = new Properties();

// 所有副本确认
props.put("acks", "all");

// 启用幂等性
props.put("enable.idempotence", "true");

// 无限重试
props.put("retries", Integer.MAX_VALUE);

// 限制并发请求数(保证顺序)
props.put("max.in.flight.requests.per.connection", 5);

// 使用压缩减少数据量
props.put("compression.type", "lz4");

压缩算法选择

压缩算法CPU 消耗压缩率适用场景
none0%网络带宽充足
gzipCPU 充足、带宽受限
snappy中等平衡 CPU 和带宽
lz4很低中等推荐默认选择
zstd中等Kafka 2.1+ 推荐

Consumer 调优

Consumer 配置影响消费能力和 Rebalance 行为。

高吞吐配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");

// 增大每次拉取数量
props.put("max.poll.records", 1000);

// 增大拉取字节数
props.put("fetch.max.bytes", 104857600); // 100MB
props.put("max.partition.fetch.bytes", 1048576);

// 等待更多数据
props.put("fetch.min.bytes", 102400); // 100KB
props.put("fetch.max.wait.ms", 1000);

// 禁用自动提交
props.put("enable.auto.commit", "false");

减少 Rebalance

Rebalance 是 Consumer Group 重新分配分区的过程,会导致消费暂停:

// 增大会话超时时间
props.put("session.timeout.ms", 60000);

// 心跳间隔(建议 < session.timeout.ms / 3)
props.put("heartbeat.interval.ms", 20000);

// 增大最大拉取间隔(两次 poll 之间的最大间隔)
props.put("max.poll.interval.ms", 600000);

// 减少每次拉取数量,加快处理
props.put("max.poll.records", 100);

// 使用协作粘性分配器(Kafka 2.4+)
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

Rebalance 优化策略

  1. 确保处理时间 < max.poll.interval.ms
  2. 使用多线程处理:主线程只负责 poll,业务处理交给工作线程
  3. 监控 Rebalance 事件:通过 ConsumerRebalanceListener 监听

多线程消费模式

public class MultiThreadConsumer {
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executor;
private final Map<TopicPartition, WorkerThread> workers = new ConcurrentHashMap<>();

public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
WorkerThread worker = workers.computeIfAbsent(partition,
p -> new WorkerThread(p, this::commitOffset));
worker.submit(partitionRecords);
}
}
} finally {
executor.shutdown();
consumer.close();
}
}

private void commitOffset(TopicPartition partition, long offset) {
// 异步提交特定分区的 offset
consumer.commitAsync(
Collections.singletonMap(partition, new OffsetAndMetadata(offset)),
null
);
}
}

容量规划

分区数量计算

分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者数量)

示例:
- 目标吞吐量:100 MB/s
- 单分区吞吐量:10 MB/s
- 消费者数量:8

分区数 = max(100 / 10, 8) = max(10, 8) = 10

存储容量计算

总存储 = 每日消息量 × 保留天数 × 副本数 × 压缩比 × 安全系数

示例:
- 每日消息量:100 GB
- 保留天数:7
- 副本数:3
- 压缩比:0.5(50% 压缩)
- 安全系数:1.2

总存储 = 100 × 7 × 3 × 0.5 × 1.2 = 1260 GB

Broker 数量计算

Broker 数 = ceil(总存储 / 单 Broker 存储, 目标吞吐 / 单 Broker 吞吐)

考虑因素:
- 每个 Broker 的磁盘容量
- 每个 Broker 的网络带宽
- 容错需求(N-1 个 Broker 故障)

日志管理

Broker 日志

# 配置日志目录
log.dirs=/var/lib/kafka/logs

# 日志级别
# config/log4j.properties
log4j.rootLogger=INFO, stdout

# 控制器日志
log4j.logger.kafka.controller=DEBUG

# 日志清理
log.retention.hours=168
log.retention.bytes=10737418240
log.segment.bytes=1073741824

日志分析

# 分析日志大小
du -sh /var/lib/kafka/logs/*

# 清理旧日志
kafka-log-delete-script.sh --delete \
--bootstrap-server localhost:9092 \
--topic my-topic \
--older-than 604800000

运维命令

日常运维

# 查看主题列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看消费者组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 查看主题详情
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

# 查看消费进度
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092

# 均衡 Leader
kafka-leader-election.sh --all-topics --election-type preferred --bootstrap-server localhost:9092

故障恢复

# 1. 检查 Broker 状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 2. 检查分区状态
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

# 3. 修复 UnderReplicated 分区
# 检查网络和磁盘
# 重启问题 Broker

# 4. 手动选举 Leader
kafka-leader-election.sh --topic my-topic --partition 0 --election-type unclean --bootstrap-server localhost:9092

# 5. 清理 ISR
# 减少 min.insync.replicas 临时恢复

告警规则

关键告警

告警项条件级别
Broker Down任何 Broker 不可用Critical
UnderReplicatedUnderReplicated > 0Warning
Offline PartitionOfflinePartition > 0Critical
Consumer LagLag > 10000Warning
Disk UsageDisk > 80%Warning
Controller Change频繁切换Warning

容量规划

计算公式

分区数 = max(目标吞吐量 / 单分区吞吐量, 消费者数)

Broker数 = ceil(每日数据量 * 保留天数 / 单Broker存储)

副本数 = 业务重要性决定(2-3)

示例

目标:
- 吞吐量: 100,000 msg/s
- 单分区吞吐: 10,000 msg/s
- 消费者: 10
- 数据保留: 7 天
- 消息大小: 1 KB
- 副本数: 3

计算:
- 分区数 = max(100000/10000, 10) = max(10, 10) = 10
- 每日数据量 = 100,000 * 86400 * 1KB = 8.64 GB
- 总存储 = 8.64 * 7 * 3 = 181 GB
- Broker数 = ceil(181 / 500GB) = 1

小结

  1. 监控 Kafka 需要关注多个层面的指标
  2. Consumer Lag 是关键性能指标
  3. Prometheus + Grafana 是常用监控方案
  4. 合理的调优可以显著提升性能
  5. 容量规划要综合考虑多个因素

下一步

现在让我们创建一个 速查表 方便日常查阅。