跳到主要内容

Kafka Streams

Kafka Streams 是 Kafka 内置的轻量级流处理库,用于构建实时流处理应用。本章将详细介绍 Kafka Streams 的概念、API 和使用方法。

Kafka Streams 概述

什么是流处理?

流处理(Stream Processing)是对持续产生的数据流进行实时处理的技术。

┌─────────────────────────────────────────────────────────────┐
│ 传统 vs 流处理 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 传统处理(批处理): │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 数据 │────▶│ 存储 │────▶│ 处理 │ │
│ │ 收集 │ │ (DB) │ │ (定时) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 流处理: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 数据 │────▶│ Kafka │────▶│ Streams │────▶结果 │
│ │ 实时 │ │ Topic │ │ 实时 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 特点: │
│ - 低延迟(毫秒级) │
│ - 事件驱动 │
│ - 状态维护 │
│ - 窗口计算 │
│ │
└─────────────────────────────────────────────────────────────┘

为什么使用 Kafka Streams?

  1. 轻量级:无需额外集群,纯库形式
  2. 低延迟:毫秒级处理延迟
  3. 精确一次:支持 Exactly-Once 语义
  4. 可扩展:支持并行处理
  5. 集成 Kafka:原生集成 Kafka 安全特性

与其他流处理框架对比

特性Kafka StreamsFlinkSpark Streaming
延迟毫秒级毫秒级秒级
部署库(无需集群)需要集群需要集群
状态管理内置内置Checkpoint
Exactly-Once支持支持较弱
生态Kafka only丰富Spark 生态

快速开始

添加 Maven 依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.7.0</version>
</dependency>

最简单的示例

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;

import java.util.Properties;

public class WordCountApplication {

public static void main(String[] args) {
// 1. 配置
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 2. 构建拓扑
StreamsBuilder builder = new StreamsBuilder();

// 源:读取输入主题
KStream<String, String> source = builder.stream("text-input");

// 处理:Word Count
KTable<String, Long> wordCounts = source
// 分割单词
.flatMapValues(value -> java.util.Arrays.asList(value.toLowerCase().split("\\W+")))
// 按单词分组
.groupBy((key, word) -> word)
// 计数
.count(Materialized.as("wordcount-store"));

// 输出:写入输出主题
wordCounts.toStream().to("wordcount-output");

// 3. 创建流应用
KafkaStreams streams = new KafkaStreams(builder.build(), config);

// 4. 启动
streams.start();

// 5. 关闭
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}

核心概念

流(Stream)

流是无边界的、持续的、分类的事件序列。

┌─────────────────────────────────────────────────────────────┐
│ 流 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 时间 ─────────────────────────────────────────────────▶ │
│ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Event1│ │Event2│ │Event3│ │Event4│ │Event5│ │
│ │ T=1 │ │ T=2 │ │ T=3 │ │ T=4 │ │ T=5 │ │
│ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ │
│ │
│ 流的特点: │
│ - 不可变 │
│ - 有序(时间顺序) │
│ - 可重放 │
│ - 容错 │
│ │
└─────────────────────────────────────────────────────────────┘

表(Table)

表是流在某一时刻的快照,类似于数据库表。

┌─────────────────────────────────────────────────────────────┐
│ 表 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 流(事件序列): │
│ [Alice, 10:00] -> [Bob, 10:01] -> [Alice, 10:02] │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 表(快照): │
│ ┌─────────────────────────────────────────┐ │
│ │ Key │ Value │ 时间戳 │ │
│ ├─────────┼───────────┼──────────────────│ │
│ │ Alice │ ... │ 10:02 │ │
│ │ Bob │ ... │ 10:01 │ │
│ └─────────────────────────────────────────┘ │
│ │
│ 流 -> 表:聚合结果 │
│ 表 -> 流:变更日志 │
│ │
└─────────────────────────────────────────────────────────────┘

DStream vs KStream

概念说明
KStream抽象的记录流,可以遍历多次
KTable抽象的表,键唯一,新值覆盖旧值
GlobalKTable全局广播的表,每个实例有完整数据

KStream API

转换操作

KStream<String, String> source = builder.stream("input-topic");

// 1. map - 一对一转换
KStream<String, String> mapped = source.map(
(key, value) -> KeyValue.pair(key.toUpperCase(), value.toUpperCase())
);

// 2. flatMap - 一对多转换
KStream<String, String> flatMapped = source.flatMap(
(key, value) -> {
List<KeyValue<String, String>> result = new ArrayList<>();
for (String word : value.split(" ")) {
result.add(KeyValue.pair(word, word));
}
return result;
}
);

// 3. filter - 过滤
KStream<String, String> filtered = source.filter(
(key, value) -> value.length() > 10
);

// 4. selectKey - 重新设置键
KStream<String, String> rekeyed = source.selectKey(
(key, value) -> value.split(",")[0]
);

// 5. peek - 副作用操作(调试)
source.peek((key, value) -> System.out.println("Key: " + key + ", Value: " + value));

合并和分支

// 合并两个流
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");

KStream<String, String> merged = stream1.merge(stream2);

// 分支(根据条件拆分)
KStream<String, String>[] branches = source.branch(
(key, value) -> value.startsWith("error"), // 条件1
(key, value) -> value.startsWith("warn"), // 条件2
(key, value) -> true // 默认
);

branches[0].to("error-topic"); // 错误日志
branches[1].to("warn-topic"); // 警告日志
branches[2].to("info-topic"); // 信息日志

聚合操作

count

// 统计每条消息
KStream<String, String> source = builder.stream("clicks");

KTable<String, Long> clickCounts = source
.groupBy((key, value) -> value) // 按值分组
.count(); // 计数

// 输出到主题
clickCounts.toStream().to("click-counts-topic");

sum 和 avg

// 转换为数值类型
KStream<String, Long> numbers = source
.map((k, v) -> KeyValue.pair(k, Long.parseLong(v)));

// 求和
KTable<String, Long> sum = numbers
.groupByKey()
.sum();

// 平均值(需要先 sum 再 count 相除)
KTable<String, Double> average = numbers
.groupByKey()
.aggregate(
() -> new Agg(0L, 0), // 初始化
(key, value, agg) -> new Agg(agg.sum + value, agg.count + 1),
Materialized.as("avg-store")
)
.mapValues(agg -> (double) agg.sum / agg.count);

reduce

// 滚动聚合
KTable<String, String> aggregated = source
.groupByKey()
.reduce(
(value1, value2) -> value1 + ":" + value2 // 合并值
);

窗口计算

时间窗口

import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

// 滚动窗口(Tumbling Window)
KTable<Windowed<String>, Long> tumblingWindow = source
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // 5分钟窗口
.count();

// 滑动窗口(Hopping Window)
KTable<Windowed<String>, Long> hoppingWindow = source
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1))) // 滑动步长 1 分钟
.count();

