Kafka Streams
Kafka Streams 是 Apache Kafka 内置的轻量级流处理库,用于构建实时流处理应用。它不是一个独立的处理框架,而是一个可以直接嵌入到任何 Java 应用程序中的客户端库。本章将详细介绍 Kafka Streams 的核心概念、DSL API 和实际应用。
Kafka Streams 概述
什么是流处理?
流处理(Stream Processing)是对持续产生的数据流进行实时处理的技术。与传统的批处理不同,流处理在数据到达时立即处理,而不是等待积累到一定量后再处理。
流处理具有以下核心特点:
- 低延迟:毫秒级处理延迟,数据到达即处理
- 事件驱动:以事件为处理单元,每个事件独立触发处理逻辑
- 状态维护:可以维护处理状态,支持复杂的聚合和连接操作
- 窗口计算:支持基于时间窗口的聚合计算
为什么选择 Kafka Streams?
Kafka Streams 相比其他流处理框架有独特的优势:
| 特性 | Kafka Streams | Apache Flink | Spark Streaming |
|---|---|---|---|
| 延迟 | 毫秒级 | 毫秒级 | 秒级(微批处理) |
| 部署模式 | 库(无需集群) | 需要集群 | 需要集群 |
| 状态管理 | 内置 RocksDB | 内置状态后端 | Checkpoint 机制 |
| Exactly-Once | 完全支持 | 完全支持 | 较弱支持 |
| 生态系统 | Kafka 原生 | 多种数据源 | Spark 生态 |
Kafka Streams 的核心优势包括:
- 轻量级:无需部署独立的处理集群,只是一个普通的 Java 库
- 低延迟:毫秒级处理延迟,适合实时性要求高的场景
- 精确一次语义:支持 Exactly-Once 处理语义,确保数据不丢失不重复
- 可扩展:支持并行处理,可以轻松横向扩展
- 原生集成:与 Kafka 安全特性、监控工具无缝集成
适用场景
Kafka Streams 适合以下场景:
- 实时数据分析:实时统计、监控告警
- 事件驱动应用:订单处理、支付流程
- 数据管道:ETL、数据同步
- 实时推荐:基于用户行为的实时推荐
快速开始
添加 Maven 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.7.0</version>
</dependency>
Word Count 示例
这是最经典的流处理示例——词频统计:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(String[] args) {
// 1. 配置 Kafka Streams
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 2. 构建处理拓扑
StreamsBuilder builder = new StreamsBuilder();
// 从主题读取文本行
KStream<String, String> textLines = builder.stream("text-input");
// 处理流程:分割 -> 分组 -> 计数
KTable<String, Long> wordCounts = textLines
// 将每行文本分割成单词
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// 按单词分组(key 变成单词)
.groupBy((key, word) -> word)
// 统计每个单词的出现次数
.count(Materialized.as("wordcount-store"));
// 将结果写入输出主题
wordCounts.toStream().to("wordcount-output");
// 3. 启动应用
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
// 4. 注册关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
这个示例展示了 Kafka Streams 的核心编程模型:
- 配置:设置应用 ID、Kafka 集群地址、序列化器
- 构建拓扑:定义数据处理的 DAG(有向无环图)
- 启动应用:创建 KafkaStreams 实例并启动
- 优雅关闭:注册关闭钩子确保资源正确释放
核心概念
KStream —— 记录流
KStream 是记录流(Record Stream)的抽象,每个数据记录代表一个独立的事件。类比数据库表,KStream 中的记录始终被解释为 INSERT 操作——新记录不会替换相同键的旧记录。
例如,如果流中依次出现 (alice, 1) 和 (alice, 3) 两条记录,计算 alice 的值总和时结果是 4,因为两条记录都被保留并参与计算。
适用场景:信用卡交易、页面访问日志、服务器日志等事件数据。
KTable —— 变更日志流
KTable 是变更日志流(Changelog Stream)的抽象,每个数据记录代表一次状态更新。新记录会覆盖相同键的旧记录,类似数据库表的 UPSERT 操作。当值为 null 时,表示删除该键。
如果变更日志中出现 (alice, 1) 后又出现 (alice, 3),则 alice 的最终值是 3,因为后者覆盖了前者。
适用场景:用户资料表、商品库存表、配置表等状态数据。
KStream 与 KTable 的关系
KStream 和 KTable 可以相互转换,它们之间存在紧密的联系:
- KTable → KStream:通过
toStream()方法,将表的变更历史作为事件流输出 - KStream → KTable:通过聚合操作(如
groupBy().count()),将事件流聚合为状态表
从存储角度来看:
- KStream 对应的 Kafka 主题不应启用日志压缩(Log Compaction),否则会丢失历史事件
- KTable 对应的 Kafka 主题应该启用日志压缩,只保留每个键的最新值
GlobalKTable —— 全局表
GlobalKTable 与 KTable 类似,也是变更日志流的抽象,区别在于数据分布方式:
- KTable:每个应用实例只加载对应分区的数据
- GlobalKTable:每个应用实例加载所有分区的完整数据
GlobalKTable 的优势:
- 支持非键值关联(Foreign Key Join),无需重新分区
- 适合小规模维表的全局广播
GlobalKTable 的劣势:
- 本地存储占用更多(每个实例都有完整副本)
- 网络和 Broker 负载更高(每个实例都订阅全量数据)
Streams DSL —— 流处理领域语言
Kafka Streams DSL(Domain Specific Language)是构建在 Processor API 之上的高级 API,提供了声明式的流处理编程模型。大多数数据处理操作只需要几行代码即可完成。
无状态转换
无状态转换不需要维护处理状态,每条记录的处理结果只取决于当前记录本身。
filter —— 过滤
KStream<String, String> stream = builder.stream("input-topic");
// 只保留值长度大于 10 的记录
KStream<String, String> filtered = stream.filter(
(key, value) -> value.length() > 10
);
// filterNot:过滤掉满足条件的记录
KStream<String, String> filteredNot = stream.filterNot(
(key, value) -> value.startsWith("error")
);
map —— 映射转换
// map:可以同时修改键和值
KStream<String, String> mapped = stream.map(
(key, value) -> KeyValue.pair(key.toUpperCase(), value.toUpperCase())
);
// mapValues:只修改值,保持键不变
KStream<String, String> mappedValues = stream.mapValues(
value -> value.trim()
);
map 与 mapValues 的区别:
map可能导致数据重新分区(如果修改了键)mapValues不会触发重新分区,性能更好
flatMap —— 扁平化映射
// flatMap:一条记录产生多条记录
KStream<String, String> flatMapped = stream.flatMap(
(key, value) -> {
List<KeyValue<String, String>> result = new ArrayList<>();
for (String word : value.split(" ")) {
result.add(KeyValue.pair(word, word));
}
return result;
}
);
// flatMapValues:一条记录产生多个值,保持原键
KStream<String, String> flatMappedValues = stream.flatMapValues(
value -> Arrays.asList(value.split(","))
);
selectKey —— 重新选择键
// 从值中提取第一个字段作为新键
KStream<String, String> rekeyed = stream.selectKey(
(key, value) -> value.split(",")[0]
);
注意:修改键会触发重新分区,因为数据需要按新键重新分配到不同分区。
branch —— 分支分流
// 根据条件将流分成多个分支
KStream<String, String>[] branches = stream.branch(
(key, value) -> value.startsWith("error"), // 分支0:错误日志
(key, value) -> value.startsWith("warn"), // 分支1:警告日志
(key, value) -> true // 分支2:其他日志(默认分支)
);
branches[0].to("error-topic");
branches[1].to("warn-topic");
branches[2].to("info-topic");
merge —— 合并流
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
// 合并两个流
KStream<String, String> merged = stream1.merge(stream2);
注意:合并后的流不保证两条原始流之间的顺序,只保证每条流内部的相对顺序。
peek —— 窥视(调试用)
// 打印每条记录,但不修改流(用于调试)
KStream<String, String> peeked = stream.peek(
(key, value) -> System.out.println("Key: " + key + ", Value: " + value)
);
有状态转换
有状态转换需要维护处理状态,处理结果依赖于历史记录。Kafka Streams 内置了容错的状态存储,状态数据会被持久化到 Kafka 的内部主题中。
分组操作
聚合操作之前必须先分组,分组确保数据按键正确分区:
// groupByKey:按原键分组
KGroupedStream<String, String> groupedByKey = stream.groupByKey();
// groupBy:按键选择器分组(可能触发重新分区)
KGroupedStream<String, String> grouped = stream.groupBy(
(key, value) -> value.split(",")[0]
);
聚合操作
count —— 计数
KTable<String, Long> counts = stream
.groupBy((key, value) -> value)
.count(Materialized.as("counts-store"));
aggregate —— 自定义聚合
// 自定义聚合逻辑
KTable<String, Integer> aggregated = stream
.groupByKey()
.aggregate(
() -> 0, // 初始值
(key, value, aggregate) -> aggregate + value.length(), // 累加器
Materialized.as("aggregated-store")
);
reduce —— 归约
// 将相同键的值拼接起来
KTable<String, String> reduced = stream
.groupByKey()
.reduce(
(value1, value2) -> value1 + "," + value2
);
窗口操作
窗口操作将无限的数据流划分为有限的窗口,在窗口内进行聚合计算。
滚动窗口(Tumbling Window)
滚动窗口是固定大小、不重叠的窗口。
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
KTable<Windowed<String>, Long> tumblingCounts = stream
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
滑动窗口(Hopping Window)
滑动窗口是固定大小、可重叠的窗口,按固定步长向前滑动。
KTable<Windowed<String>, Long> hoppingCounts = stream
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
.count();
会话窗口(Session Window)
会话窗口根据数据的活动间隔动态划分窗口,适合用户会话分析。
import org.apache.kafka.streams.kstream.SessionWindows;
KTable<Windowed<String>, Long> sessionCounts = stream
.groupBy((key, value) -> value)
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(10)))
.count();
窗口类型对比
| 窗口类型 | 特点 | 适用场景 |
|---|---|---|
| 滚动窗口 | 固定大小、不重叠 | 定期统计(每小时订单数) |
| 滑动窗口 | 固定大小、可重叠 | 平滑统计(移动平均值) |
| 会话窗口 | 动态大小 | 用户行为分析(会话时长) |
连接操作
Kafka Streams 支持多种类型的连接操作:
KStream-KStream 连接(窗口连接)
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");
// 窗口连接:7天内的订单和支付
KStream<String, String> joined = orders.join(
payments,
(orderValue, paymentValue) -> orderValue + " | " + paymentValue,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(7))
);
KStream-KTable 连接(非窗口连接)
// 流与表的连接:用表中的数据丰富流数据
KStream<String, String> clicks = builder.stream("clicks");
KTable<String, String> users = builder.table("users");
KStream<String, String> enriched = clicks.join(
users,
(clickValue, userValue) -> clickValue + " by " + userValue
);
KStream-GlobalKTable 连接(支持外键连接)
GlobalKTable<String, String> products = builder.globalTable("products");
// 使用 KeyValueMapper 从流记录中提取连接键
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> enriched = orders.join(
products,
(orderKey, orderValue) -> extractProductId(orderValue), // 外键提取
(orderValue, productValue) -> orderValue + " | " + productValue
);
连接类型对比
| 连接类型 | 内连接 | 左连接 | 外连接 |
|---|---|---|---|
| KStream-KStream | 支持 | 支持 | 支持 |
| KTable-KTable | 支持 | 支持 | 支持 |
| KStream-KTable | 支持 | 支持 | 不支持 |
| KStream-GlobalKTable | 支持 | 支持 | 不支持 |
协同分区要求
对于 KStream-KStream、KTable-KTable、KStream-KTable 的等值连接,输入主题必须满足协同分区要求:
- 两侧主题的分区数必须相同
- 所有写入主题的应用必须使用相同的分区策略
GlobalKTable 连接不需要协同分区,因为它在每个实例上都有完整的数据副本。
状态管理
状态存储类型
Kafka Streams 使用状态存储来维护处理状态:
| 存储类型 | 用途 | 特点 |
|---|---|---|
| TimestampedKeyValueStore | 非窗口聚合、KTable | 按键值存储,带时间戳 |
| TimestampedWindowStore | 时间窗口聚合 | 按窗口和键存储 |
| SessionStore | 会话窗口聚合 | 按会话存储 |
状态存储配置
// 使用 RocksDB 作为状态存储(默认)
KTable<String, Long> counts = stream
.groupBy((key, value) -> value)
.count(Materialized.as("counts-store"));
// 使用内存存储(仅用于测试)
KTable<String, Long> inMemoryCounts = stream
.groupBy((key, value) -> value)
.count(Materialized.inMemory("in-memory-store"));
交互式查询
交互式查询允许从状态存储中读取当前状态:
// 获取状态存储
ReadOnlyKeyValueStore<String, Long> store = streams.store(
StoreQueryParameters.fromNameAndType(
"counts-store",
QueryableStoreTypes.keyValueStore()
)
);
// 查询单个键
Long count = store.get("word");
// 遍历所有键值
KeyValueIterator<String, Long> iterator = store.all();
while (iterator.hasNext()) {
KeyValue<String, Long> entry = iterator.next();
System.out.println(entry.key + ": " + entry.value);
}
容错与 Exactly-Once 语义
容错机制
Kafka Streams 通过以下机制实现容错:
- 状态检查点:定期将处理状态保存到 Kafka 内部主题
- 变更日志主题:所有状态变更都记录到 changelog topic
- 自动恢复:故障恢复时从检查点或变更日志重建状态
Exactly-Once 配置
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 启用 Exactly-Once V2(推荐)
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
Exactly-Once 语义确保:
- 每条消息只被处理一次
- 即使发生故障,也不会丢失或重复处理消息
- 状态存储与消息处理保持原子性
生产者配置要求
使用 Exactly-Once 时,需要配置事务:
# 生产者配置
enable.idempotence=true
transactional.id=your-transactional-id
# 消费者配置
isolation.level=read_committed
应用配置与优化
核心配置参数
Properties config = new Properties();
// 必需配置
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 序列化器
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
// 并行度(线程数)
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// 缓存大小(提高吞吐量)
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
1024 * 1024 * 100); // 100MB
// 提交间隔
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Exactly-Once 语义
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
性能优化建议
- 合理设置分区数:分区数应大于等于应用实例数,确保并行度
- 使用缓存:增大缓存减少 RocksDB I/O 操作
- 批处理:适当增大 batch.size 提高吞吐量
- 状态存储优化:选择合适的存储后端和压缩算法
生产部署
# Kubernetes 部署示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-streams-app
spec:
replicas: 3
template:
spec:
containers:
- name: app
image: my-streams-app:latest
env:
- name: APPLICATION_ID
value: "my-streams-app"
- name: BOOTSTRAP_SERVERS
value: "kafka:9092"
- name: NUM_STREAM_THREADS
value: "2"
resources:
limits:
memory: "2Gi"
requests:
memory: "1Gi"
实战案例:实时订单分析
下面是一个完整的实时订单分析应用:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
public class OrderAnalyticsApplication {
public static void main(String[] args) {
// 配置
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder builder = new StreamsBuilder();
// 1. 订单流
KStream<String, String> orders = builder.stream("orders");
// 2. 商品维表(全局表)
GlobalKTable<String, String> products = builder.globalTable("products");
// 3. 订单金额统计(每5分钟滚动窗口)
KTable<Windowed<String>, Long> orderCounts = orders
.groupBy((key, value) -> extractRegion(value))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("order-counts-store"));
// 4. 高价值订单(金额 > 1000)
KStream<String, String> highValueOrders = orders
.filter((key, value) -> extractAmount(value) > 1000);
// 5. 订单与商品信息关联
KStream<String, String> enrichedOrders = orders.join(
products,
(orderKey, orderValue) -> extractProductId(orderValue),
(orderValue, productValue) -> orderValue + "|" + productValue
);
// 6. 输出结果
orderCounts.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value))
.to("order-counts-output");
highValueOrders.to("high-value-orders");
enrichedOrders.to("enriched-orders");
// 启动
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static String extractRegion(String orderValue) {
// 从订单 JSON 中提取地区
return orderValue.split(",")[2];
}
private static double extractAmount(String orderValue) {
// 从订单 JSON 中提取金额
return Double.parseDouble(orderValue.split(",")[1]);
}
private static String extractProductId(String orderValue) {
// 从订单 JSON 中提取商品 ID
return orderValue.split(",")[0];
}
}
小结
Kafka Streams 是构建实时流处理应用的强大工具:
- 轻量级:只是一个 Java 库,无需独立集群
- DSL API:提供声明式的流处理编程模型
- KStream/KTable:支持流和表两种数据抽象
- 状态管理:内置容错的状态存储
- Exactly-Once:支持精确一次处理语义
- 窗口操作:支持滚动、滑动、会话窗口
下一步
接下来让我们学习 Kafka Connect,了解如何将 Kafka 与外部数据系统集成。