Flink 流处理引擎
Apache Flink 是一个分布式流处理引擎,提供高吞吐、低延迟、精确一次语义的流处理能力。与 Spark 的微批处理不同,Flink 是真正的流处理引擎,每个事件到来时立即处理。
Flink 概述
什么是 Flink?
Flink 是 Apache 顶级项目,最初由德国柏林工业大学的研究项目 Stratosphere 发展而来。Flink 的核心设计理念是"流优先",将批处理视为流处理的特殊情况——有界流。
核心特点:
- 真正的流处理:事件级别处理,毫秒级延迟
- 精确一次语义:保证数据不丢失、不重复
- 状态管理:支持有状态计算,状态可持久化
- 事件时间:支持基于事件时间的处理,正确处理乱序数据
- 高可用:支持故障恢复和状态回滚
Flink vs Spark Streaming
| 维度 | Spark Streaming | Flink |
|---|---|---|
| 处理模式 | 微批处理(Micro-batch) | 事件级流处理 |
| 延迟 | 秒级 | 毫秒级 |
| 时间语义 | 处理时间为主 | 事件时间为主 |
| 状态管理 | 有限支持 | 完善的状态管理 |
| 精确一次 | 需要额外配置 | 原生支持 |
| 窗口操作 | 相对简单 | 非常丰富 |
| 迭代计算 | 支持有限 | 原生支持迭代 |
为什么选择 Flink?
- 低延迟要求:需要毫秒级响应的场景
- 精确一次语义:金融交易、支付等不能丢失或重复的场景
- 复杂事件处理:需要基于事件时间进行复杂模式匹配
- 大规模状态:需要维护大量状态数据的场景
Flink 应用场景
| 场景 | 说明 |
|---|---|
| 实时 ETL | 数据清洗、转换、同步 |
| 实时监控 | 系统监控、业务监控、异常检测 |
| 实时推荐 | 个性化推荐、广告投放 |
| 实时风控 | 欺诈检测、异常识别 |
| 实时报表 | 实时大屏、指标统计 |
| CEP 复杂事件 | 模式匹配、事件序列检测 |
Flink 版本演进
| 版本 | 发布时间 | 重要特性 |
|---|---|---|
| Flink 1.0 | 2016 | DataStream API 稳定版 |
| Flink 1.2 | 2017 | Side Outputs、Process Function |
| Flink 1.5 | 2018 | Flip-6 架构重构、新部署模型 |
| Flink 1.9 | 2019 | Blink 合并、Hive 集成 |
| Flink 1.11 | 2020 | CDC 支持、PyFlink 增强 |
| Flink 1.13 | 2021 | 状态 TTL、内存模型优化 |
| Flink 1.15 | 2022 | SQL Gateway、Pulsar 连接器 |
| Flink 1.18 | 2023 | Adaptive Scheduler、批执行模式改进 |
| Flink 2.0 | 2025 | 解耦状态管理、异步状态 API、DataStream V2 |
Flink 2.0 重要更新:
Flink 2.0 是自 Flink 1.0 发布九年来的首个主要版本,包含 25 个 FLIP 和 369 个问题修复,标志着 Flink 进入云原生和 AI 时代的新篇章:
1. 解耦状态管理架构(Disaggregated State Management)
这是 Flink 2.0 最核心的架构变革,专为云原生环境设计:
- ForSt 状态后端:全新的解耦状态后端,将状态存储从计算资源分离,使用分布式文件系统(DFS)作为主要存储介质
- 解决云原生挑战:
- 解决容器化环境中本地磁盘受限的问题
- 消除 Compaction 造成的资源使用峰值
- 支持 TB 级状态作业的快速扩缩容
- 实现轻量快速的检查点
2. 异步执行模型与异步状态 API
异步执行模型是 Flink 2.0 的核心创新之一,它将状态访问与计算解耦,实现非阻塞的状态操作。这对于使用 ForSt 等远程状态后端的场景尤为重要,可以显著提升吞吐量。
// 方式一:SQL 作业启用异步状态处理
// 在 Table 环境中设置配置
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().set("table.exec.async-state.enabled", "true");
// 方式二:通过配置文件启用(config.yaml)
// table.exec.async-state.enabled: true
// 启用后,Flink 会自动将支持异步状态访问的 SQL 算子切换到异步模式
// 包括:Group Aggregation、Window Aggregation、Join 等有状态算子
异步状态 API 的核心特性:
- 乱序记录处理:解耦状态访问与计算,实现并行执行
- 非阻塞检查点:检查点期间支持非阻塞状态操作,降低延迟
- 语义保证:保持水位线传播、定时器处理、键顺序等核心保证
- 透明启用:对于 SQL 作业,启用后无需修改代码即可获得性能提升
支持的异步 SQL 算子:
| 算子类型 | 是否支持异步 | 说明 |
|---|---|---|
| Group Aggregation | 是 | 分组聚合 |
| Window Aggregation | 是 | 窗口聚合 |
| Interval Join | 是 | 时间区间 Join |
| Temporal Join | 是 | 维表 Join |
| Deduplication | 是 | 去重 |
| Top-N | 是 | Top-N 查询 |
性能提升:在 Nexmark 基准测试中,11 个有状态查询完全兼容异步执行模型,对于重 I/O 的状态查询,吞吐量可提升 75%-120%
3. 物化表(Materialized Tables)
物化表是流批统一处理的核心,让用户通过单一管道管理实时和历史数据:
- 查询修改支持:支持 Schema 和查询更新,无需重新处理历史数据
- Kubernetes/YARN 提交:原生支持向生产集群提交刷新作业
- Paimon 集成:与 Apache Paimon 深度集成,提供高性能 ACID 事务
4. 自适应批执行增强
- 自适应广播 Join:运行时动态判断是否适合广播 Join
- 自动 Join 倾斜优化:动态拆分倾斜分区,消除长尾延迟
- 性能提升:TPC-DS 10TB 基准测试中性能提升 8-16%
5. AI 集成
- Flink CDC 3.3:支持在 Transform 表达式中动态调用 AI 模型(OpenAI 等)
- Flink SQL AI 语法:支持在 SQL 中定义和调用 AI 模型
6. DataStream V2 API(实验性)
全新的 DataStream API,解决原 API 的设计问题:
- 提供底层构建块:DataStream、ProcessFunction、Partitioning
- 提供高级扩展:Window、Join 等
- 目前处于实验阶段,不建议生产使用
7. 破坏性变更(升级注意)
Flink 2.0 移除了多个已弃用的 API:
| 移除的 API | 迁移方案 |
|---|---|
| DataSet API | 迁移到 DataStream API 或 Table API/SQL |
| Scala DataStream API | 迁移到 Java DataStream API |
| SourceFunction/SinkFunction | 迁移到 Source/Sink V2 |
| TableSource/TableSink | 迁移到 DynamicTableSource/DynamicTableSink |
8. 序列化改进
Flink 2.0 对序列化机制进行了重大优化:
- 高效集合序列化器:为 Map、List、Set 等集合类型引入了更高效的内置序列化器,默认启用,显著提升性能
- Kryo 升级到 5.6:更快的序列化速度,更好的内存效率,对新版本 Java 有更好的支持
// 集合类型序列化已自动优化,无需额外配置
// 对于自定义类型,仍可使用 Kryo 序列化
env.getConfig().enableForceKryo();
env.getConfig().registerKryoType(MyCustomClass.class);
运行环境要求:
- 最低 Java 版本:Java 11(不再支持 Java 8)
- 推荐 Java 版本:Java 17(默认)
- 支持 Java 21
Docker 镜像默认使用 Java 17,从源码构建时也默认使用 Java 17。
Flink 架构
系统架构
Flink 采用主从架构,主要组件包括:
JobManager
JobManager 是 Flink 的主节点,负责协调分布式执行。它包含以下组件:
- Dispatcher:接收作业提交,负责启动新的 JobMaster
- ResourceManager:负责资源管理,申请和释放 TaskManager
- JobMaster:管理单个作业的执行,负责调度、检查点协调、故障恢复
JobManager 的核心职责:
- 作业调度:将作业分解为任务并调度执行
- 资源管理:向 ResourceManager 申请资源
- 检查点协调:协调检查点的生成
- 故障恢复:处理任务失败和恢复
TaskManager
TaskManager 是 Flink 的工作节点,负责执行具体的计算任务:
- 任务执行:执行具体的计算任务
- 数据交换:与其他 TaskManager 交换数据
- 状态存储:存储和管理状态数据
- 资源汇报:向 JobManager 汇报状态
每个 TaskManager 包含多个 Slot,每个 Slot 可以执行一个任务链。
ResourceManager
ResourceManager 负责资源管理:
- 资源分配:为作业分配 TaskManager 资源
- 资源回收:回收空闲资源
- 资源适配:支持 YARN、Kubernetes、Mesos 等资源管理器
并行度与槽位
并行度(Parallelism)
并行度决定了一个算子同时有多少个任务实例执行:
// 设置全局并行度
env.setParallelism(4);
// 设置单个算子并行度
dataStream.map(...).setParallelism(2);
// 设置默认并行度(配置文件)
// parallelism.default: 4
并行度选择建议:
- 通常设置为 TaskManager 总 Slot 数的倍数
- 每个 Slot 处理的数据量应适中,避免过大导致内存问题
- 考虑数据倾斜情况,适当增加并行度
槽位(Slot)
Slot 是 TaskManager 的资源单位,每个 Slot 可以执行一个任务链:
- 资源隔离:不同 Slot 之间内存隔离
- 任务共享:同一作业的任务可以共享 Slot(Slot Sharing Group)
TaskManager(4 Slots)
├── Slot 1: Source → Map → Filter
├── Slot 2: Source → Map → Filter
├── Slot 3: KeyBy → Window → Sink
└── Slot 4: KeyBy → Window → Sink
Slot 共享的优势:
- 减少数据传输开销(同一 Slot 内的任务可以直接传递数据)
- 提高资源利用率
作业执行流程
Flink 作业的执行流程分为几个阶段:
用户代码 → StreamGraph → JobGraph → ExecutionGraph → 物理执行
- StreamGraph:逻辑执行计划,表示数据流的拓扑结构
- JobGraph:优化后的执行计划,将多个算子链接成算子链
- ExecutionGraph:并行化的执行计划,包含任务的并行实例
- 物理执行:在 TaskManager 上实际执行
算子链(Operator Chain):
Flink 会尽可能将多个算子链接在一起,减少线程切换和网络传输:
// 禁用算子链
stream.map(...).disableChaining();
// 开始新的算子链
stream.map(...).startNewChain();
DataStream API
DataStream API 是 Flink 流处理的核心 API,提供了丰富的算子操作。
程序结构
一个完整的 Flink 程序包含以下结构:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
public class FlinkJob {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 配置环境(可选)
env.setParallelism(4);
env.enableCheckpointing(60000);
// 3. 添加数据源
DataStream<String> source = env.socketTextStream("localhost", 9999);
// 4. 数据转换
DataStream<String> transformed = source
.flatMap(new MyFlatMapFunction())
.keyBy(value -> value.f0)
.sum(1);
// 5. 输出结果
transformed.print();
// 6. 执行程序
env.execute("My Flink Job");
}
}
重要提示:Flink 程序是懒执行的,只有调用 execute() 方法时才会真正执行。
数据源(Source)
Flink 支持多种内置数据源,也可以自定义数据源。
内置数据源
// 从集合读取
DataStream<String> fromCollection = env.fromCollection(
Arrays.asList("a", "b", "c")
);
// 从元素创建
DataStream<String> fromElements = env.fromElements("a", "b", "c");
// 从文件读取
DataStream<String> fromFile = env.readTextFile("/path/to/file");
// 从 Socket 读取
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);
Kafka 连接器
Kafka 是最常用的消息队列,Flink 提供了完善的 Kafka 连接器:
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
// 创建 Kafka Source(推荐方式,Flink 1.14+)
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source,
WatermarkStrategy.noWatermarks(), "Kafka Source");
Kafka Source 配置选项:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("broker1:9092,broker2:9092")
.setTopics("topic1", "topic2") // 订阅多个主题
.setTopics(Pattern.compile("topic-.*")) // 正则匹配主题
.setGroupId("my-consumer-group")
// 起始位置配置
.setStartingOffsets(OffsetsInitializer.earliest()) // 从最早开始
.setStartingOffsets(OffsetsInitializer.latest()) // 从最新开始
.setStartingOffsets(OffsetsInitializer.timestamp(123456789L)) // 从时间戳开始
// 消费者属性
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperty("partition.discovery.interval.ms", "30000") // 分区发现
.build();
Kafka Sink
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
精确一次写入 Kafka:
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(...)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("my-transaction")
.setProperty("transaction.timeout.ms", "900000") // 15分钟
.build();
JDBC 连接器
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
// JDBC Sink
stream.addSink(JdbcSink.sink(
"INSERT INTO users (id, name, age) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=?, age=?",
(ps, record) -> {
ps.setInt(1, record.getId());
ps.setString(2, record.getName());
ps.setInt(3, record.getAge());
ps.setString(4, record.getName());
ps.setInt(5, record.getAge());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/mydb")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("password")
.build()
));
文件 Sink
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriterFactory;
// 文件 Sink
FileSink<String> fileSink = FileSink
.forRowFormat(new Path("/output/path"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(60000) // 滚动间隔
.withInactivityInterval(30000) // 不活跃间隔
.withMaxPartSize(128 * 1024 * 1024) // 文件大小
.build()
)
.build();
stream.sinkTo(fileSink);
自定义 Source
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
// 非并行 Source
public class MySource implements SourceFunction<Event> {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (isRunning) {
// 生成数据
Event event = generateEvent();
ctx.collect(event);
Thread.sleep(100);
}
}
@Override
public void cancel() {
isRunning = false;
}
}
// 并行 Source
public class MyParallelSource implements ParallelSourceFunction<Event> {
// 实现同上,但可以并行执行
}
// 使用
DataStream<Event> stream = env.addSource(new MySource());
DataStream<Event> parallelStream = env.addSource(new MyParallelSource()).setParallelism(4);
转换操作(Transformation)
基本转换
// map:一对一转换
DataStream<Integer> mapped = stream.map(x -> x * 2);
// 使用 RichMapFunction(可以获取运行时上下文)
DataStream<Integer> richMapped = stream.map(new RichMapFunction<String, Integer>() {
@Override
public void open(Configuration parameters) {
// 初始化资源
}
@Override
public Integer map(String value) {
// 可以获取状态、计数器等
return value.length();
}
@Override
public void close() {
// 释放资源
}
});
// filter:过滤
DataStream<Integer> filtered = stream.filter(x -> x > 0);
// flatMap:一对多转换
DataStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) {
for (String word : value.split(" ")) {
out.collect(word);
}
}
});
KeyedStream 操作
keyBy 将流按键分组,相同 Key 的数据会被路由到同一个并行实例:
// keyBy 方式
KeyedStream<Event, String> keyed = stream.keyBy(event -> event.getUserId());
// 使用 KeySelector
KeyedStream<Event, String> keyed = stream.keyBy(
new KeySelector<Event, String>() {
@Override
public String getKey(Event event) {
return event.getUserId();
}
}
);
// 使用字段位置(Tuple 类型)
KeyedStream<Tuple2<String, Integer>, String> keyed = tupleStream.keyBy(0);
// 使用字段名称(POJO 类型)
KeyedStream<Event, String> keyed = stream.keyBy("userId");
KeyedStream 聚合操作:
// sum:求和
DataStream<Tuple2<String, Integer>> sumResult = keyed.sum(1);
// min/max:最小/最大值
DataStream<Tuple2<String, Integer>> minResult = keyed.min(1);
// minBy/maxBy:返回最小/最大值对应的完整记录
DataStream<Tuple2<String, Integer>> minByResult = keyed.minBy(1);
// reduce:自定义聚合
DataStream<Event> reduced = keyed.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});
多流转换
// union:合并多个流(类型必须相同)
DataStream<String> unioned = stream1.union(stream2, stream3);
// connect:连接两个流(可以不同类型)
ConnectedStreams<String, Integer> connected = stream1.connect(stream2);
// CoMap:对连接流分别处理
DataStream<String> coMapped = connected
.map(new CoMapFunction<String, Integer, String>() {
@Override
public String map1(String value) {
return "String: " + value;
}
@Override
public String map2(Integer value) {
return "Integer: " + value;
}
});
// CoFlatMap:对连接流分别进行 flatMap
DataStream<String> coFlatMapped = connected
.flatMap(new CoFlatMapFunction<String, Integer, String>() {
@Override
public void flatMap1(String value, Collector<String> out) {
out.collect("From stream1: " + value);
}
@Override
public void flatMap2(Integer value, Collector<String> out) {
out.collect("From stream2: " + value);
}
});
Interval Join
时间区间内的 Join,适用于两个流的事件在特定时间范围内关联:
// 两个流按键分组
KeyedStream<Event1, String> stream1 = ...;
KeyedStream<Event2, String> stream2 = ...;
// Interval Join
DataStream<String> joined = stream1
.intervalJoin(stream2)
.between(Time.seconds(-5), Time.seconds(5)) // 时间范围
.process(new ProcessJoinFunction<Event1, Event2, String>() {
@Override
public void processElement(Event1 left, Event2 right, Context ctx, Collector<String> out) {
out.collect(left.getId() + " - " + right.getId());
}
});
ProcessFunction
ProcessFunction 是 Flink 中最强大的处理函数,可以访问状态、定时器和侧输出流。
基本 ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
// 基本用法
DataStream<String> processed = stream.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) {
// 正常输出
out.collect(value.toString());
// 获取当前处理时间
long processingTime = ctx.timerService().currentProcessingTime();
// 获取当前事件时间
long eventTime = ctx.timerService().currentWatermark();
}
});
KeyedProcessFunction
KeyedProcessFunction 用于 KeyedStream,可以访问 Keyed State 和注册定时器:
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Event, Result> {
private ValueState<Long> countState;
private ValueState<Long> timerState;
@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
timerState = getRuntimeContext().getState(
new ValueStateDescriptor<>("timer", Long.class));
}
@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
// 更新计数
Long count = countState.value();
if (count == null) {
count = 0L;
}
count++;
countState.update(count);
// 注册处理时间定时器(10秒后触发)
long timer = ctx.timerService().currentProcessingTime() + 10000;
ctx.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
// 注册事件时间定时器
// ctx.timerService().registerEventTimeTimer(targetTime);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
// 定时器触发时调用
Long count = countState.value();
out.collect(new Result(ctx.getCurrentKey(), count));
// 清除状态
countState.clear();
timerState.clear();
}
@Override
public void close() {
// 释放资源
}
}
// 使用
DataStream<Result> result = keyedStream.process(new MyKeyedProcessFunction());
定时器(Timer)
定时器是 Flink 实现复杂事件处理的关键机制:
public class TimerExample extends KeyedProcessFunction<String, Event, String> {
private ValueState<Long> lastModified;
@Override
public void open(Configuration parameters) {
lastModified = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastModified", Long.class));
}
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
// 更新最后修改时间
lastModified.update(ctx.timerService().currentProcessingTime());
// 注册一个1分钟后的定时器
ctx.timerService().registerProcessingTimeTimer(
ctx.timerService().currentProcessingTime() + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 检查是否已经1分钟没有更新
Long last = lastModified.value();
if (last != null && timestamp - last >= 60000) {
out.collect("Key " + ctx.getCurrentKey() + " has been idle for 1 minute");
lastModified.clear();
}
}
}
侧输出流(Side Output)
侧输出流用于将数据分流到不同的输出:
import org.apache.flink.util.OutputTag;
// 定义侧输出标签
OutputTag<Event> lateData = new OutputTag<Event>("late-data") {};
OutputTag<String> alerts = new OutputTag<String>("alerts") {};
// 主处理逻辑
DataStream<Event> mainStream = stream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) {
if (value.isLate()) {
// 发送到侧输出流
ctx.output(lateData, value);
} else if (value.needsAlert()) {
ctx.output(alerts, "Alert: " + value);
} else {
// 正常输出
out.collect(value);
}
}
});
// 获取侧输出流
DataStream<Event> lateStream = mainStream.getSideOutput(lateData);
DataStream<String> alertStream = mainStream.getSideOutput(alerts);
侧输出流应用场景:
- 迟到数据处理
- 异常数据分流
- 告警数据输出
- 调试和监控
数据输出(Sink)
// 输出到控制台
stream.print();
stream.printToErr(); // 输出到标准错误
// 输出到文件
stream.writeAsText("/path/to/output");
stream.writeAsCsv("/path/to/output");
// 输出到 Socket
stream.writeToSocket("localhost", 9999, new SimpleStringSchema());
// 输出到 Kafka(见上文 Kafka Sink)
// 输出到数据库(见上文 JDBC 连接器)
// 自定义 Sink
stream.addSink(new MySinkFunction());
窗口操作
窗口是 Flink 处理无界流的核心概念,将无限的数据流划分为有限的数据块。
窗口类型
滚动窗口(Tumbling Window)
滚动窗口大小固定,窗口之间不重叠:
// 滚动事件时间窗口,大小为5秒
DataStream<T> result = stream
.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count");
// 滚动处理时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// 带偏移量的滚动窗口(窗口从整点前1小时开始)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.hours(-1)))
滑动窗口(Sliding Window)
滑动窗口大小固定,窗口之间有重叠:
// 滑动窗口,大小10秒,滑动步长5秒
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
// 处理时间滑动窗口
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
会话窗口(Session Window)
会话窗口根据数据活跃度划分,无数据时窗口关闭:
// 会话窗口,间隔10秒
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
// 动态间隔会话窗口
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Event>() {
@Override
public long extract(Event element) {
// 根据数据动态确定间隔
return element.getSessionTimeout();
}
}))
全局窗口(Global Window)
全局窗口包含所有数据,需要自定义触发器:
.window(GlobalWindows.create())
.trigger(new MyTrigger())
窗口函数
增量聚合函数
增量聚合函数在数据到达时立即聚合,效率更高:
// reduce
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});
// aggregate
.aggregate(new AggregateFunction<Event, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event value, Long acc) {
return acc + 1;
}
@Override
public Long getResult(Long acc) {
return acc;
}
@Override
public Long merge(Long a, Long b) {
return a + b;
}
});
全窗口函数
全窗口函数可以访问窗口内的所有元素:
// process:可以获取窗口上下文信息
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<Event> events, Collector<Result> out) {
long count = 0;
for (Event e : events) {
count++;
}
// 获取窗口信息
long start = ctx.window().getStart();
long end = ctx.window().getEnd();
out.collect(new Result(key, count, start, end));
}
});
增量聚合 + 全窗口函数
可以结合两种方式,既高效又灵活:
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(
// 增量聚合
new AggregateFunction<Event, Long, Long>() {
@Override
public Long createAccumulator() { return 0L; }
@Override
public Long add(Event value, Long acc) { return acc + value.getAmount(); }
@Override
public Long getResult(Long acc) { return acc; }
@Override
public Long merge(Long a, Long b) { return a + b; }
},
// 全窗口函数
new ProcessWindowFunction<Long, Result, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<Long> values, Collector<Result> out) {
Long total = values.iterator().next();
out.collect(new Result(key, total, ctx.window().getStart(), ctx.window().getEnd()));
}
}
);
触发器(Trigger)
触发器决定何时触发窗口计算:
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
public class MyTrigger extends Trigger<Event, TimeWindow> {
@Override
public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) {
// 每个元素到达时触发
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
// 处理时间定时器触发
return TriggerResult.FIRE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
// 事件时间定时器触发
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) {
// 清除状态
}
}
// 使用自定义触发器
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(new MyTrigger())
TriggerResult 类型:
| 结果 | 说明 |
|---|---|
| CONTINUE | 不做任何操作 |
| FIRE | 触发计算,保留窗口数据 |
| PURGE | 清除窗口数据 |
| FIRE_AND_PURGE | 触发计算并清除窗口数据 |
移除器(Evictor)
移除器可以在触发后、窗口函数执行前移除元素:
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.evictor(new MyEvictor())
CEP 复杂事件处理
Flink CEP(Complex Event Processing)是 Flink 提供的复杂事件处理库,用于在无界流中检测事件模式。CEP 允许你定义复杂的事件模式,并在数据流中查找匹配的序列。
CEP 核心概念
事件模式
CEP 的核心是定义事件模式,描述你想要检测的事件序列:
- 个体模式:匹配单个事件
- 组合模式:多个个体模式组合成完整模式
- 模式组:模式内的嵌套模式
模式类型
| 模式类型 | 说明 | 示例 |
|---|---|---|
| 单例模式 | 匹配单个事件 | Pattern.begin("start") |
| 循环模式 | 匹配多个事件 | Pattern.times(3) |
| 可选模式 | 事件可能不出现 | Pattern.optional() |
定义模式
基本模式定义
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
// 定义简单模式:连续三个温度超过阈值的事件
Pattern<TemperatureEvent, ?> pattern = Pattern
.<TemperatureEvent>begin("start")
.where(SimpleCondition.of(event -> event.getTemperature() > 30))
.times(3)
.consecutive(); // 要求连续
// 应用模式到数据流
PatternStream<TemperatureEvent> patternStream = CEP.pattern(
temperatureStream,
pattern
);
// 处理匹配结果
DataStream<Alert> alerts = patternStream.select(
new PatternSelectFunction<TemperatureEvent, Alert>() {
@Override
public Alert select(Map<String, List<TemperatureEvent>> pattern) throws Exception {
List<TemperatureEvent> events = pattern.get("start");
return new Alert("Temperature too high for 3 consecutive times", events);
}
}
);
组合模式
组合模式将多个个体模式连接成完整的模式序列:
// 定义组合模式:登录失败后成功的序列
Pattern<LoginEvent, ?> loginPattern = Pattern
// 开始:登录失败
.<LoginEvent>begin("fail")
.where(SimpleCondition.of(event -> event.getType().equals("fail")))
.times(3) // 连续失败3次
.consecutive()
// 接着:登录成功
.followedBy("success")
.where(SimpleCondition.of(event -> event.getType().equals("success")))
// 时间限制:10秒内完成整个序列
.within(Time.seconds(10));
// 处理匹配结果
DataStream<SecurityAlert> securityAlerts = CEP.pattern(loginStream, loginPattern)
.select(new PatternSelectFunction<LoginEvent, SecurityAlert>() {
@Override
public SecurityAlert select(Map<String, List<LoginEvent>> pattern) {
List<LoginEvent> fails = pattern.get("fail");
LoginEvent success = pattern.get("success").get(0);
return new SecurityAlert(
"Suspicious login: 3 fails followed by success",
fails.get(0).getUserId(),
success.getTimestamp()
);
}
});
模式连接方式
| 连接方式 | 说明 | 使用场景 |
|---|---|---|
next() | 严格连续,中间不能有其他事件 | 紧密相关的事件序列 |
followedBy() | 非严格连续,中间可以有其他事件 | 不严格要求连续 |
followedByAny() | 非确定性松散连续 | 多种可能的匹配路径 |
notNext() | 严格连续的不匹配 | 排除某些模式 |
notFollowedBy() | 非严格连续的不匹配 | 排除某些模式 |
// 严格连续:A 紧接着 B
Pattern.begin("A").where(...).next("B").where(...)
// 非严格连续:A 之后有 B(中间可以隔其他事件)
Pattern.begin("A").where(...).followedBy("B").where(...)
// 排除模式:A 之后不能紧接 C
Pattern.begin("A").where(...).notNext("C").where(...)
条件定义
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
// 简单条件
.where(SimpleCondition.of(event -> event.getValue() > 100))
// 迭代条件(可以访问之前匹配的事件)
.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
// 获取之前匹配的事件
Iterable<Event> previousEvents = ctx.getEventsForPattern("start");
for (Event e : previousEvents) {
if (value.getValue() > e.getValue() * 2) {
return true;
}
}
return false;
}
})
// 组合条件
.where(condition1)
.or(condition2)
.and(condition3)
.until(stopCondition) // 直到满足停止条件
循环模式
// 匹配恰好 3 次
Pattern.begin("start").times(3)
// 匹配 2 到 4 次
Pattern.begin("start").times(2, 4)
// 匹配 1 次或多次
Pattern.begin("start").oneOrMore()
// 匹配 0 次或多次
Pattern.begin("start").timesOrMore(0)
// 可选(0 次或 1 次)
Pattern.begin("start").optional()
// 贪婪匹配(尽可能多匹配)
Pattern.begin("start").oneOrMore().greedy()
// 连续性约束
Pattern.begin("start").times(3).consecutive() // 严格连续
Pattern.begin("start").times(3).allowCombinations() // 允许非连续
处理匹配结果
select 方法
select 方法用于处理成功匹配的模式:
// 使用 PatternSelectFunction
DataStream<Result> results = patternStream.select(
new PatternSelectFunction<Event, Result>() {
@Override
public Result select(Map<String, List<Event>> pattern) throws Exception {
// pattern 是一个 Map,key 是模式名称,value 是匹配的事件列表
Event start = pattern.get("start").get(0);
List<Event> middle = pattern.get("middle");
Event end = pattern.get("end").get(0);
return new Result(start, middle, end);
}
}
);
// 使用 Lambda 表达式(Flink 1.11+)
DataStream<Result> results = patternStream.select(pattern -> {
Event start = pattern.get("start").get(0);
List<Event> events = pattern.get("middle");
return new Result(start, events);
});
flatSelect 方法
flatSelect 可以输出多条结果:
DataStream<Result> results = patternStream.flatSelect(
new PatternFlatSelectFunction<Event, Result>() {
@Override
public void flatSelect(Map<String, List<Event>> pattern,
Collector<Result> out) throws Exception {
// 可以输出多条结果
for (Event event : pattern.get("events")) {
out.collect(new Result(event));
}
}
}
);
处理超时事件
当模式设置了时间限制,未完成匹配的事件序列会触发超时:
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
// 定义带超时处理的函数
class MyPatternProcessFunction extends PatternProcessFunction<Event, Result>
implements TimedOutPartialMatchHandler<Event> {
@Override
public void processMatch(Map<String, List<Event>> match,
Context ctx, Collector<Result> out) throws Exception {
// 处理成功匹配
out.collect(new Result(match));
}
@Override
public void processTimedOutMatch(Map<String, List<Event>> match,
Context ctx) throws Exception {
// 处理超时的部分匹配
// 可以输出到侧输出流或执行其他操作
ctx.output(new OutputTag<Event>("timeout"){}, match.get("start").get(0));
}
}
// 应用带超时处理的函数
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Result> results = patternStream.process(new MyPatternProcessFunction());
CEP 应用示例
实时欺诈检测
// 检测短时间内大额交易
Pattern<Transaction, ?> fraudPattern = Pattern
.<Transaction>begin("small")
.where(SimpleCondition.of(t -> t.getAmount() < 10))
.followedBy("large")
.where(SimpleCondition.of(t -> t.getAmount() > 10000))
.within(Time.minutes(5));
DataStream<FraudAlert> fraudAlerts = CEP.pattern(transactions, fraudPattern)
.select(pattern -> {
Transaction small = pattern.get("small").get(0);
Transaction large = pattern.get("large").get(0);
return new FraudAlert(
"Suspicious pattern: small then large transaction",
small.getAccountId(),
large.getTimestamp()
);
});
用户行为分析
// 检测用户浏览-收藏-购买路径
Pattern<UserAction, ?> purchasePattern = Pattern
.<UserAction>begin("view")
.where(SimpleCondition.of(a -> a.getType().equals("view")))
.followedBy("favorite")
.where(SimpleCondition.of(a -> a.getType().equals("favorite")))
.followedBy("purchase")
.where(SimpleCondition.of(a -> a.getType().equals("purchase")))
.within(Time.hours(1));
DataStream<ConversionResult> conversions = CEP.pattern(userActions, purchasePattern)
.select(pattern -> {
UserAction view = pattern.get("view").get(0);
UserAction favorite = pattern.get("favorite").get(0);
UserAction purchase = pattern.get("purchase").get(0);
return new ConversionResult(
view.getUserId(),
view.getProductId(),
view.getTimestamp(),
purchase.getTimestamp()
);
});
CEP 最佳实践
- 合理设置时间限制:使用
within()限制模式匹配时间,避免状态无限增长 - 优化条件判断:条件函数应尽量简单高效,避免复杂计算
- 处理超时事件:对于有时间限制的模式,考虑超时事件的处理
- 控制状态大小:复杂模式会产生大量状态,注意监控和调优
- 使用侧输出流:对于超时或不完整匹配,使用侧输出流收集
时间语义
Flink 支持三种时间语义,正确理解和使用时间对于流处理至关重要。
处理时间(Processing Time)
处理时间是事件被处理时的机器时间:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 或在窗口中指定
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
特点:
- 简单,无需考虑事件时间
- 性能最好
- 结果不可重现
适用场景:
- 对时间精度要求不高
- 不需要处理迟到数据
- 简单的实时监控
事件时间(Event Time)
事件时间是事件实际发生的时间,需要从数据中提取:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 提取时间戳和水位线
DataStream<Event> withTimestamps = stream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
特点:
- 结果可重现
- 正确处理乱序数据
- 需要处理水位线和迟到数据
摄入时间(Ingestion Time)
摄入时间是事件进入 Flink 系统的时间:
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
水位线(Watermark)
水位线用于衡量事件时间的进度,处理乱序数据。
水位线概念
水位线是一个时间戳,表示"到此时间戳为止的数据已经全部到达"。水位线是单调递增的,当水位线 W 到达某个算子时,该算子可以确认所有时间戳小于 W 的事件都已经处理完毕。
事件流: E1(10:00) E2(10:02) E3(10:01) E4(10:05) E5(10:03)
水位线: W(10:00) W(10:02) W(10:04)
水位线策略
// 有界乱序水位线(允许一定程度的乱序)
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
// 单调递增水位线(无乱序)
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
// 自定义水位线生成器
WatermarkStrategy.<Event>forGenerator(ctx -> new MyWatermarkGenerator())
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
处理空闲源
当某个分区没有数据时,水位线可能无法前进:
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
.withIdleness(Duration.ofMinutes(1)) // 1分钟无数据则标记为空闲
多流的水位线对齐
当有多个输入流时,Flink 会取所有流的最小水位线作为当前水位线:
Stream1: W(10:00) → W(10:02) → W(10:04)
Stream2: W(10:01) → W(10:03) → W(10:03)
Result: W(10:00) → W(10:02) → W(10:03) // 取最小值
迟到数据处理
即使有了水位线,仍可能有迟到数据:
// 方式1:侧输出流收集迟到数据
OutputTag<Event> lateData = new OutputTag<Event>("late-data") {};
DataStream<Result> result = stream
.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Duration.ofSeconds(10)) // 允许迟到10秒
.sideOutputLateData(lateData) // 超过允许时间的迟到数据
.aggregate(new MyAggregateFunction());
// 获取迟到数据
DataStream<Event> lateStream = result.getSideOutput(lateData);
// 方式2:直接丢弃(默认行为)
状态管理
状态管理是 Flink 的核心特性,支持有状态计算。
状态类型
Keyed State
Keyed State 绑定到每个 Key 上,只能用于 KeyedStream:
public class CountFunction extends KeyedProcessFunction<String, Event, Result> {
// ValueState:单个值
private ValueState<Long> countState;
// ListState:列表
private ListState<String> listState;
// MapState:映射
private MapState<String, Integer> mapState;
// ReducingState:聚合状态
private ReducingState<Long> reducingState;
// AggregatingState:更灵活的聚合状态
private AggregatingState<Event, Long> aggregatingState;
@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
listState = getRuntimeContext().getListState(
new ListStateDescriptor<>("list", String.class));
mapState = getRuntimeContext().getMapState(
new MapStateDescriptor<>("map", String.class, Integer.class));
}
@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
// ValueState 操作
Long count = countState.value();
if (count == null) {
count = 0L;
}
count++;
countState.update(count);
// ListState 操作
listState.add(value.getId());
Iterable<String> allItems = listState.get();
// MapState 操作
mapState.put(value.getId(), value.getCount());
Integer mapValue = mapState.get(value.getId());
boolean contains = mapState.contains(value.getId());
// 清除状态
// countState.clear();
// listState.clear();
// mapState.clear();
out.collect(new Result(value.getKey(), count));
}
}
Operator State
Operator State 绑定到每个算子实例上:
public class MySource implements SourceFunction<Event>, CheckpointedFunction {
private ListState<Event> state;
private List<Event> buffered = new ArrayList<>();
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("state", Event.class));
// 从检查点恢复
if (context.isRestored()) {
for (Event e : state.get()) {
buffered.add(e);
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
state.addAll(buffered);
}
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// 发送数据
}
@Override
public void cancel() {}
}
广播状态(Broadcast State)
广播状态用于将配置或规则广播到所有并行实例:
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
// 定义广播状态描述符
MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>("rules", String.class, Rule.class);
// 广播规则流
DataStream<Rule> rules = ...;
BroadcastStream<Rule> ruleBroadcastStream = rules.broadcast(ruleStateDescriptor);
// 连接数据流和广播流
DataStream<Event> events = ...;
DataStream<Result> result = events
.connect(ruleBroadcastStream)
.process(new BroadcastProcessFunction<Event, Rule, Result>() {
@Override
public void processElement(Event event, ReadOnlyContext ctx, Collector<Result> out) throws Exception {
// 读取广播状态(只读)
ReadOnlyBroadcastState<String, Rule> rules =
ctx.getBroadcastState(ruleStateDescriptor);
Rule rule = rules.get(event.getRuleId());
if (rule != null) {
// 应用规则
out.collect(applyRule(event, rule));
}
}
@Override
public void processBroadcastElement(Rule rule, Context ctx, Collector<Result> out) throws Exception {
// 更新广播状态
BroadcastState<String, Rule> rules = ctx.getBroadcastState(ruleStateDescriptor);
rules.put(rule.getId(), rule);
}
});
状态后端
状态后端决定状态的存储方式:
// HashMapStateBackend:状态存内存(默认)
env.setStateBackend(new HashMapStateBackend());
// EmbeddedRocksDBStateBackend:状态存RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// 配置检查点目录
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
状态后端对比:
| 特性 | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| 存储位置 | JVM 堆内存 | 本地磁盘(RocksDB) |
| 状态大小 | 受限于内存 | 可超过内存 |
| 访问延迟 | 微秒级 | 毫秒级 |
| 适用场景 | 小状态、低延迟 | 大状态、生产环境 |
ForSt 状态后端(Flink 2.0+):
ForSt(For Streaming)是 Flink 2.0 引入的全新解耦状态后端,专为云原生环境设计。它将状态存储从计算资源分离,使用分布式文件系统(DFS)作为主要存储介质,解决了容器化环境中本地磁盘受限、扩缩容慢、检查点重等问题。
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
// 通过配置启用 ForSt 状态后端
Configuration config = new Configuration();
config.setString("state.backend.type", "forst");
config.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints");
// ForSt 远程存储路径
config.setString("state.forst.remote-storage", "hdfs:///flink/forst-state");
// 本地缓存大小(用于加速状态访问)
config.setString("state.forst.local-cache-size", "1gb");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
配置文件方式(config.yaml):
# 状态后端配置
state:
backend:
type: forst
# ForSt 远程存储路径(使用分布式文件系统)
forst:
remote-storage: hdfs:///flink/forst-state
# 本地缓存大小(用于加速状态访问)
local-cache-size: 1gb
checkpoints:
dir: hdfs:///flink/checkpoints
# 检查点配置
execution:
checkpointing:
interval: 60000
mode: EXACTLY_ONCE
timeout: 600000
min-pause: 30000
# 启用异步状态处理(与 ForSt 配合使用,SQL 作业)
# 对于 SQL 作业,设置以下参数启用异步状态 API
table:
exec:
async-state:
enabled: true
ForSt 核心特性:
| 特性 | 说明 |
|---|---|
| 解耦存储 | 状态存储在分布式文件系统,不受容器本地磁盘限制 |
| 快速扩缩容 | TB 级状态作业可快速恢复和扩缩容,无需完整状态迁移 |
| 轻量检查点 | 原生支持轻量快速的检查点,检查点大小不随状态线性增长 |
| 异步执行 | 与异步执行模型配合,避免状态访问阻塞计算 |
| 本地缓存 | 支持本地缓存热数据,平衡延迟和成本 |
ForSt 与 RocksDB 对比:
| 维度 | RocksDB | ForSt |
|---|---|---|
| 存储位置 | 本地磁盘 | 分布式文件系统 |
| 扩缩容 | 需要完整状态迁移 | 快速恢复(秒级) |
| 云原生适配 | 受容器磁盘限制 | 完全适配 K8s 环境 |
| 检查点开销 | 较大(需完整快照或增量) | 轻量(状态已在 DFS) |
| 状态大小限制 | 受本地磁盘限制 | 可达 TB 级别 |
| 适用场景 | 传统集群 | 云原生环境(K8s) |
ForSt 适用场景:
- 云原生环境:Kubernetes 部署,容器本地磁盘有限
- 大状态作业:状态达到数百 GB 或 TB 级别
- 需要快速扩缩容:业务高峰期需要快速扩容
- 降低运维成本:无需单独配置本地 SSD 存储
RocksDB 配置优化:
对于使用 RocksDB 的生产作业,以下配置可以显著提升性能:
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.rocksdb.Options;
// 创建 RocksDB 状态后端并启用增量检查点
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// 设置本地存储目录(建议使用 SSD)
backend.setLocalRecoveryDirectory(new Path("/mnt/ssd/flink-state"));
// 通过配置参数优化 RocksDB
Configuration config = new Configuration();
// 块缓存大小,影响读性能
config.setString("state.backend.rocksdb.block.cache-size", "256m");
// 写缓冲区大小,影响写性能
config.setString("state.backend.rocksdb.writebuffer.size", "64m");
// 写缓冲区数量
config.setString("state.backend.rocksdb.writebuffer.count", "4");
// 后台线程数(建议根据 CPU 核数调整)
config.setString("state.backend.rocksdb.thread.num", "8");
// 启用 Bloom 过滤器,提升读取性能
config.setString("state.backend.rocksdb.use-bloom-filter", "true");
env.setStateBackend(backend);
关键配置参数说明:
| 参数 | 说明 | 建议值 |
|---|---|---|
state.backend.rocksdb.localdir | 本地存储目录 | SSD 磁盘路径 |
state.backend.rocksdb.block.cache-size | 块缓存大小 | 状态大小的 10-20% |
state.backend.rocksdb.writebuffer.size | 写缓冲区大小 | 64-128MB |
state.backend.rocksdb.writebuffer.count | 写缓冲区数量 | 4-8 |
state.backend.rocksdb.thread.num | 后台线程数 | CPU 核数 |
state.backend.rocksdb.use-bloom-filter | 启用 Bloom 过滤器 | true(读密集场景) |
配置文件方式(config.yaml):
state:
backend:
type: rocksdb
incremental: true
localdir: /mnt/ssd/flink-state
block.cache-size: 256m
writebuffer.size: 64m
writebuffer.count: 4
thread.num: 8
use-bloom-filter: true
状态后端选择建议:
| 场景 | 推荐状态后端 |
|---|---|
| 状态小于内存,追求低延迟 | HashMapStateBackend |
| 状态较大,生产环境 | EmbeddedRocksDBStateBackend |
| 云原生环境,TB 级状态 | ForSt(Flink 2.0+) |
| 需要快速扩缩容 | ForSt(Flink 2.0+) |
| 读密集型作业 | RocksDB + Bloom 过滤器 |
| 写密集型作业 | RocksDB + 增大 writebuffer |
状态 TTL
状态 TTL 用于自动清理过期状态,防止状态无限增长:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24)) // TTL 时间
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 更新策略
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 可见性
.cleanupInRocksdbCompactFilter(1000) // RocksDB 压缩时清理
.build();
ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("myState", String.class);
descriptor.enableTimeToLive(ttlConfig);
TTL 配置说明:
// 更新类型
.setUpdateType(UpdateType.OnCreateAndWrite) // 创建和写入时更新 TTL(默认)
.setUpdateType(UpdateType.OnReadAndWrite) // 读取和写入时更新 TTL
.setUpdateType(UpdateType.Disabled) // 禁用 TTL 更新
// 可见性
.setStateVisibility(StateVisibility.NeverReturnExpired) // 永不返回过期数据(默认)
.setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp) // 如果未清理则返回
// 清理策略
.cleanupFullSnapshot() // 全量快照时清理
.cleanupIncrementally(10, true) // 增量清理
.cleanupInRocksdbCompactFilter(1000) // RocksDB 压缩时清理
.cleanupInBackground(1000, true) // 后台清理
容错机制
Flink 通过检查点机制实现容错,保证精确一次语义。
检查点原理
Chandy-Lamport 算法
Flink 的检查点基于 Chandy-Lamport 分布式快照算法的变体:
- 注入 Barrier:JobManager 向数据源注入检查点 Barrier
- Barrier 传递:Barrier 随数据流一起传递
- Barrier 对齐:多输入算子等待所有输入的 Barrier 到达
- 状态快照:Barrier 对齐后,算子进行状态快照
- 完成检查点:所有算子完成快照后,检查点完成
Source-1 ──── [record] [record] | barrier | [record] ────> Operator
Source-2 ──── [record] | barrier | [record] [record] ────> Operator
│
Barrier 对齐 ─────────┘
快照状态
传递 Barrier
检查点内容
一个完整的检查点包含:
- 算子状态:每个算子实例的状态
- Keyed 状态:按键分区的状态
- 源偏移量:数据源的读取位置(如 Kafka offset)
- Sink 预提交:两阶段提交 Sink 的预提交信息
检查点配置
// 启用检查点,每1秒一次
env.enableCheckpointing(1000);
// 高级配置
CheckpointConfig config = env.getCheckpointConfig();
// 设置模式:精确一次或至少一次
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置超时时间
config.setCheckpointTimeout(60000);
// 设置最小间隔(两次检查点之间的最小间隔)
config.setMinPauseBetweenCheckpoints(500);
// 设置最大并发检查点数
config.setMaxConcurrentCheckpoints(1);
// 设置可容忍的检查点失败次数
config.setTolerableCheckpointFailureNumber(3);
// 启用外部持久化检查点
config.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置检查点存储位置
config.setCheckpointStorage("hdfs:///checkpoints");
// 启用非对齐检查点(处理反压情况)
config.enableUnalignedCheckpoints();
// 设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
关键配置说明:
| 参数 | 说明 | 建议值 |
|---|---|---|
| checkpointInterval | 检查点间隔 | 1-5分钟 |
| checkpointTimeout | 检查点超时 | 5-10分钟 |
| minPauseBetweenCheckpoints | 最小间隔 | 30秒 |
| tolerableCheckpointFailureNumber | 可容忍失败次数 | 3-5 |
增量检查点
对于大状态作业,增量检查点可以显著减少 I/O:
// 启用增量检查点
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// 注意:HashMapStateBackend 不支持增量检查点
增量检查点只保存自上次检查点以来的状态变化,适合:
- 状态变化较小的作业
- 需要减少检查点时间的作业
非对齐检查点
当作业存在反压时,Barrier 对齐可能很慢。非对齐检查点可以解决:
config.enableUnalignedCheckpoints();
config.setAlignmentTimeout(Duration.ofSeconds(30)); // 对齐超时后启用非对齐
非对齐检查点原理:
- 不等待 Barrier 对齐
- 将网络缓冲区中的数据也保存到检查点
- 恢复时重放这些数据
保存点(Savepoint)
保存点是手动触发的检查点,用于版本升级和迁移:
# 触发保存点
flink savepoint <job-id> <target-directory>
# 从保存点恢复
flink run -s <savepoint-path> -d <jar-file>
# 取消作业并创建保存点
flink cancel -s <target-directory> <job-id>
# 从保存点恢复并允许状态兼容性
flink run -s <savepoint-path> -d --allowNonRestoredState <jar-file>
保存点与检查点的区别:
| 特性 | 检查点 | 保存点 |
|---|---|---|
| 触发方式 | 自动 | 手动 |
| 目的 | 故障恢复 | 版本迁移、备份 |
| 生命周期 | 自动管理 | 手动管理 |
| 格式 | 可增量和全量 | 全量 |
Flink SQL
Flink SQL 提供了更简洁的流处理方式,支持标准 SQL 语法。
Table API 与 SQL
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// 创建 Table 环境
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 或者从 StreamExecutionEnvironment 创建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
连接器配置
Kafka 连接器
-- 创建 Kafka 源表
CREATE TABLE orders (
order_id INT,
user_id INT,
product_id INT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'order-group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
-- 创建 Kafka Sink 表
CREATE TABLE order_output (
user_id INT,
total_amount DECIMAL(10, 2),
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'order-output',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
JDBC 连接器
-- 创建 JDBC 表
CREATE TABLE users (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'users',
'username' = 'root',
'password' = 'password',
'driver' = 'com.mysql.cj.jdbc.Driver'
);
-- Lookup Join(维表关联)
SELECT o.order_id, o.amount, u.name
FROM orders AS o
LEFT JOIN users FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.user_id = u.id;
文件系统连接器
-- 创建文件系统表
CREATE TABLE file_sink (
user_id INT,
total_amount DECIMAL(10, 2),
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///output/orders',
'format' = 'parquet',
'partition.default-name' = 'default',
'sink.partition-commit.policy.kind' = 'success-file'
);
窗口查询
-- 滚动窗口
SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
TUMBLE(order_time, INTERVAL '1' HOUR);
-- 滑动窗口
SELECT
user_id,
HOP_START(order_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_start,
HOP_END(order_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_end,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
HOP(order_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR);
-- 会话窗口
SELECT
user_id,
SESSION_START(order_time, INTERVAL '30' MINUTE) as window_start,
SESSION_END(order_time, INTERVAL '30' MINUTE) as window_end,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
SESSION(order_time, INTERVAL '30' MINUTE);
-- 窗口 TVF(Table-Valued Function,推荐)
SELECT
user_id,
window_start,
window_end,
SUM(amount) as total_amount
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY user_id, window_start, window_end;
Join 操作
-- Regular Join(无界流 Join,状态会无限增长)
SELECT o.order_id, o.amount, u.name
FROM orders o
JOIN users u ON o.user_id = u.id;
-- Interval Join(时间区间 Join)
SELECT o.order_id, o.amount, p.product_name
FROM orders o, products p
WHERE o.product_id = p.id
AND o.order_time BETWEEN p.update_time - INTERVAL '1' HOUR AND p.update_time;
-- Temporal Join(维表 Join,需要 Lookup Source)
SELECT o.order_id, o.amount, u.name
FROM orders AS o
JOIN user_dim FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.user_id = u.id;
函数
-- 内置函数
SELECT
UPPER(name),
LOWER(name),
SUBSTRING(name, 1, 5),
CONCAT(name, '-', CAST(age AS STRING)),
DATE_FORMAT(order_time, 'yyyy-MM-dd'),
DATE_ADD(order_time, INTERVAL '1' DAY),
DATEDIFF(order_time, CURRENT_DATE)
FROM orders;
-- 聚合函数
SELECT
COUNT(*) as total,
SUM(amount) as total_amount,
AVG(amount) as avg_amount,
MAX(amount) as max_amount,
MIN(amount) as min_amount
FROM orders;
-- 窗口函数
SELECT
order_id,
user_id,
amount,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) as rn,
RANK() OVER (PARTITION BY user_id ORDER BY amount DESC) as rk,
SUM(amount) OVER (PARTITION BY user_id ORDER BY order_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
FROM orders;
-- 自定义函数(UDF)
-- 注册 UDF
tableEnv.createTemporarySystemFunction("myUpper", MyUpperFunction.class);
-- 使用 UDF
SELECT myUpper(name) FROM orders;
SQL 语法增强(Flink 2.0+)
Flink 2.0 引入了多项 SQL 语法增强,简化开发体验:
C-style 转义字符串
支持使用 C 风格的转义字符串,便于处理特殊字符:
-- 使用 C-style 转义字符串(前缀 E)
SELECT E'\t\n\r' as special_chars; -- 制表符、换行符、回车符
-- 传统方式
SELECT CHAR(9) || CHAR(10) || CHAR(13) as special_chars;
-- 实际应用:处理包含特殊字符的数据
SELECT
id,
E'Hello\tWorld\n' as formatted_text
FROM orders;
QUALIFY 子句
QUALIFY 子句简化了窗口函数结果的过滤,语法更简洁:
-- 传统方式:使用子查询过滤窗口函数结果
SELECT * FROM (
SELECT
user_id,
order_id,
amount,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) as rn
FROM orders
) WHERE rn <= 3;
-- 使用 QUALIFY 子句(更简洁)
SELECT
user_id,
order_id,
amount
FROM orders
QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) <= 3;
-- Top-N 查询示例
SELECT
user_id,
product_id,
COUNT(*) as purchase_count
FROM user_purchases
QUALIFY ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) <= 10;
-- 去重示例
SELECT *
FROM orders
QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime) = 1;
表函数调用简化
Flink 2.0 中,表函数调用不再强制需要 TABLE() 包装:
-- 传统方式(仍然支持)
SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR));
-- 简化方式(Flink 2.0+)
SELECT * FROM TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR);
-- 窗口表函数示例
SELECT
window_start,
window_end,
user_id,
COUNT(*) as order_count
FROM TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
GROUP BY window_start, window_end, user_id;
-- HOP 窗口
SELECT * FROM HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '30' MINUTE, INTERVAL '1' HOUR);
-- SESSION 窗口
SELECT * FROM SESSION(TABLE orders, DESCRIPTOR(order_time), INTERVAL '30' MINUTE);
SQL Gateway 应用模式
Flink 2.0 中,SQL Gateway 支持 Application 模式执行 SQL 作业,替代了已移除的 Per-Job 部署模式:
# 通过 SQL Gateway 以 Application 模式提交作业
# 适用于生产环境的资源隔离需求
# 启动 SQL Gateway
./bin/sql-gateway.sh start
# 提交 SQL 作业(Application 模式)
curl -X POST http://localhost:8083/v1/sessions/session_id/statements \
-H "Content-Type: application/json" \
-d '{
"statement": "INSERT INTO sink_table SELECT * FROM source_table",
"executionConfig": {
"execution.mode": "APPLICATION"
}
}'
物化表(Materialized Table)
物化表是 Flink 2.0 引入的新型表类型,它是流批统一处理的核心。物化表让用户可以声明式地管理实时和历史数据,通过单一管道实现数据的统一处理,无需关心底层是流还是批执行模式。
物化表核心概念
物化表的核心思想是将数据的定义、存储和刷新策略解耦:
- 定义:通过 SQL 定义数据的计算逻辑
- 存储:底层使用 Apache Paimon 作为存储格式(Flink 2.0 目前唯一支持的存储)
- 刷新:支持流式刷新(实时)和批式刷新(定时)
创建物化表
-- 创建物化表
CREATE MATERIALIZED TABLE user_order_summary
(
-- 主键定义(必须)
PRIMARY KEY (user_id, window_start) NOT ENFORCED
)
-- 刷新策略
REFRESH MODE AUTO
WITH (
-- 存储:使用 Paimon 作为底层存储
'connector' = 'paimon',
'path' = 'hdfs:///warehouse/user_order_summary',
-- 刷新配置
'refresh.mode' = 'streaming', -- 流式刷新(实时)
-- 'refresh.mode' = 'batch', -- 批式刷新(定时)
'refresh.interval' = '1 h' -- 批式刷新间隔
)
AS SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
TUMBLE(order_time, INTERVAL '1' HOUR);
刷新模式说明
物化表支持三种刷新模式:
| 模式 | 说明 | 适用场景 |
|---|---|---|
| STREAMING | 流式刷新,数据变化实时反映 | 实时大屏、实时监控 |
| BATCH | 批式刷新,按固定间隔刷新 | 日报表、小时报表 |
| AUTO | 自动选择,根据查询模式决定 | 通用场景 |
-- 流式刷新物化表(实时更新)
CREATE MATERIALIZED TABLE realtime_metrics
REFRESH MODE STREAMING
WITH ('connector' = 'paimon', 'path' = '...')
AS SELECT ...;
-- 批式刷新物化表(每小时刷新)
CREATE MATERIALIZED TABLE hourly_report
REFRESH MODE BATCH EVERY INTERVAL '1' HOUR
WITH ('connector' = 'paimon', 'path' = '...')
AS SELECT ...;
查询物化表
-- 直接查询物化表(获取最新数据)
SELECT * FROM user_order_summary
WHERE window_start >= TIMESTAMP '2024-01-01 00:00:00';
-- 物化表支持时间旅行(查看历史数据)
SELECT * FROM user_order_summary
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-15 10:00:00';
-- 物化表之间的 Join
SELECT
u.user_id,
u.total_amount,
p.product_count
FROM user_order_summary u
JOIN product_summary p ON u.window_start = p.window_start;
物化表管理
-- 手动刷新物化表
CALL sys.refresh_materialized_table('user_order_summary');
-- 查看物化表状态
SHOW CREATE MATERIALIZED TABLE user_order_summary;
-- 修改物化表定义(不重新处理历史数据)
ALTER MATERIALIZED TABLE user_order_summary
SET REFRESH MODE BATCH EVERY INTERVAL '2' HOUR;
-- 删除物化表
DROP MATERIALIZED TABLE user_order_summary;
物化表与传统方案对比
| 维度 | 传统方案(流批分离) | 物化表 |
|---|---|---|
| 代码维护 | 需要维护两套代码 | 单一 SQL 定义 |
| 数据一致性 | 可能不一致 | 保证一致 |
| Schema 变更 | 需要重新处理 | 支持无缝变更 |
| 执行模式 | 需要手动选择 | 自动适配 |
| 学习成本 | 需要理解流批差异 | 只需理解 SQL |
物化表最佳实践
- 合理设计主键:主键决定数据的唯一性,直接影响存储和查询效率
- 选择合适的刷新模式:根据业务时效性要求选择流式或批式刷新
- 利用时间旅行:物化表支持历史数据查询,可用于数据审计和问题排查
- 监控刷新延迟:确保物化表数据及时更新
部署模式
Flink 支持多种部署模式,适应不同的应用场景。
Session 模式
Session 模式下,多个作业共享一个 Flink 集群:
# 启动 Session 集群
bin/start-cluster.sh
# 提交作业
bin/flink run -c com.example.MyJob myjob.jar
# 停止集群
bin/stop-cluster.sh
适用场景:
- 短作业、频繁提交
- 资源共享需求
- 开发测试环境
Per-Job 模式
每个作业独立一个 Flink 集群:
# YARN Per-Job 模式
bin/flink run -m yarn-cluster -c com.example.MyJob myjob.jar
# Kubernetes Per-Job 模式
bin/flink run-application -t kubernetes-application -c com.example.MyJob myjob.jar
适用场景:
- 长时间运行的作业
- 需要资源隔离
- 生产环境
Application 模式
Application 模式下,作业的 main() 方法在集群中执行:
# YARN Application 模式
bin/flink run-application -t yarn-application -c com.example.MyJob myjob.jar
# Kubernetes Application 模式
bin/flink run-application -t kubernetes-application \
-Dkubernetes.cluster-id=my-cluster \
-Dkubernetes.container.image=flink:latest \
-c com.example.MyJob \
local:///opt/flink/usrlib/myjob.jar
优势:
- 减少客户端负载
- 更好的资源隔离
- 支持作业内多个执行
资源管理器集成
YARN
# flink-conf.yaml
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2g
taskmanager.numberOfTaskSlots: 4
# 提交到 YARN
bin/flink run -m yarn-cluster \
-yjm 1024 -ytm 2048 -ys 4 \
-c com.example.MyJob myjob.jar
Kubernetes
# flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 4
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
# 部署到 Kubernetes
kubectl apply -f flink-configuration-configmap.yaml
kubectl apply -f jobmanager-deployment.yaml
kubectl apply -f taskmanager-deployment.yaml
性能调优
内存调优
Flink 的内存模型:
Total Process Memory
├── JVM Heap
│ ├── Framework Heap
│ └── Task Heap
├── Managed Memory
├── Network Memory
├── JVM Metaspace
└── JVM Overhead
关键配置:
# TaskManager 内存配置
taskmanager.memory.process.size: 4096m
taskmanager.memory.task.heap.size: 1024m
taskmanager.memory.managed.size: 1024m
taskmanager.memory.network.min: 64m
taskmanager.memory.network.max: 256m
# JobManager 内存配置
jobmanager.memory.process.size: 1600m
jobmanager.memory.heap.size: 1024m
状态后端优化
// RocksDB 优化配置
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
backend.setDbStoragePath("/mnt/ssd/flink-state"); // 使用 SSD
// RocksDB 配置
Options options = new Options()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableAutoCompactions(false)
.setMaxOpenFiles(-1);
网络调优
# 网络缓冲区配置
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.network.numberOfBuffers: 2048
并行度设置
// 全局并行度
env.setParallelism(4);
// 算子并行度
stream.map(...).setParallelism(8);
// Kafka Source 并行度(建议与分区数一致)
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("...")
.setTopics("...")
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka")
.setParallelism(partitionCount);
反压处理
反压是指下游处理速度跟不上上游,导致数据积压。
检测反压:
- Flink Web UI 的 Backpressure 标签
- 使用 REST API:
/jobs/<job-id>/vertices/<vertex-id>/backpressure
处理方法:
// 1. 增加并行度
stream.map(...).setParallelism(16);
// 2. 优化算子实现(减少 GC)
// 使用基本类型而非对象
// 避免创建大量临时对象
// 3. 使用异步 I/O
AsyncDataStream.orderedWait(
stream,
new AsyncDatabaseRequest(),
1000, // 超时时间
TimeUnit.MILLISECONDS,
100 // 最大并发请求数
);
// 4. 启用非对齐检查点
config.enableUnalignedCheckpoints();
背压监控
关键指标:
| 指标 | 说明 |
|---|---|
| outPoolUsage | 输出缓冲区使用率 |
| inPoolUsage | 输入缓冲区使用率 |
| backPressuredTimeMsPerSecond | 每秒背压时间 |
| busyTimeMsPerSecond | 每秒忙碌时间 |
| idleTimeMsPerSecond | 每秒空闲时间 |
监控与运维
关键监控指标
| 类别 | 指标 | 说明 |
|---|---|---|
| 作业状态 | fullRestarts | 完全重启次数 |
| numberOfFailedCheckpoints | 失败的检查点数 | |
| numberOfCompletedCheckpoints | 完成的检查点数 | |
| 检查点 | lastCheckpointDuration | 最近检查点耗时 |
| lastCheckpointSize | 最近检查点大小 | |
| checkpointAlignmentTime | 检查点对齐时间 | |
| 延迟 | currentOutputWatermark | 当前输出水位线 |
| numRecordsInPerSecond | 每秒输入记录数 | |
| numRecordsOutPerSecond | 每秒输出记录数 | |
| 背压 | backPressuredTimeMsPerSecond | 每秒背压时间 |
| busyTimeMsPerSecond | 每秒忙碌时间 |
日志配置
# log4j.properties
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
appender.file.name = MainAppender
appender.file.type = FILE
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
常见问题排查
1. 检查点失败
问题:Checkpoint expired before completing
原因:检查点超时
解决:
- 增加 checkpointTimeout
- 启用非对齐检查点
- 检查是否有背压
2. 内存溢出
问题:java.lang.OutOfMemoryError: Java heap space
原因:堆内存不足
解决:
- 增加 taskmanager.memory.task.heap.size
- 检查是否有状态泄漏
- 使用 RocksDB 状态后端
3. 数据倾斜
问题:部分子任务处理慢
原因:数据分布不均
解决:
- 重新设计 keyBy 的 key
- 使用预聚合减少数据量
- 增加并行度
最佳实践
1. 状态设计
- 最小化状态大小:只保存必要的数据
- 使用 TTL:自动清理过期状态
- 选择合适的状态后端:大状态使用 RocksDB
2. 时间处理
- 优先使用事件时间:保证结果可重现
- 合理设置水位线延迟:平衡延迟和正确性
- 处理迟到数据:使用侧输出流收集
3. 容错配置
// 生产环境推荐配置
env.enableCheckpointing(60000); // 1分钟
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10分钟
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30秒
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.minutes(5), Time.seconds(10)));
4. 性能优化
- 合理设置并行度:与数据量和集群资源匹配
- 使用算子链:减少网络传输
- 避免数据倾斜:设计合理的分区键
- 使用异步 I/O:提高外部系统访问效率
5. 监控告警
- 监控检查点成功率和耗时
- 监控背压情况
- 监控消费延迟(Kafka Lag)
- 设置合理的告警阈值
Flink 2.0 迁移指南
从 Flink 1.x 升级到 Flink 2.0 需要注意以下破坏性变更。
API 迁移
DataSet API 迁移
DataSet API 已完全移除,需要迁移到 DataStream API 或 Table API/SQL:
// 旧版 DataSet API(已移除)
// DataSet<Tuple2<String, Integer>> result = env.fromElements(...)
// .groupBy(0)
// .sum(1);
// 迁移方案 1:使用 DataStream API(批执行模式)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<Tuple2<String, Integer>> result = env.fromElements(...)
.keyBy(t -> t.f0)
.sum(1);
// 迁移方案 2:使用 Table API/SQL
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.executeSql("CREATE TABLE ...");
Table result = tableEnv.sqlQuery("SELECT key, SUM(value) FROM ... GROUP BY key");
Source/Sink 迁移
SourceFunction 和 SinkFunction 已移除,需要迁移到新的 Source/Sink V2 API:
// 旧版 SourceFunction(已移除)
// public class MySource implements SourceFunction<Event> { ... }
// 新版 Source API
public class MySource implements Source<Event, MySplit, EnumMySplit> {
@Override
public SplitEnumerator<MySplit> createEnumerator(
SplitEnumeratorContext<MySplit> enumContext) {
// 创建枚举器
}
@Override
public SourceReader<Event, MySplit> createReader(
SourceReaderContext readerContext) {
// 创建读取器
}
}
Java 版本升级
Flink 2.0 不再支持 Java 8,最低要求 Java 11:
# 检查 Java 版本
java -version # 需要至少 Java 11
# 推荐使用 Java 17
# 支持 Java 21
配置文件迁移
旧版 flink-conf.yaml 已不再支持,需要迁移到新版 config.yaml:
# 使用迁移工具转换配置文件
bin/config-migration.sh --from flink-conf.yaml --to config.yaml
状态兼容性
重要:Flink 1.x 和 2.x 之间的状态不兼容,升级时需要:
- 使用保存点(Savepoint)导出 1.x 状态
- 升级到 2.x 后,从保存点恢复时可能需要调整状态描述符
连接器适配
由于 Source/Sink API 变更,连接器需要升级:
| 连接器 | 适配状态 |
|---|---|
| Kafka | Flink 2.0 发布后立即支持 |
| Paimon | Flink 2.0 发布后立即支持 |
| JDBC | Flink 2.0 发布后立即支持 |
| Elasticsearch | Flink 2.0 发布后立即支持 |
| 其他连接器 | 计划在 Flink 2.3 前完成迁移 |
DataStream V2 API(实验性)
Flink 2.0 引入了全新的 DataStream V2 API,旨在解决原 API 的设计问题。
设计目标
原 DataStream API 存在以下问题:
- 流批语义混合,难以清晰表达意图
- 扩展性受限,添加新功能需要大量改动
- 与 Table API 的互操作性不够顺畅
DataStream V2 通过重新设计 API 结构来解决这些问题。
核心 API
import org.apache.flink.api.common.v2.DataStream;
import org.apache.flink.api.common.v2.ProcessFunction;
import org.apache.flink.api.common.v2.Partitioning;
// 创建 DataStream
DataStream<Event> stream = env.fromSource(source, WatermarkStrategy.noWatermarks());
// 基本转换
DataStream<String> result = stream
.process(new MyProcessFunction())
.partition(Partitioning.forward())
.map(event -> event.toString());
// ProcessFunction
public class MyProcessFunction extends ProcessFunction<Event, String> {
private ValueState<Long> countState;
@Override
public void open(OpenContext openContext) {
// 初始化状态
countState = openContext.getState(
new ValueStateDescriptor<>("count", Long.class));
}
@Override
public void processElement(Event event, Context ctx, Collector<String> out) {
// 处理元素
Long count = countState.value();
if (count == null) count = 0L;
countState.update(count + 1);
out.collect("Processed: " + event.getId() + ", count: " + count);
}
}
高级扩展
DataStream V2 提供高级扩展,简化常见操作:
// 窗口扩展
DataStream<WindowResult> windowed = stream
.window(WindowStrategy.tumbling(Duration.ofMinutes(5)))
.aggregate(new MyAggregateFunction());
// Join 扩展
DataStream<JoinResult> joined = stream1
.join(stream2)
.where(event -> event.getKey())
.equalTo(event -> event.getKey())
.window(WindowStrategy.tumbling(Duration.ofMinutes(5)));
注意事项
DataStream V2 API 目前处于实验阶段,接口可能发生变化,不建议在生产环境使用。后续版本将逐步完善并稳定下来。
AI 集成
Flink 2.0 加强了与 AI 工作流的集成。
Flink CDC AI 模型调用
Flink CDC 3.3 支持在 Transform 表达式中动态调用 AI 模型:
-- 在 CDC 作业中调用 OpenAI 模型
CREATE TABLE orders (
id INT,
content STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'database-name' = 'mydb',
'table-name' = 'orders'
);
-- 使用 AI 模型进行实时分析
CREATE TABLE analyzed_orders WITH (
'connector' = 'kafka',
'topic' = 'analyzed-orders',
'properties.bootstrap.servers' = 'localhost:9092'
) AS SELECT
id,
content,
openai_chat('Analyze sentiment: ' || content) as sentiment,
openai_embedding(content) as embedding
FROM orders;
Flink SQL AI 语法
Flink SQL 引入了专门的 AI 模型语法:
-- 定义 AI 模型
CREATE AI MODEL my_embedding_model
WITH (
'provider' = 'openai',
'model' = 'text-embedding-ada-002'
);
-- 在 SQL 中调用 AI 模型
SELECT
id,
content,
AI_EMBEDDING(my_embedding_model, content) as embedding
FROM documents;
-- 使用 AI 函数
SELECT
id,
content,
AI_CHAT('openai', 'gpt-4',
'Summarize: ' || content) as summary
FROM articles;
实时 AI 应用场景
| 场景 | 说明 |
|---|---|
| 实时风险控制 | 实时分析交易内容,检测欺诈行为 |
| 个性化推荐 | 基于用户行为实时生成推荐 |
| 智能日志解析 | 使用 AI 模型解析和分类日志 |
| 语义搜索 | 实时生成文本嵌入向量,支持向量检索 |
小结
本章介绍了 Flink 的核心概念和使用方法:
- Flink 架构:JobManager、TaskManager、ResourceManager 协作
- DataStream API:Source、Transformation、Sink 三部分
- 窗口操作:滚动、滑动、会话、全局四种窗口类型
- 时间语义:事件时间、处理时间、摄入时间
- 状态管理:Keyed State、Operator State、广播状态、状态后端
- 容错机制:检查点原理、配置、增量检查点、非对齐检查点
- Flink SQL:连接器、窗口查询、Join 操作
- 部署模式:Session、Per-Job、Application 三种模式
- 性能调优:内存调优、状态后端优化、反压处理
Flink 2.0 重点特性:
- 解耦状态管理:ForSt 状态后端专为云原生设计
- 异步执行模型:支持非阻塞状态操作,提升吞吐量
- 物化表:统一流批处理,简化实时数仓开发
- DataStream V2:全新的 API 设计(实验性)
- AI 集成:支持在 SQL 和 CDC 中调用 AI 模型
版本选择建议:
| 场景 | 推荐版本 |
|---|---|
| 云原生环境、需要快速扩缩容 | Flink 2.0+ |
| 需要异步状态处理 | Flink 2.0+ |
| 稳定性优先 | Flink 1.18+ |
| 使用 DataSet API | Flink 1.x(需尽快迁移) |
Flink 是目前最强大的流处理引擎,特别适合对延迟和精确性要求高的实时场景。在实际项目中,Flink 常与 Kafka、HBase、Elasticsearch 等组件配合使用。随着 Flink 2.0 的发布,Flink 在云原生和 AI 时代的竞争力进一步增强。