// 会话窗口(Session Window)
KTable<Windowed<String>, Long> sessionWindow = source
.groupBy((key, value) -> value)
.windowedBy(SessionWindows.with(Duration.ofMinutes(10))) // 会话间隔 10 分钟
.count();

窗口类型对比

窗口类型说明特点
滚动窗口固定大小,不重叠每条记录属于一个窗口
滑动窗口固定大小,可重叠粒度更细
会话窗口动态大小按活动会话分组

窗口联结

// 两个流的窗口联结
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> shipments = builder.stream("shipments");

// 窗口联结
KStream<String, String> joined = orders
.join(
shipments,
(orderValue, shipmentValue) -> orderValue + " <-> " + shipmentValue,
JoinWindows.of(Duration.ofDays(7)) // 7 天内
);

状态管理

状态存储

// 使用状态存储
KTable<String, Long> aggregated = source
.groupBy((key, value) -> value)
.aggregate(
() -> 0L, // 初始值
(key, value, agg) -> agg + 1,
Materialized.as("count-store") // 指定状态存储名
);

// 状态存储类型
// Materialized.as("store-name") // 默认 RocksDB
// Materialized.as("store-name", Serdes, Serdes) // 指定序列化
// Materialized.inMemory() // 内存存储(测试用)

交互查询

// 获取当前状态
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameWithType("count-store", QueryableStoreTypes.keyValueStore())
);

Long count = store.get("some-key");

// 全局状态查询
GlobalKTable<String, String> globalTable = builder.globalTable(
"product-info-topic",
Materialized.as("global-store")
);

错误处理和容错

容错机制

┌─────────────────────────────────────────────────────────────┐
│ Kafka Streams 容错 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 检查点(Checkpointing) │
│ - 定期保存处理状态到 Kafka │
│ - 故障恢复时从检查点恢复 │
│ │
│ 2. 事务支持 │
│ - Exactly-Once 处理 │
│ - 源和输出都在同一事务中 │
│ │
│ 3. 重试 │
│ - 可配置的失败重试 │
│ │
│ 4. 日志摘要 │
│ - Changelog Topic 保存状态变更 │
│ - 故障恢复时重放 │
│ │
└─────────────────────────────────────────────────────────────┘

Exactly-Once 配置

// 启用 Exactly-Once
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);

// 或者使用 Exactly-Once(较旧版本)
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE);

错误处理

// 处理异常
KStream<String, String> processed = source
.mapValues(value -> {
try {
return processSafely(value);
} catch (Exception e) {
// 记录错误或发送到 DLQ
return null;
}
})
.filter((k, v) -> v != null);

最佳实践

配置优化

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");

// 并行度
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

// 缓存
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 1024 * 100); // 100MB

// 提交间隔
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

性能优化

  1. 合理设置分区数:与应用实例数匹配
  2. 使用缓存:减少 RocksDB I/O
  3. 批处理:合理设置 batch size
  4. 状态存储优化:选择合适的存储后端

生产部署

# Kubernetes 部署示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-streams-app
spec:
replicas: 3
template:
spec:
containers:
- name: app
env:
- name: APPLICATION_ID
value: "my-streams-app"
- name: BOOTSTRAP_SERVERS
value: "kafka:9092"

示例:实时词频统计

public class WordCountExample {

public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();

// 读取输入
KStream<String, String> textLines = builder.stream("text-lines");

// 处理
KTable<String, Long> wordCounts = textLines
.flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\s+")))
.filter((key, word) -> !word.isEmpty())
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
.count(Materialized.as("word-counts-store"));

// 输出
wordCounts.toStream()
.filter((word, count) -> count > 10) // 只输出高频词
.to("wordcounts-output", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}

小结

  1. Kafka Streams 是轻量级的流处理库
  2. KStream 表示无界流,KTable 表示有界表
  3. 支持丰富的转换聚合窗口操作
  4. 内置状态管理,支持 Exactly-Once
  5. 适合构建低延迟实时应用

下一步

接下来让我们学习 Kafka Connect 数据集成。