跳到主要内容

Flink 流处理引擎

Apache Flink 是一个分布式流处理引擎,提供高吞吐、低延迟、精确一次语义的流处理能力。与 Spark 的微批处理不同,Flink 是真正的流处理引擎,每个事件到来时立即处理。

Flink 是 Apache 顶级项目,最初由德国柏林工业大学的研究项目 Stratosphere 发展而来。Flink 的核心设计理念是"流优先",将批处理视为流处理的特殊情况——有界流。

核心特点

  1. 真正的流处理:事件级别处理,毫秒级延迟
  2. 精确一次语义:保证数据不丢失、不重复
  3. 状态管理:支持有状态计算,状态可持久化
  4. 事件时间:支持基于事件时间的处理,正确处理乱序数据
  5. 高可用:支持故障恢复和状态回滚
维度Spark StreamingFlink
处理模式微批处理(Micro-batch)事件级流处理
延迟秒级毫秒级
时间语义处理时间为主事件时间为主
状态管理有限支持完善的状态管理
精确一次需要额外配置原生支持
窗口操作相对简单非常丰富
迭代计算支持有限原生支持迭代

为什么选择 Flink?

  • 低延迟要求:需要毫秒级响应的场景
  • 精确一次语义:金融交易、支付等不能丢失或重复的场景
  • 复杂事件处理:需要基于事件时间进行复杂模式匹配
  • 大规模状态:需要维护大量状态数据的场景
场景说明
实时 ETL数据清洗、转换、同步
实时监控系统监控、业务监控、异常检测
实时推荐个性化推荐、广告投放
实时风控欺诈检测、异常识别
实时报表实时大屏、指标统计
CEP 复杂事件模式匹配、事件序列检测
版本发布时间重要特性
Flink 1.02016DataStream API 稳定版
Flink 1.22017Side Outputs、Process Function
Flink 1.52018Flip-6 架构重构、新部署模型
Flink 1.92019Blink 合并、Hive 集成
Flink 1.112020CDC 支持、PyFlink 增强
Flink 1.132021状态 TTL、内存模型优化
Flink 1.152022SQL Gateway、Pulsar 连接器
Flink 1.182023Adaptive Scheduler、批执行模式改进
Flink 2.02025解耦状态管理、异步状态 API、DataStream V2

Flink 2.0 重要更新

Flink 2.0 是自 Flink 1.0 发布九年来的首个主要版本,包含 25 个 FLIP 和 369 个问题修复,标志着 Flink 进入云原生和 AI 时代的新篇章:

1. 解耦状态管理架构(Disaggregated State Management)

这是 Flink 2.0 最核心的架构变革,专为云原生环境设计:

  • ForSt 状态后端:全新的解耦状态后端,将状态存储从计算资源分离,使用分布式文件系统(DFS)作为主要存储介质
  • 解决云原生挑战
    • 解决容器化环境中本地磁盘受限的问题
    • 消除 Compaction 造成的资源使用峰值
    • 支持 TB 级状态作业的快速扩缩容
    • 实现轻量快速的检查点

2. 异步执行模型与异步状态 API

异步执行模型是 Flink 2.0 的核心创新之一,它将状态访问与计算解耦,实现非阻塞的状态操作。这对于使用 ForSt 等远程状态后端的场景尤为重要,可以显著提升吞吐量。

// 方式一:SQL 作业启用异步状态处理
// 在 Table 环境中设置配置
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.getConfig().set("table.exec.async-state.enabled", "true");

// 方式二:通过配置文件启用(config.yaml)
// table.exec.async-state.enabled: true

// 启用后,Flink 会自动将支持异步状态访问的 SQL 算子切换到异步模式
// 包括:Group Aggregation、Window Aggregation、Join 等有状态算子

异步状态 API 的核心特性

  • 乱序记录处理:解耦状态访问与计算,实现并行执行
  • 非阻塞检查点:检查点期间支持非阻塞状态操作,降低延迟
  • 语义保证:保持水位线传播、定时器处理、键顺序等核心保证
  • 透明启用:对于 SQL 作业,启用后无需修改代码即可获得性能提升

支持的异步 SQL 算子

算子类型是否支持异步说明
Group Aggregation分组聚合
Window Aggregation窗口聚合
Interval Join时间区间 Join
Temporal Join维表 Join
Deduplication去重
Top-NTop-N 查询

性能提升:在 Nexmark 基准测试中,11 个有状态查询完全兼容异步执行模型,对于重 I/O 的状态查询,吞吐量可提升 75%-120%

3. 物化表(Materialized Tables)

物化表是流批统一处理的核心,让用户通过单一管道管理实时和历史数据:

  • 查询修改支持:支持 Schema 和查询更新,无需重新处理历史数据
  • Kubernetes/YARN 提交:原生支持向生产集群提交刷新作业
  • Paimon 集成:与 Apache Paimon 深度集成,提供高性能 ACID 事务

4. 自适应批执行增强

  • 自适应广播 Join:运行时动态判断是否适合广播 Join
  • 自动 Join 倾斜优化:动态拆分倾斜分区,消除长尾延迟
  • 性能提升:TPC-DS 10TB 基准测试中性能提升 8-16%

5. AI 集成

  • Flink CDC 3.3:支持在 Transform 表达式中动态调用 AI 模型(OpenAI 等)
  • Flink SQL AI 语法:支持在 SQL 中定义和调用 AI 模型

6. DataStream V2 API(实验性)

全新的 DataStream API,解决原 API 的设计问题:

  • 提供底层构建块:DataStream、ProcessFunction、Partitioning
  • 提供高级扩展:Window、Join 等
  • 目前处于实验阶段,不建议生产使用

7. 破坏性变更(升级注意)

Flink 2.0 移除了多个已弃用的 API:

移除的 API迁移方案
DataSet API迁移到 DataStream API 或 Table API/SQL
Scala DataStream API迁移到 Java DataStream API
SourceFunction/SinkFunction迁移到 Source/Sink V2
TableSource/TableSink迁移到 DynamicTableSource/DynamicTableSink

8. 序列化改进

Flink 2.0 对序列化机制进行了重大优化:

  • 高效集合序列化器:为 Map、List、Set 等集合类型引入了更高效的内置序列化器,默认启用,显著提升性能
  • Kryo 升级到 5.6:更快的序列化速度,更好的内存效率,对新版本 Java 有更好的支持
// 集合类型序列化已自动优化,无需额外配置
// 对于自定义类型,仍可使用 Kryo 序列化
env.getConfig().enableForceKryo();
env.getConfig().registerKryoType(MyCustomClass.class);

运行环境要求

  • 最低 Java 版本:Java 11(不再支持 Java 8)
  • 推荐 Java 版本:Java 17(默认)
  • 支持 Java 21

Docker 镜像默认使用 Java 17,从源码构建时也默认使用 Java 17。

系统架构

Flink 采用主从架构,主要组件包括:

JobManager

JobManager 是 Flink 的主节点,负责协调分布式执行。它包含以下组件:

  • Dispatcher:接收作业提交,负责启动新的 JobMaster
  • ResourceManager:负责资源管理,申请和释放 TaskManager
  • JobMaster:管理单个作业的执行,负责调度、检查点协调、故障恢复

JobManager 的核心职责

  1. 作业调度:将作业分解为任务并调度执行
  2. 资源管理:向 ResourceManager 申请资源
  3. 检查点协调:协调检查点的生成
  4. 故障恢复:处理任务失败和恢复

TaskManager

TaskManager 是 Flink 的工作节点,负责执行具体的计算任务:

  • 任务执行:执行具体的计算任务
  • 数据交换:与其他 TaskManager 交换数据
  • 状态存储:存储和管理状态数据
  • 资源汇报:向 JobManager 汇报状态

每个 TaskManager 包含多个 Slot,每个 Slot 可以执行一个任务链。

ResourceManager

ResourceManager 负责资源管理:

  • 资源分配:为作业分配 TaskManager 资源
  • 资源回收:回收空闲资源
  • 资源适配:支持 YARN、Kubernetes、Mesos 等资源管理器

并行度与槽位

并行度(Parallelism)

并行度决定了一个算子同时有多少个任务实例执行:

// 设置全局并行度
env.setParallelism(4);

// 设置单个算子并行度
dataStream.map(...).setParallelism(2);

// 设置默认并行度(配置文件)
// parallelism.default: 4

并行度选择建议

  • 通常设置为 TaskManager 总 Slot 数的倍数
  • 每个 Slot 处理的数据量应适中,避免过大导致内存问题
  • 考虑数据倾斜情况,适当增加并行度

槽位(Slot)

Slot 是 TaskManager 的资源单位,每个 Slot 可以执行一个任务链:

  • 资源隔离:不同 Slot 之间内存隔离
  • 任务共享:同一作业的任务可以共享 Slot(Slot Sharing Group)
TaskManager(4 Slots)
├── Slot 1: Source → Map → Filter
├── Slot 2: Source → Map → Filter
├── Slot 3: KeyBy → Window → Sink
└── Slot 4: KeyBy → Window → Sink

Slot 共享的优势

  • 减少数据传输开销(同一 Slot 内的任务可以直接传递数据)
  • 提高资源利用率

作业执行流程

Flink 作业的执行流程分为几个阶段:

用户代码 → StreamGraph → JobGraph → ExecutionGraph → 物理执行
  1. StreamGraph:逻辑执行计划,表示数据流的拓扑结构
  2. JobGraph:优化后的执行计划,将多个算子链接成算子链
  3. ExecutionGraph:并行化的执行计划,包含任务的并行实例
  4. 物理执行:在 TaskManager 上实际执行

算子链(Operator Chain)

Flink 会尽可能将多个算子链接在一起,减少线程切换和网络传输:

// 禁用算子链
stream.map(...).disableChaining();

// 开始新的算子链
stream.map(...).startNewChain();

DataStream API

DataStream API 是 Flink 流处理的核心 API,提供了丰富的算子操作。

程序结构

一个完整的 Flink 程序包含以下结构:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FlinkJob {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 配置环境(可选)
env.setParallelism(4);
env.enableCheckpointing(60000);

// 3. 添加数据源
DataStream<String> source = env.socketTextStream("localhost", 9999);

// 4. 数据转换
DataStream<String> transformed = source
.flatMap(new MyFlatMapFunction())
.keyBy(value -> value.f0)
.sum(1);

// 5. 输出结果
transformed.print();

// 6. 执行程序
env.execute("My Flink Job");
}
}

重要提示:Flink 程序是懒执行的,只有调用 execute() 方法时才会真正执行。

数据源(Source)

Flink 支持多种内置数据源,也可以自定义数据源。

内置数据源

// 从集合读取
DataStream<String> fromCollection = env.fromCollection(
Arrays.asList("a", "b", "c")
);

// 从元素创建
DataStream<String> fromElements = env.fromElements("a", "b", "c");

// 从文件读取
DataStream<String> fromFile = env.readTextFile("/path/to/file");

// 从 Socket 读取
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);

Kafka 连接器

Kafka 是最常用的消息队列,Flink 提供了完善的 Kafka 连接器:

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

// 创建 Kafka Source(推荐方式,Flink 1.14+)
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStream<String> stream = env.fromSource(source,
WatermarkStrategy.noWatermarks(), "Kafka Source");

Kafka Source 配置选项

KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("broker1:9092,broker2:9092")
.setTopics("topic1", "topic2") // 订阅多个主题
.setTopics(Pattern.compile("topic-.*")) // 正则匹配主题
.setGroupId("my-consumer-group")
// 起始位置配置
.setStartingOffsets(OffsetsInitializer.earliest()) // 从最早开始
.setStartingOffsets(OffsetsInitializer.latest()) // 从最新开始
.setStartingOffsets(OffsetsInitializer.timestamp(123456789L)) // 从时间戳开始
// 消费者属性
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperty("partition.discovery.interval.ms", "30000") // 分区发现
.build();

Kafka Sink

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();

stream.sinkTo(sink);

精确一次写入 Kafka

KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(...)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("my-transaction")
.setProperty("transaction.timeout.ms", "900000") // 15分钟
.build();

JDBC 连接器

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// JDBC Sink
stream.addSink(JdbcSink.sink(
"INSERT INTO users (id, name, age) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=?, age=?",
(ps, record) -> {
ps.setInt(1, record.getId());
ps.setString(2, record.getName());
ps.setInt(3, record.getAge());
ps.setString(4, record.getName());
ps.setInt(5, record.getAge());
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/mydb")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("password")
.build()
));

文件 Sink

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriterFactory;

// 文件 Sink
FileSink<String> fileSink = FileSink
.forRowFormat(new Path("/output/path"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(60000) // 滚动间隔
.withInactivityInterval(30000) // 不活跃间隔
.withMaxPartSize(128 * 1024 * 1024) // 文件大小
.build()
)
.build();

stream.sinkTo(fileSink);

自定义 Source

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

// 非并行 Source
public class MySource implements SourceFunction<Event> {
private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Event> ctx) throws Exception {
while (isRunning) {
// 生成数据
Event event = generateEvent();
ctx.collect(event);
Thread.sleep(100);
}
}

@Override
public void cancel() {
isRunning = false;
}
}

// 并行 Source
public class MyParallelSource implements ParallelSourceFunction<Event> {
// 实现同上,但可以并行执行
}

// 使用
DataStream<Event> stream = env.addSource(new MySource());
DataStream<Event> parallelStream = env.addSource(new MyParallelSource()).setParallelism(4);

转换操作(Transformation)

基本转换

// map:一对一转换
DataStream<Integer> mapped = stream.map(x -> x * 2);

// 使用 RichMapFunction(可以获取运行时上下文)
DataStream<Integer> richMapped = stream.map(new RichMapFunction<String, Integer>() {
@Override
public void open(Configuration parameters) {
// 初始化资源
}

@Override
public Integer map(String value) {
// 可以获取状态、计数器等
return value.length();
}

@Override
public void close() {
// 释放资源
}
});

// filter:过滤
DataStream<Integer> filtered = stream.filter(x -> x > 0);

// flatMap:一对多转换
DataStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) {
for (String word : value.split(" ")) {
out.collect(word);
}
}
});

KeyedStream 操作

keyBy 将流按键分组,相同 Key 的数据会被路由到同一个并行实例:

// keyBy 方式
KeyedStream<Event, String> keyed = stream.keyBy(event -> event.getUserId());

// 使用 KeySelector
KeyedStream<Event, String> keyed = stream.keyBy(
new KeySelector<Event, String>() {
@Override
public String getKey(Event event) {
return event.getUserId();
}
}
);

// 使用字段位置(Tuple 类型)
KeyedStream<Tuple2<String, Integer>, String> keyed = tupleStream.keyBy(0);

// 使用字段名称(POJO 类型)
KeyedStream<Event, String> keyed = stream.keyBy("userId");

KeyedStream 聚合操作

// sum:求和
DataStream<Tuple2<String, Integer>> sumResult = keyed.sum(1);

// min/max:最小/最大值
DataStream<Tuple2<String, Integer>> minResult = keyed.min(1);

// minBy/maxBy:返回最小/最大值对应的完整记录
DataStream<Tuple2<String, Integer>> minByResult = keyed.minBy(1);

// reduce:自定义聚合
DataStream<Event> reduced = keyed.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});

多流转换

// union:合并多个流(类型必须相同)
DataStream<String> unioned = stream1.union(stream2, stream3);

// connect:连接两个流(可以不同类型)
ConnectedStreams<String, Integer> connected = stream1.connect(stream2);

// CoMap:对连接流分别处理
DataStream<String> coMapped = connected
.map(new CoMapFunction<String, Integer, String>() {
@Override
public String map1(String value) {
return "String: " + value;
}

@Override
public String map2(Integer value) {
return "Integer: " + value;
}
});

// CoFlatMap:对连接流分别进行 flatMap
DataStream<String> coFlatMapped = connected
.flatMap(new CoFlatMapFunction<String, Integer, String>() {
@Override
public void flatMap1(String value, Collector<String> out) {
out.collect("From stream1: " + value);
}

@Override
public void flatMap2(Integer value, Collector<String> out) {
out.collect("From stream2: " + value);
}
});

Interval Join

时间区间内的 Join,适用于两个流的事件在特定时间范围内关联:

// 两个流按键分组
KeyedStream<Event1, String> stream1 = ...;
KeyedStream<Event2, String> stream2 = ...;

// Interval Join
DataStream<String> joined = stream1
.intervalJoin(stream2)
.between(Time.seconds(-5), Time.seconds(5)) // 时间范围
.process(new ProcessJoinFunction<Event1, Event2, String>() {
@Override
public void processElement(Event1 left, Event2 right, Context ctx, Collector<String> out) {
out.collect(left.getId() + " - " + right.getId());
}
});

ProcessFunction

ProcessFunction 是 Flink 中最强大的处理函数,可以访问状态、定时器和侧输出流。

基本 ProcessFunction

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// 基本用法
DataStream<String> processed = stream.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) {
// 正常输出
out.collect(value.toString());

// 获取当前处理时间
long processingTime = ctx.timerService().currentProcessingTime();

// 获取当前事件时间
long eventTime = ctx.timerService().currentWatermark();
}
});

KeyedProcessFunction

KeyedProcessFunction 用于 KeyedStream,可以访问 Keyed State 和注册定时器:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Event, Result> {

private ValueState<Long> countState;
private ValueState<Long> timerState;

@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
timerState = getRuntimeContext().getState(
new ValueStateDescriptor<>("timer", Long.class));
}

@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
// 更新计数
Long count = countState.value();
if (count == null) {
count = 0L;
}
count++;
countState.update(count);

// 注册处理时间定时器(10秒后触发)
long timer = ctx.timerService().currentProcessingTime() + 10000;
ctx.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);

// 注册事件时间定时器
// ctx.timerService().registerEventTimeTimer(targetTime);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
// 定时器触发时调用
Long count = countState.value();
out.collect(new Result(ctx.getCurrentKey(), count));

// 清除状态
countState.clear();
timerState.clear();
}

@Override
public void close() {
// 释放资源
}
}

// 使用
DataStream<Result> result = keyedStream.process(new MyKeyedProcessFunction());

定时器(Timer)

定时器是 Flink 实现复杂事件处理的关键机制:

public class TimerExample extends KeyedProcessFunction<String, Event, String> {

private ValueState<Long> lastModified;

@Override
public void open(Configuration parameters) {
lastModified = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastModified", Long.class));
}

@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
// 更新最后修改时间
lastModified.update(ctx.timerService().currentProcessingTime());

// 注册一个1分钟后的定时器
ctx.timerService().registerProcessingTimeTimer(
ctx.timerService().currentProcessingTime() + 60000);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 检查是否已经1分钟没有更新
Long last = lastModified.value();
if (last != null && timestamp - last >= 60000) {
out.collect("Key " + ctx.getCurrentKey() + " has been idle for 1 minute");
lastModified.clear();
}
}
}

侧输出流(Side Output)

侧输出流用于将数据分流到不同的输出:

import org.apache.flink.util.OutputTag;

// 定义侧输出标签
OutputTag<Event> lateData = new OutputTag<Event>("late-data") {};
OutputTag<String> alerts = new OutputTag<String>("alerts") {};

// 主处理逻辑
DataStream<Event> mainStream = stream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) {
if (value.isLate()) {
// 发送到侧输出流
ctx.output(lateData, value);
} else if (value.needsAlert()) {
ctx.output(alerts, "Alert: " + value);
} else {
// 正常输出
out.collect(value);
}
}
});

// 获取侧输出流
DataStream<Event> lateStream = mainStream.getSideOutput(lateData);
DataStream<String> alertStream = mainStream.getSideOutput(alerts);

侧输出流应用场景

  • 迟到数据处理
  • 异常数据分流
  • 告警数据输出
  • 调试和监控

数据输出(Sink)

// 输出到控制台
stream.print();
stream.printToErr(); // 输出到标准错误

// 输出到文件
stream.writeAsText("/path/to/output");
stream.writeAsCsv("/path/to/output");

// 输出到 Socket
stream.writeToSocket("localhost", 9999, new SimpleStringSchema());

// 输出到 Kafka(见上文 Kafka Sink)

// 输出到数据库(见上文 JDBC 连接器)

// 自定义 Sink
stream.addSink(new MySinkFunction());

窗口操作

窗口是 Flink 处理无界流的核心概念,将无限的数据流划分为有限的数据块。

窗口类型

滚动窗口(Tumbling Window)

滚动窗口大小固定,窗口之间不重叠:

// 滚动事件时间窗口,大小为5秒
DataStream<T> result = stream
.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count");

// 滚动处理时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

// 带偏移量的滚动窗口(窗口从整点前1小时开始)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.hours(-1)))

滑动窗口(Sliding Window)

滑动窗口大小固定,窗口之间有重叠:

// 滑动窗口,大小10秒,滑动步长5秒
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

// 处理时间滑动窗口
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

会话窗口(Session Window)

会话窗口根据数据活跃度划分,无数据时窗口关闭:

// 会话窗口,间隔10秒
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))

// 动态间隔会话窗口
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Event>() {
@Override
public long extract(Event element) {
// 根据数据动态确定间隔
return element.getSessionTimeout();
}
}))

全局窗口(Global Window)

全局窗口包含所有数据,需要自定义触发器:

.window(GlobalWindows.create())
.trigger(new MyTrigger())

窗口函数

增量聚合函数

增量聚合函数在数据到达时立即聚合,效率更高:

// reduce
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});

// aggregate
.aggregate(new AggregateFunction<Event, Long, Long>() {
@Override
public Long createAccumulator() {
return 0L;
}

@Override
public Long add(Event value, Long acc) {
return acc + 1;
}

@Override
public Long getResult(Long acc) {
return acc;
}

@Override
public Long merge(Long a, Long b) {
return a + b;
}
});

全窗口函数

全窗口函数可以访问窗口内的所有元素:

// process:可以获取窗口上下文信息
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<Event> events, Collector<Result> out) {
long count = 0;
for (Event e : events) {
count++;
}

// 获取窗口信息
long start = ctx.window().getStart();
long end = ctx.window().getEnd();

out.collect(new Result(key, count, start, end));
}
});

增量聚合 + 全窗口函数

可以结合两种方式,既高效又灵活:

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(
// 增量聚合
new AggregateFunction<Event, Long, Long>() {
@Override
public Long createAccumulator() { return 0L; }
@Override
public Long add(Event value, Long acc) { return acc + value.getAmount(); }
@Override
public Long getResult(Long acc) { return acc; }
@Override
public Long merge(Long a, Long b) { return a + b; }
},
// 全窗口函数
new ProcessWindowFunction<Long, Result, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<Long> values, Collector<Result> out) {
Long total = values.iterator().next();
out.collect(new Result(key, total, ctx.window().getStart(), ctx.window().getEnd()));
}
}
);

触发器(Trigger)

触发器决定何时触发窗口计算:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;

public class MyTrigger extends Trigger<Event, TimeWindow> {

@Override
public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) {
// 每个元素到达时触发
return TriggerResult.FIRE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
// 处理时间定时器触发
return TriggerResult.FIRE;
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
// 事件时间定时器触发
return TriggerResult.FIRE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) {
// 清除状态
}
}

// 使用自定义触发器
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(new MyTrigger())

TriggerResult 类型

结果说明
CONTINUE不做任何操作
FIRE触发计算,保留窗口数据
PURGE清除窗口数据
FIRE_AND_PURGE触发计算并清除窗口数据

移除器(Evictor)

移除器可以在触发后、窗口函数执行前移除元素:

import org.apache.flink.streaming.api.windowing.evictors.Evictor;

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.evictor(new MyEvictor())

CEP 复杂事件处理

Flink CEP(Complex Event Processing)是 Flink 提供的复杂事件处理库,用于在无界流中检测事件模式。CEP 允许你定义复杂的事件模式,并在数据流中查找匹配的序列。

CEP 核心概念

事件模式

CEP 的核心是定义事件模式,描述你想要检测的事件序列:

  • 个体模式:匹配单个事件
  • 组合模式:多个个体模式组合成完整模式
  • 模式组:模式内的嵌套模式

模式类型

模式类型说明示例
单例模式匹配单个事件Pattern.begin("start")
循环模式匹配多个事件Pattern.times(3)
可选模式事件可能不出现Pattern.optional()

定义模式

基本模式定义

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// 定义简单模式:连续三个温度超过阈值的事件
Pattern<TemperatureEvent, ?> pattern = Pattern
.<TemperatureEvent>begin("start")
.where(SimpleCondition.of(event -> event.getTemperature() > 30))
.times(3)
.consecutive(); // 要求连续

// 应用模式到数据流
PatternStream<TemperatureEvent> patternStream = CEP.pattern(
temperatureStream,
pattern
);

// 处理匹配结果
DataStream<Alert> alerts = patternStream.select(
new PatternSelectFunction<TemperatureEvent, Alert>() {
@Override
public Alert select(Map<String, List<TemperatureEvent>> pattern) throws Exception {
List<TemperatureEvent> events = pattern.get("start");
return new Alert("Temperature too high for 3 consecutive times", events);
}
}
);

组合模式

组合模式将多个个体模式连接成完整的模式序列:

// 定义组合模式:登录失败后成功的序列
Pattern<LoginEvent, ?> loginPattern = Pattern
// 开始:登录失败
.<LoginEvent>begin("fail")
.where(SimpleCondition.of(event -> event.getType().equals("fail")))
.times(3) // 连续失败3次
.consecutive()
// 接着:登录成功
.followedBy("success")
.where(SimpleCondition.of(event -> event.getType().equals("success")))
// 时间限制:10秒内完成整个序列
.within(Time.seconds(10));

// 处理匹配结果
DataStream<SecurityAlert> securityAlerts = CEP.pattern(loginStream, loginPattern)
.select(new PatternSelectFunction<LoginEvent, SecurityAlert>() {
@Override
public SecurityAlert select(Map<String, List<LoginEvent>> pattern) {
List<LoginEvent> fails = pattern.get("fail");
LoginEvent success = pattern.get("success").get(0);
return new SecurityAlert(
"Suspicious login: 3 fails followed by success",
fails.get(0).getUserId(),
success.getTimestamp()
);
}
});

模式连接方式

连接方式说明使用场景
next()严格连续,中间不能有其他事件紧密相关的事件序列
followedBy()非严格连续,中间可以有其他事件不严格要求连续
followedByAny()非确定性松散连续多种可能的匹配路径
notNext()严格连续的不匹配排除某些模式
notFollowedBy()非严格连续的不匹配排除某些模式
// 严格连续:A 紧接着 B
Pattern.begin("A").where(...).next("B").where(...)

// 非严格连续:A 之后有 B(中间可以隔其他事件)
Pattern.begin("A").where(...).followedBy("B").where(...)

// 排除模式:A 之后不能紧接 C
Pattern.begin("A").where(...).notNext("C").where(...)

条件定义

import org.apache.flink.cep.pattern.conditions.IterativeCondition;

// 简单条件
.where(SimpleCondition.of(event -> event.getValue() > 100))

// 迭代条件(可以访问之前匹配的事件)
.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event value, Context<Event> ctx) throws Exception {
// 获取之前匹配的事件
Iterable<Event> previousEvents = ctx.getEventsForPattern("start");
for (Event e : previousEvents) {
if (value.getValue() > e.getValue() * 2) {
return true;
}
}
return false;
}
})

// 组合条件
.where(condition1)
.or(condition2)
.and(condition3)
.until(stopCondition) // 直到满足停止条件

循环模式

// 匹配恰好 3 次
Pattern.begin("start").times(3)

// 匹配 2 到 4 次
Pattern.begin("start").times(2, 4)

// 匹配 1 次或多次
Pattern.begin("start").oneOrMore()

// 匹配 0 次或多次
Pattern.begin("start").timesOrMore(0)

// 可选(0 次或 1 次)
Pattern.begin("start").optional()

// 贪婪匹配(尽可能多匹配)
Pattern.begin("start").oneOrMore().greedy()

// 连续性约束
Pattern.begin("start").times(3).consecutive() // 严格连续
Pattern.begin("start").times(3).allowCombinations() // 允许非连续

处理匹配结果

select 方法

select 方法用于处理成功匹配的模式:

// 使用 PatternSelectFunction
DataStream<Result> results = patternStream.select(
new PatternSelectFunction<Event, Result>() {
@Override
public Result select(Map<String, List<Event>> pattern) throws Exception {
// pattern 是一个 Map,key 是模式名称,value 是匹配的事件列表
Event start = pattern.get("start").get(0);
List<Event> middle = pattern.get("middle");
Event end = pattern.get("end").get(0);

return new Result(start, middle, end);
}
}
);

// 使用 Lambda 表达式(Flink 1.11+)
DataStream<Result> results = patternStream.select(pattern -> {
Event start = pattern.get("start").get(0);
List<Event> events = pattern.get("middle");
return new Result(start, events);
});

flatSelect 方法

flatSelect 可以输出多条结果:

DataStream<Result> results = patternStream.flatSelect(
new PatternFlatSelectFunction<Event, Result>() {
@Override
public void flatSelect(Map<String, List<Event>> pattern,
Collector<Result> out) throws Exception {
// 可以输出多条结果
for (Event event : pattern.get("events")) {
out.collect(new Result(event));
}
}
}
);

处理超时事件

当模式设置了时间限制,未完成匹配的事件序列会触发超时:

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;

// 定义带超时处理的函数
class MyPatternProcessFunction extends PatternProcessFunction<Event, Result>
implements TimedOutPartialMatchHandler<Event> {

@Override
public void processMatch(Map<String, List<Event>> match,
Context ctx, Collector<Result> out) throws Exception {
// 处理成功匹配
out.collect(new Result(match));
}

@Override
public void processTimedOutMatch(Map<String, List<Event>> match,
Context ctx) throws Exception {
// 处理超时的部分匹配
// 可以输出到侧输出流或执行其他操作
ctx.output(new OutputTag<Event>("timeout"){}, match.get("start").get(0));
}
}

// 应用带超时处理的函数
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Result> results = patternStream.process(new MyPatternProcessFunction());

CEP 应用示例

实时欺诈检测

// 检测短时间内大额交易
Pattern<Transaction, ?> fraudPattern = Pattern
.<Transaction>begin("small")
.where(SimpleCondition.of(t -> t.getAmount() < 10))
.followedBy("large")
.where(SimpleCondition.of(t -> t.getAmount() > 10000))
.within(Time.minutes(5));

DataStream<FraudAlert> fraudAlerts = CEP.pattern(transactions, fraudPattern)
.select(pattern -> {
Transaction small = pattern.get("small").get(0);
Transaction large = pattern.get("large").get(0);
return new FraudAlert(
"Suspicious pattern: small then large transaction",
small.getAccountId(),
large.getTimestamp()
);
});

用户行为分析

// 检测用户浏览-收藏-购买路径
Pattern<UserAction, ?> purchasePattern = Pattern
.<UserAction>begin("view")
.where(SimpleCondition.of(a -> a.getType().equals("view")))
.followedBy("favorite")
.where(SimpleCondition.of(a -> a.getType().equals("favorite")))
.followedBy("purchase")
.where(SimpleCondition.of(a -> a.getType().equals("purchase")))
.within(Time.hours(1));

DataStream<ConversionResult> conversions = CEP.pattern(userActions, purchasePattern)
.select(pattern -> {
UserAction view = pattern.get("view").get(0);
UserAction favorite = pattern.get("favorite").get(0);
UserAction purchase = pattern.get("purchase").get(0);

return new ConversionResult(
view.getUserId(),
view.getProductId(),
view.getTimestamp(),
purchase.getTimestamp()
);
});

CEP 最佳实践

  1. 合理设置时间限制:使用 within() 限制模式匹配时间,避免状态无限增长
  2. 优化条件判断:条件函数应尽量简单高效,避免复杂计算
  3. 处理超时事件:对于有时间限制的模式,考虑超时事件的处理
  4. 控制状态大小:复杂模式会产生大量状态,注意监控和调优
  5. 使用侧输出流:对于超时或不完整匹配,使用侧输出流收集

时间语义

Flink 支持三种时间语义,正确理解和使用时间对于流处理至关重要。

处理时间(Processing Time)

处理时间是事件被处理时的机器时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// 或在窗口中指定
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

特点

  • 简单,无需考虑事件时间
  • 性能最好
  • 结果不可重现

适用场景

  • 对时间精度要求不高
  • 不需要处理迟到数据
  • 简单的实时监控

事件时间(Event Time)

事件时间是事件实际发生的时间,需要从数据中提取:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 提取时间戳和水位线
DataStream<Event> withTimestamps = stream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);

特点

  • 结果可重现
  • 正确处理乱序数据
  • 需要处理水位线和迟到数据

摄入时间(Ingestion Time)

摄入时间是事件进入 Flink 系统的时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

水位线(Watermark)

水位线用于衡量事件时间的进度,处理乱序数据。

水位线概念

水位线是一个时间戳,表示"到此时间戳为止的数据已经全部到达"。水位线是单调递增的,当水位线 W 到达某个算子时,该算子可以确认所有时间戳小于 W 的事件都已经处理完毕。

事件流:  E1(10:00) E2(10:02) E3(10:01) E4(10:05) E5(10:03)
水位线: W(10:00) W(10:02) W(10:04)

水位线策略

// 有界乱序水位线(允许一定程度的乱序)
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())

// 单调递增水位线(无乱序)
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())

// 自定义水位线生成器
WatermarkStrategy.<Event>forGenerator(ctx -> new MyWatermarkGenerator())
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())

处理空闲源

当某个分区没有数据时,水位线可能无法前进:

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
.withIdleness(Duration.ofMinutes(1)) // 1分钟无数据则标记为空闲

多流的水位线对齐

当有多个输入流时,Flink 会取所有流的最小水位线作为当前水位线:

Stream1: W(10:00) → W(10:02) → W(10:04)
Stream2: W(10:01) → W(10:03) → W(10:03)
Result: W(10:00) → W(10:02) → W(10:03) // 取最小值

迟到数据处理

即使有了水位线,仍可能有迟到数据:

// 方式1:侧输出流收集迟到数据
OutputTag<Event> lateData = new OutputTag<Event>("late-data") {};

DataStream<Result> result = stream
.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Duration.ofSeconds(10)) // 允许迟到10秒
.sideOutputLateData(lateData) // 超过允许时间的迟到数据
.aggregate(new MyAggregateFunction());

// 获取迟到数据
DataStream<Event> lateStream = result.getSideOutput(lateData);

// 方式2:直接丢弃(默认行为)

状态管理

状态管理是 Flink 的核心特性,支持有状态计算。

状态类型

Keyed State

Keyed State 绑定到每个 Key 上,只能用于 KeyedStream:

public class CountFunction extends KeyedProcessFunction<String, Event, Result> {

// ValueState:单个值
private ValueState<Long> countState;

// ListState:列表
private ListState<String> listState;

// MapState:映射
private MapState<String, Integer> mapState;

// ReducingState:聚合状态
private ReducingState<Long> reducingState;

// AggregatingState:更灵活的聚合状态
private AggregatingState<Event, Long> aggregatingState;

@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));

listState = getRuntimeContext().getListState(
new ListStateDescriptor<>("list", String.class));

mapState = getRuntimeContext().getMapState(
new MapStateDescriptor<>("map", String.class, Integer.class));
}

@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
// ValueState 操作
Long count = countState.value();
if (count == null) {
count = 0L;
}
count++;
countState.update(count);

// ListState 操作
listState.add(value.getId());
Iterable<String> allItems = listState.get();

// MapState 操作
mapState.put(value.getId(), value.getCount());
Integer mapValue = mapState.get(value.getId());
boolean contains = mapState.contains(value.getId());

// 清除状态
// countState.clear();
// listState.clear();
// mapState.clear();

out.collect(new Result(value.getKey(), count));
}
}

Operator State

Operator State 绑定到每个算子实例上:

public class MySource implements SourceFunction<Event>, CheckpointedFunction {

private ListState<Event> state;
private List<Event> buffered = new ArrayList<>();

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("state", Event.class));

// 从检查点恢复
if (context.isRestored()) {
for (Event e : state.get()) {
buffered.add(e);
}
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
state.addAll(buffered);
}

@Override
public void run(SourceContext<Event> ctx) throws Exception {
// 发送数据
}

@Override
public void cancel() {}
}

广播状态(Broadcast State)

广播状态用于将配置或规则广播到所有并行实例:

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;

// 定义广播状态描述符
MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>("rules", String.class, Rule.class);

// 广播规则流
DataStream<Rule> rules = ...;
BroadcastStream<Rule> ruleBroadcastStream = rules.broadcast(ruleStateDescriptor);

// 连接数据流和广播流
DataStream<Event> events = ...;
DataStream<Result> result = events
.connect(ruleBroadcastStream)
.process(new BroadcastProcessFunction<Event, Rule, Result>() {

@Override
public void processElement(Event event, ReadOnlyContext ctx, Collector<Result> out) throws Exception {
// 读取广播状态(只读)
ReadOnlyBroadcastState<String, Rule> rules =
ctx.getBroadcastState(ruleStateDescriptor);

Rule rule = rules.get(event.getRuleId());
if (rule != null) {
// 应用规则
out.collect(applyRule(event, rule));
}
}

@Override
public void processBroadcastElement(Rule rule, Context ctx, Collector<Result> out) throws Exception {
// 更新广播状态
BroadcastState<String, Rule> rules = ctx.getBroadcastState(ruleStateDescriptor);
rules.put(rule.getId(), rule);
}
});

状态后端

状态后端决定状态的存储方式:

// HashMapStateBackend:状态存内存(默认)
env.setStateBackend(new HashMapStateBackend());

// EmbeddedRocksDBStateBackend:状态存RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());

// 配置检查点目录
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");

状态后端对比

特性HashMapStateBackendEmbeddedRocksDBStateBackend
存储位置JVM 堆内存本地磁盘(RocksDB)
状态大小受限于内存可超过内存
访问延迟微秒级毫秒级
适用场景小状态、低延迟大状态、生产环境

ForSt 状态后端(Flink 2.0+)

ForSt(For Streaming)是 Flink 2.0 引入的全新解耦状态后端,专为云原生环境设计。它将状态存储从计算资源分离,使用分布式文件系统(DFS)作为主要存储介质,解决了容器化环境中本地磁盘受限、扩缩容慢、检查点重等问题。

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;

// 通过配置启用 ForSt 状态后端
Configuration config = new Configuration();
config.setString("state.backend.type", "forst");
config.setString("state.checkpoints.dir", "hdfs:///flink/checkpoints");
// ForSt 远程存储路径
config.setString("state.forst.remote-storage", "hdfs:///flink/forst-state");
// 本地缓存大小(用于加速状态访问)
config.setString("state.forst.local-cache-size", "1gb");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

配置文件方式(config.yaml)

# 状态后端配置
state:
backend:
type: forst
# ForSt 远程存储路径(使用分布式文件系统)
forst:
remote-storage: hdfs:///flink/forst-state
# 本地缓存大小(用于加速状态访问)
local-cache-size: 1gb
checkpoints:
dir: hdfs:///flink/checkpoints

# 检查点配置
execution:
checkpointing:
interval: 60000
mode: EXACTLY_ONCE
timeout: 600000
min-pause: 30000

# 启用异步状态处理(与 ForSt 配合使用,SQL 作业)
# 对于 SQL 作业,设置以下参数启用异步状态 API
table:
exec:
async-state:
enabled: true

ForSt 核心特性

特性说明
解耦存储状态存储在分布式文件系统,不受容器本地磁盘限制
快速扩缩容TB 级状态作业可快速恢复和扩缩容,无需完整状态迁移
轻量检查点原生支持轻量快速的检查点,检查点大小不随状态线性增长
异步执行与异步执行模型配合,避免状态访问阻塞计算
本地缓存支持本地缓存热数据,平衡延迟和成本

ForSt 与 RocksDB 对比

维度RocksDBForSt
存储位置本地磁盘分布式文件系统
扩缩容需要完整状态迁移快速恢复(秒级)
云原生适配受容器磁盘限制完全适配 K8s 环境
检查点开销较大(需完整快照或增量)轻量(状态已在 DFS)
状态大小限制受本地磁盘限制可达 TB 级别
适用场景传统集群云原生环境(K8s)

ForSt 适用场景

  • 云原生环境:Kubernetes 部署,容器本地磁盘有限
  • 大状态作业:状态达到数百 GB 或 TB 级别
  • 需要快速扩缩容:业务高峰期需要快速扩容
  • 降低运维成本:无需单独配置本地 SSD 存储

RocksDB 配置优化

对于使用 RocksDB 的生产作业,以下配置可以显著提升性能:

import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.rocksdb.Options;

// 创建 RocksDB 状态后端并启用增量检查点
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

// 设置本地存储目录(建议使用 SSD)
backend.setLocalRecoveryDirectory(new Path("/mnt/ssd/flink-state"));

// 通过配置参数优化 RocksDB
Configuration config = new Configuration();
// 块缓存大小,影响读性能
config.setString("state.backend.rocksdb.block.cache-size", "256m");
// 写缓冲区大小,影响写性能
config.setString("state.backend.rocksdb.writebuffer.size", "64m");
// 写缓冲区数量
config.setString("state.backend.rocksdb.writebuffer.count", "4");
// 后台线程数(建议根据 CPU 核数调整)
config.setString("state.backend.rocksdb.thread.num", "8");
// 启用 Bloom 过滤器,提升读取性能
config.setString("state.backend.rocksdb.use-bloom-filter", "true");

env.setStateBackend(backend);

关键配置参数说明

参数说明建议值
state.backend.rocksdb.localdir本地存储目录SSD 磁盘路径
state.backend.rocksdb.block.cache-size块缓存大小状态大小的 10-20%
state.backend.rocksdb.writebuffer.size写缓冲区大小64-128MB
state.backend.rocksdb.writebuffer.count写缓冲区数量4-8
state.backend.rocksdb.thread.num后台线程数CPU 核数
state.backend.rocksdb.use-bloom-filter启用 Bloom 过滤器true(读密集场景)

配置文件方式(config.yaml)

state:
backend:
type: rocksdb
incremental: true
localdir: /mnt/ssd/flink-state
block.cache-size: 256m
writebuffer.size: 64m
writebuffer.count: 4
thread.num: 8
use-bloom-filter: true

状态后端选择建议

场景推荐状态后端
状态小于内存,追求低延迟HashMapStateBackend
状态较大,生产环境EmbeddedRocksDBStateBackend
云原生环境,TB 级状态ForSt(Flink 2.0+)
需要快速扩缩容ForSt(Flink 2.0+)
读密集型作业RocksDB + Bloom 过滤器
写密集型作业RocksDB + 增大 writebuffer

状态 TTL

状态 TTL 用于自动清理过期状态,防止状态无限增长:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.hours(24)) // TTL 时间
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 更新策略
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 可见性
.cleanupInRocksdbCompactFilter(1000) // RocksDB 压缩时清理
.build();

ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("myState", String.class);
descriptor.enableTimeToLive(ttlConfig);

TTL 配置说明

// 更新类型
.setUpdateType(UpdateType.OnCreateAndWrite) // 创建和写入时更新 TTL(默认)
.setUpdateType(UpdateType.OnReadAndWrite) // 读取和写入时更新 TTL
.setUpdateType(UpdateType.Disabled) // 禁用 TTL 更新

// 可见性
.setStateVisibility(StateVisibility.NeverReturnExpired) // 永不返回过期数据(默认)
.setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp) // 如果未清理则返回

// 清理策略
.cleanupFullSnapshot() // 全量快照时清理
.cleanupIncrementally(10, true) // 增量清理
.cleanupInRocksdbCompactFilter(1000) // RocksDB 压缩时清理
.cleanupInBackground(1000, true) // 后台清理

容错机制

Flink 通过检查点机制实现容错,保证精确一次语义。

检查点原理

Chandy-Lamport 算法

Flink 的检查点基于 Chandy-Lamport 分布式快照算法的变体:

  1. 注入 Barrier:JobManager 向数据源注入检查点 Barrier
  2. Barrier 传递:Barrier 随数据流一起传递
  3. Barrier 对齐:多输入算子等待所有输入的 Barrier 到达
  4. 状态快照:Barrier 对齐后,算子进行状态快照
  5. 完成检查点:所有算子完成快照后,检查点完成
Source-1  ──── [record] [record] | barrier | [record] ────>  Operator
Source-2 ──── [record] | barrier | [record] [record] ────> Operator

Barrier 对齐 ─────────┘
快照状态
传递 Barrier

检查点内容

一个完整的检查点包含:

  1. 算子状态:每个算子实例的状态
  2. Keyed 状态:按键分区的状态
  3. 源偏移量:数据源的读取位置(如 Kafka offset)
  4. Sink 预提交:两阶段提交 Sink 的预提交信息

检查点配置

// 启用检查点,每1秒一次
env.enableCheckpointing(1000);

// 高级配置
CheckpointConfig config = env.getCheckpointConfig();

// 设置模式:精确一次或至少一次
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置超时时间
config.setCheckpointTimeout(60000);

// 设置最小间隔(两次检查点之间的最小间隔)
config.setMinPauseBetweenCheckpoints(500);

// 设置最大并发检查点数
config.setMaxConcurrentCheckpoints(1);

// 设置可容忍的检查点失败次数
config.setTolerableCheckpointFailureNumber(3);

// 启用外部持久化检查点
config.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 设置检查点存储位置
config.setCheckpointStorage("hdfs:///checkpoints");

// 启用非对齐检查点(处理反压情况)
config.enableUnalignedCheckpoints();

// 设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

关键配置说明

参数说明建议值
checkpointInterval检查点间隔1-5分钟
checkpointTimeout检查点超时5-10分钟
minPauseBetweenCheckpoints最小间隔30秒
tolerableCheckpointFailureNumber可容忍失败次数3-5

增量检查点

对于大状态作业,增量检查点可以显著减少 I/O:

// 启用增量检查点
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

// 注意:HashMapStateBackend 不支持增量检查点

增量检查点只保存自上次检查点以来的状态变化,适合:

  • 状态变化较小的作业
  • 需要减少检查点时间的作业

非对齐检查点

当作业存在反压时,Barrier 对齐可能很慢。非对齐检查点可以解决:

config.enableUnalignedCheckpoints();
config.setAlignmentTimeout(Duration.ofSeconds(30)); // 对齐超时后启用非对齐

非对齐检查点原理

  • 不等待 Barrier 对齐
  • 将网络缓冲区中的数据也保存到检查点
  • 恢复时重放这些数据

保存点(Savepoint)

保存点是手动触发的检查点,用于版本升级和迁移:

# 触发保存点
flink savepoint <job-id> <target-directory>

# 从保存点恢复
flink run -s <savepoint-path> -d <jar-file>

# 取消作业并创建保存点
flink cancel -s <target-directory> <job-id>

# 从保存点恢复并允许状态兼容性
flink run -s <savepoint-path> -d --allowNonRestoredState <jar-file>

保存点与检查点的区别

特性检查点保存点
触发方式自动手动
目的故障恢复版本迁移、备份
生命周期自动管理手动管理
格式可增量和全量全量

Flink SQL 提供了更简洁的流处理方式,支持标准 SQL 语法。

Table API 与 SQL

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// 创建 Table 环境
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// 或者从 StreamExecutionEnvironment 创建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

连接器配置

Kafka 连接器

-- 创建 Kafka 源表
CREATE TABLE orders (
order_id INT,
user_id INT,
product_id INT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'order-group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);

-- 创建 Kafka Sink 表
CREATE TABLE order_output (
user_id INT,
total_amount DECIMAL(10, 2),
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'order-output',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);

JDBC 连接器

-- 创建 JDBC 表
CREATE TABLE users (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'users',
'username' = 'root',
'password' = 'password',
'driver' = 'com.mysql.cj.jdbc.Driver'
);

-- Lookup Join(维表关联)
SELECT o.order_id, o.amount, u.name
FROM orders AS o
LEFT JOIN users FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.user_id = u.id;

文件系统连接器

-- 创建文件系统表
CREATE TABLE file_sink (
user_id INT,
total_amount DECIMAL(10, 2),
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs:///output/orders',
'format' = 'parquet',
'partition.default-name' = 'default',
'sink.partition-commit.policy.kind' = 'success-file'
);

窗口查询

-- 滚动窗口
SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
TUMBLE(order_time, INTERVAL '1' HOUR);

-- 滑动窗口
SELECT
user_id,
HOP_START(order_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_start,
HOP_END(order_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) as window_end,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
HOP(order_time, INTERVAL '30' MINUTE, INTERVAL '1' HOUR);

-- 会话窗口
SELECT
user_id,
SESSION_START(order_time, INTERVAL '30' MINUTE) as window_start,
SESSION_END(order_time, INTERVAL '30' MINUTE) as window_end,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
SESSION(order_time, INTERVAL '30' MINUTE);

-- 窗口 TVF(Table-Valued Function,推荐)
SELECT
user_id,
window_start,
window_end,
SUM(amount) as total_amount
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
)
GROUP BY user_id, window_start, window_end;

Join 操作

-- Regular Join(无界流 Join,状态会无限增长)
SELECT o.order_id, o.amount, u.name
FROM orders o
JOIN users u ON o.user_id = u.id;

-- Interval Join(时间区间 Join)
SELECT o.order_id, o.amount, p.product_name
FROM orders o, products p
WHERE o.product_id = p.id
AND o.order_time BETWEEN p.update_time - INTERVAL '1' HOUR AND p.update_time;

-- Temporal Join(维表 Join,需要 Lookup Source)
SELECT o.order_id, o.amount, u.name
FROM orders AS o
JOIN user_dim FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.user_id = u.id;

函数

-- 内置函数
SELECT
UPPER(name),
LOWER(name),
SUBSTRING(name, 1, 5),
CONCAT(name, '-', CAST(age AS STRING)),
DATE_FORMAT(order_time, 'yyyy-MM-dd'),
DATE_ADD(order_time, INTERVAL '1' DAY),
DATEDIFF(order_time, CURRENT_DATE)
FROM orders;

-- 聚合函数
SELECT
COUNT(*) as total,
SUM(amount) as total_amount,
AVG(amount) as avg_amount,
MAX(amount) as max_amount,
MIN(amount) as min_amount
FROM orders;

-- 窗口函数
SELECT
order_id,
user_id,
amount,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) as rn,
RANK() OVER (PARTITION BY user_id ORDER BY amount DESC) as rk,
SUM(amount) OVER (PARTITION BY user_id ORDER BY order_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
FROM orders;

-- 自定义函数(UDF)
-- 注册 UDF
tableEnv.createTemporarySystemFunction("myUpper", MyUpperFunction.class);

-- 使用 UDF
SELECT myUpper(name) FROM orders;

Flink 2.0 引入了多项 SQL 语法增强,简化开发体验:

C-style 转义字符串

支持使用 C 风格的转义字符串,便于处理特殊字符:

-- 使用 C-style 转义字符串(前缀 E)
SELECT E'\t\n\r' as special_chars; -- 制表符、换行符、回车符

-- 传统方式
SELECT CHAR(9) || CHAR(10) || CHAR(13) as special_chars;

-- 实际应用:处理包含特殊字符的数据
SELECT
id,
E'Hello\tWorld\n' as formatted_text
FROM orders;

QUALIFY 子句

QUALIFY 子句简化了窗口函数结果的过滤,语法更简洁:

-- 传统方式:使用子查询过滤窗口函数结果
SELECT * FROM (
SELECT
user_id,
order_id,
amount,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) as rn
FROM orders
) WHERE rn <= 3;

-- 使用 QUALIFY 子句(更简洁)
SELECT
user_id,
order_id,
amount
FROM orders
QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) <= 3;

-- Top-N 查询示例
SELECT
user_id,
product_id,
COUNT(*) as purchase_count
FROM user_purchases
QUALIFY ROW_NUMBER() OVER (ORDER BY COUNT(*) DESC) <= 10;

-- 去重示例
SELECT *
FROM orders
QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime) = 1;

表函数调用简化

Flink 2.0 中,表函数调用不再强制需要 TABLE() 包装:

-- 传统方式(仍然支持)
SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR));

-- 简化方式(Flink 2.0+)
SELECT * FROM TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR);

-- 窗口表函数示例
SELECT
window_start,
window_end,
user_id,
COUNT(*) as order_count
FROM TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
GROUP BY window_start, window_end, user_id;

-- HOP 窗口
SELECT * FROM HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '30' MINUTE, INTERVAL '1' HOUR);

-- SESSION 窗口
SELECT * FROM SESSION(TABLE orders, DESCRIPTOR(order_time), INTERVAL '30' MINUTE);

SQL Gateway 应用模式

Flink 2.0 中,SQL Gateway 支持 Application 模式执行 SQL 作业,替代了已移除的 Per-Job 部署模式:

# 通过 SQL Gateway 以 Application 模式提交作业
# 适用于生产环境的资源隔离需求

# 启动 SQL Gateway
./bin/sql-gateway.sh start

# 提交 SQL 作业(Application 模式)
curl -X POST http://localhost:8083/v1/sessions/session_id/statements \
-H "Content-Type: application/json" \
-d '{
"statement": "INSERT INTO sink_table SELECT * FROM source_table",
"executionConfig": {
"execution.mode": "APPLICATION"
}
}'

物化表(Materialized Table)

物化表是 Flink 2.0 引入的新型表类型,它是流批统一处理的核心。物化表让用户可以声明式地管理实时和历史数据,通过单一管道实现数据的统一处理,无需关心底层是流还是批执行模式。

物化表核心概念

物化表的核心思想是将数据的定义、存储和刷新策略解耦:

  • 定义:通过 SQL 定义数据的计算逻辑
  • 存储:底层使用 Apache Paimon 作为存储格式(Flink 2.0 目前唯一支持的存储)
  • 刷新:支持流式刷新(实时)和批式刷新(定时)

创建物化表

-- 创建物化表
CREATE MATERIALIZED TABLE user_order_summary
(
-- 主键定义(必须)
PRIMARY KEY (user_id, window_start) NOT ENFORCED
)
-- 刷新策略
REFRESH MODE AUTO
WITH (
-- 存储:使用 Paimon 作为底层存储
'connector' = 'paimon',
'path' = 'hdfs:///warehouse/user_order_summary',
-- 刷新配置
'refresh.mode' = 'streaming', -- 流式刷新(实时)
-- 'refresh.mode' = 'batch', -- 批式刷新(定时)
'refresh.interval' = '1 h' -- 批式刷新间隔
)
AS SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(order_time, INTERVAL '1' HOUR) as window_end,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
TUMBLE(order_time, INTERVAL '1' HOUR);

刷新模式说明

物化表支持三种刷新模式:

模式说明适用场景
STREAMING流式刷新,数据变化实时反映实时大屏、实时监控
BATCH批式刷新,按固定间隔刷新日报表、小时报表
AUTO自动选择,根据查询模式决定通用场景
-- 流式刷新物化表(实时更新)
CREATE MATERIALIZED TABLE realtime_metrics
REFRESH MODE STREAMING
WITH ('connector' = 'paimon', 'path' = '...')
AS SELECT ...;

-- 批式刷新物化表(每小时刷新)
CREATE MATERIALIZED TABLE hourly_report
REFRESH MODE BATCH EVERY INTERVAL '1' HOUR
WITH ('connector' = 'paimon', 'path' = '...')
AS SELECT ...;

查询物化表

-- 直接查询物化表(获取最新数据)
SELECT * FROM user_order_summary
WHERE window_start >= TIMESTAMP '2024-01-01 00:00:00';

-- 物化表支持时间旅行(查看历史数据)
SELECT * FROM user_order_summary
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-15 10:00:00';

-- 物化表之间的 Join
SELECT
u.user_id,
u.total_amount,
p.product_count
FROM user_order_summary u
JOIN product_summary p ON u.window_start = p.window_start;

物化表管理

-- 手动刷新物化表
CALL sys.refresh_materialized_table('user_order_summary');

-- 查看物化表状态
SHOW CREATE MATERIALIZED TABLE user_order_summary;

-- 修改物化表定义(不重新处理历史数据)
ALTER MATERIALIZED TABLE user_order_summary
SET REFRESH MODE BATCH EVERY INTERVAL '2' HOUR;

-- 删除物化表
DROP MATERIALIZED TABLE user_order_summary;

物化表与传统方案对比

维度传统方案(流批分离)物化表
代码维护需要维护两套代码单一 SQL 定义
数据一致性可能不一致保证一致
Schema 变更需要重新处理支持无缝变更
执行模式需要手动选择自动适配
学习成本需要理解流批差异只需理解 SQL

物化表最佳实践

  1. 合理设计主键:主键决定数据的唯一性,直接影响存储和查询效率
  2. 选择合适的刷新模式:根据业务时效性要求选择流式或批式刷新
  3. 利用时间旅行:物化表支持历史数据查询,可用于数据审计和问题排查
  4. 监控刷新延迟:确保物化表数据及时更新

部署模式

Flink 支持多种部署模式,适应不同的应用场景。

Session 模式

Session 模式下,多个作业共享一个 Flink 集群:

# 启动 Session 集群
bin/start-cluster.sh

# 提交作业
bin/flink run -c com.example.MyJob myjob.jar

# 停止集群
bin/stop-cluster.sh

适用场景

  • 短作业、频繁提交
  • 资源共享需求
  • 开发测试环境

Per-Job 模式

每个作业独立一个 Flink 集群:

# YARN Per-Job 模式
bin/flink run -m yarn-cluster -c com.example.MyJob myjob.jar

# Kubernetes Per-Job 模式
bin/flink run-application -t kubernetes-application -c com.example.MyJob myjob.jar

适用场景

  • 长时间运行的作业
  • 需要资源隔离
  • 生产环境

Application 模式

Application 模式下,作业的 main() 方法在集群中执行:

# YARN Application 模式
bin/flink run-application -t yarn-application -c com.example.MyJob myjob.jar

# Kubernetes Application 模式
bin/flink run-application -t kubernetes-application \
-Dkubernetes.cluster-id=my-cluster \
-Dkubernetes.container.image=flink:latest \
-c com.example.MyJob \
local:///opt/flink/usrlib/myjob.jar

优势

  • 减少客户端负载
  • 更好的资源隔离
  • 支持作业内多个执行

资源管理器集成

YARN

# flink-conf.yaml
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2g
taskmanager.numberOfTaskSlots: 4

# 提交到 YARN
bin/flink run -m yarn-cluster \
-yjm 1024 -ytm 2048 -ys 4 \
-c com.example.MyJob myjob.jar

Kubernetes

# flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 4
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
# 部署到 Kubernetes
kubectl apply -f flink-configuration-configmap.yaml
kubectl apply -f jobmanager-deployment.yaml
kubectl apply -f taskmanager-deployment.yaml

性能调优

内存调优

Flink 的内存模型:

Total Process Memory
├── JVM Heap
│ ├── Framework Heap
│ └── Task Heap
├── Managed Memory
├── Network Memory
├── JVM Metaspace
└── JVM Overhead

关键配置

# TaskManager 内存配置
taskmanager.memory.process.size: 4096m
taskmanager.memory.task.heap.size: 1024m
taskmanager.memory.managed.size: 1024m
taskmanager.memory.network.min: 64m
taskmanager.memory.network.max: 256m

# JobManager 内存配置
jobmanager.memory.process.size: 1600m
jobmanager.memory.heap.size: 1024m

状态后端优化

// RocksDB 优化配置
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
backend.setDbStoragePath("/mnt/ssd/flink-state"); // 使用 SSD

// RocksDB 配置
Options options = new Options()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setDisableAutoCompactions(false)
.setMaxOpenFiles(-1);

网络调优

# 网络缓冲区配置
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb
taskmanager.network.numberOfBuffers: 2048

并行度设置

// 全局并行度
env.setParallelism(4);

// 算子并行度
stream.map(...).setParallelism(8);

// Kafka Source 并行度(建议与分区数一致)
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("...")
.setTopics("...")
.build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka")
.setParallelism(partitionCount);

反压处理

反压是指下游处理速度跟不上上游,导致数据积压。

检测反压

  • Flink Web UI 的 Backpressure 标签
  • 使用 REST API:/jobs/<job-id>/vertices/<vertex-id>/backpressure

处理方法

// 1. 增加并行度
stream.map(...).setParallelism(16);

// 2. 优化算子实现(减少 GC)
// 使用基本类型而非对象
// 避免创建大量临时对象

// 3. 使用异步 I/O
AsyncDataStream.orderedWait(
stream,
new AsyncDatabaseRequest(),
1000, // 超时时间
TimeUnit.MILLISECONDS,
100 // 最大并发请求数
);

// 4. 启用非对齐检查点
config.enableUnalignedCheckpoints();

背压监控

关键指标:

指标说明
outPoolUsage输出缓冲区使用率
inPoolUsage输入缓冲区使用率
backPressuredTimeMsPerSecond每秒背压时间
busyTimeMsPerSecond每秒忙碌时间
idleTimeMsPerSecond每秒空闲时间

监控与运维

关键监控指标

类别指标说明
作业状态fullRestarts完全重启次数
numberOfFailedCheckpoints失败的检查点数
numberOfCompletedCheckpoints完成的检查点数
检查点lastCheckpointDuration最近检查点耗时
lastCheckpointSize最近检查点大小
checkpointAlignmentTime检查点对齐时间
延迟currentOutputWatermark当前输出水位线
numRecordsInPerSecond每秒输入记录数
numRecordsOutPerSecond每秒输出记录数
背压backPressuredTimeMsPerSecond每秒背压时间
busyTimeMsPerSecond每秒忙碌时间

日志配置

# log4j.properties
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

appender.file.name = MainAppender
appender.file.type = FILE
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

常见问题排查

1. 检查点失败

问题:Checkpoint expired before completing
原因:检查点超时
解决:
- 增加 checkpointTimeout
- 启用非对齐检查点
- 检查是否有背压

2. 内存溢出

问题:java.lang.OutOfMemoryError: Java heap space
原因:堆内存不足
解决:
- 增加 taskmanager.memory.task.heap.size
- 检查是否有状态泄漏
- 使用 RocksDB 状态后端

3. 数据倾斜

问题:部分子任务处理慢
原因:数据分布不均
解决:
- 重新设计 keyBy 的 key
- 使用预聚合减少数据量
- 增加并行度

最佳实践

1. 状态设计

  • 最小化状态大小:只保存必要的数据
  • 使用 TTL:自动清理过期状态
  • 选择合适的状态后端:大状态使用 RocksDB

2. 时间处理

  • 优先使用事件时间:保证结果可重现
  • 合理设置水位线延迟:平衡延迟和正确性
  • 处理迟到数据:使用侧输出流收集

3. 容错配置

// 生产环境推荐配置
env.enableCheckpointing(60000); // 1分钟
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10分钟
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // 30秒
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.minutes(5), Time.seconds(10)));

4. 性能优化

  • 合理设置并行度:与数据量和集群资源匹配
  • 使用算子链:减少网络传输
  • 避免数据倾斜:设计合理的分区键
  • 使用异步 I/O:提高外部系统访问效率

5. 监控告警

  • 监控检查点成功率和耗时
  • 监控背压情况
  • 监控消费延迟(Kafka Lag)
  • 设置合理的告警阈值

从 Flink 1.x 升级到 Flink 2.0 需要注意以下破坏性变更。

API 迁移

DataSet API 迁移

DataSet API 已完全移除,需要迁移到 DataStream API 或 Table API/SQL:

// 旧版 DataSet API(已移除)
// DataSet<Tuple2<String, Integer>> result = env.fromElements(...)
// .groupBy(0)
// .sum(1);

// 迁移方案 1:使用 DataStream API(批执行模式)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<Tuple2<String, Integer>> result = env.fromElements(...)
.keyBy(t -> t.f0)
.sum(1);

// 迁移方案 2:使用 Table API/SQL
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.executeSql("CREATE TABLE ...");
Table result = tableEnv.sqlQuery("SELECT key, SUM(value) FROM ... GROUP BY key");

Source/Sink 迁移

SourceFunction 和 SinkFunction 已移除,需要迁移到新的 Source/Sink V2 API:

// 旧版 SourceFunction(已移除)
// public class MySource implements SourceFunction<Event> { ... }

// 新版 Source API
public class MySource implements Source<Event, MySplit, EnumMySplit> {
@Override
public SplitEnumerator<MySplit> createEnumerator(
SplitEnumeratorContext<MySplit> enumContext) {
// 创建枚举器
}

@Override
public SourceReader<Event, MySplit> createReader(
SourceReaderContext readerContext) {
// 创建读取器
}
}

Java 版本升级

Flink 2.0 不再支持 Java 8,最低要求 Java 11:

# 检查 Java 版本
java -version # 需要至少 Java 11

# 推荐使用 Java 17
# 支持 Java 21

配置文件迁移

旧版 flink-conf.yaml 已不再支持,需要迁移到新版 config.yaml

# 使用迁移工具转换配置文件
bin/config-migration.sh --from flink-conf.yaml --to config.yaml

状态兼容性

重要:Flink 1.x 和 2.x 之间的状态不兼容,升级时需要:

  1. 使用保存点(Savepoint)导出 1.x 状态
  2. 升级到 2.x 后,从保存点恢复时可能需要调整状态描述符

连接器适配

由于 Source/Sink API 变更,连接器需要升级:

连接器适配状态
KafkaFlink 2.0 发布后立即支持
PaimonFlink 2.0 发布后立即支持
JDBCFlink 2.0 发布后立即支持
ElasticsearchFlink 2.0 发布后立即支持
其他连接器计划在 Flink 2.3 前完成迁移

DataStream V2 API(实验性)

Flink 2.0 引入了全新的 DataStream V2 API,旨在解决原 API 的设计问题。

设计目标

原 DataStream API 存在以下问题:

  • 流批语义混合,难以清晰表达意图
  • 扩展性受限,添加新功能需要大量改动
  • 与 Table API 的互操作性不够顺畅

DataStream V2 通过重新设计 API 结构来解决这些问题。

核心 API

import org.apache.flink.api.common.v2.DataStream;
import org.apache.flink.api.common.v2.ProcessFunction;
import org.apache.flink.api.common.v2.Partitioning;

// 创建 DataStream
DataStream<Event> stream = env.fromSource(source, WatermarkStrategy.noWatermarks());

// 基本转换
DataStream<String> result = stream
.process(new MyProcessFunction())
.partition(Partitioning.forward())
.map(event -> event.toString());

// ProcessFunction
public class MyProcessFunction extends ProcessFunction<Event, String> {

private ValueState<Long> countState;

@Override
public void open(OpenContext openContext) {
// 初始化状态
countState = openContext.getState(
new ValueStateDescriptor<>("count", Long.class));
}

@Override
public void processElement(Event event, Context ctx, Collector<String> out) {
// 处理元素
Long count = countState.value();
if (count == null) count = 0L;
countState.update(count + 1);

out.collect("Processed: " + event.getId() + ", count: " + count);
}
}

高级扩展

DataStream V2 提供高级扩展,简化常见操作:

// 窗口扩展
DataStream<WindowResult> windowed = stream
.window(WindowStrategy.tumbling(Duration.ofMinutes(5)))
.aggregate(new MyAggregateFunction());

// Join 扩展
DataStream<JoinResult> joined = stream1
.join(stream2)
.where(event -> event.getKey())
.equalTo(event -> event.getKey())
.window(WindowStrategy.tumbling(Duration.ofMinutes(5)));

注意事项

实验性功能

DataStream V2 API 目前处于实验阶段,接口可能发生变化,不建议在生产环境使用。后续版本将逐步完善并稳定下来。

AI 集成

Flink 2.0 加强了与 AI 工作流的集成。

Flink CDC 3.3 支持在 Transform 表达式中动态调用 AI 模型:

-- 在 CDC 作业中调用 OpenAI 模型
CREATE TABLE orders (
id INT,
content STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'database-name' = 'mydb',
'table-name' = 'orders'
);

-- 使用 AI 模型进行实时分析
CREATE TABLE analyzed_orders WITH (
'connector' = 'kafka',
'topic' = 'analyzed-orders',
'properties.bootstrap.servers' = 'localhost:9092'
) AS SELECT
id,
content,
openai_chat('Analyze sentiment: ' || content) as sentiment,
openai_embedding(content) as embedding
FROM orders;

Flink SQL 引入了专门的 AI 模型语法:

-- 定义 AI 模型
CREATE AI MODEL my_embedding_model
WITH (
'provider' = 'openai',
'model' = 'text-embedding-ada-002'
);

-- 在 SQL 中调用 AI 模型
SELECT
id,
content,
AI_EMBEDDING(my_embedding_model, content) as embedding
FROM documents;

-- 使用 AI 函数
SELECT
id,
content,
AI_CHAT('openai', 'gpt-4',
'Summarize: ' || content) as summary
FROM articles;

实时 AI 应用场景

场景说明
实时风险控制实时分析交易内容,检测欺诈行为
个性化推荐基于用户行为实时生成推荐
智能日志解析使用 AI 模型解析和分类日志
语义搜索实时生成文本嵌入向量,支持向量检索

小结

本章介绍了 Flink 的核心概念和使用方法:

  1. Flink 架构:JobManager、TaskManager、ResourceManager 协作
  2. DataStream API:Source、Transformation、Sink 三部分
  3. 窗口操作:滚动、滑动、会话、全局四种窗口类型
  4. 时间语义:事件时间、处理时间、摄入时间
  5. 状态管理:Keyed State、Operator State、广播状态、状态后端
  6. 容错机制:检查点原理、配置、增量检查点、非对齐检查点
  7. Flink SQL:连接器、窗口查询、Join 操作
  8. 部署模式:Session、Per-Job、Application 三种模式
  9. 性能调优:内存调优、状态后端优化、反压处理

Flink 2.0 重点特性

  • 解耦状态管理:ForSt 状态后端专为云原生设计
  • 异步执行模型:支持非阻塞状态操作,提升吞吐量
  • 物化表:统一流批处理,简化实时数仓开发
  • DataStream V2:全新的 API 设计(实验性)
  • AI 集成:支持在 SQL 和 CDC 中调用 AI 模型

版本选择建议

场景推荐版本
云原生环境、需要快速扩缩容Flink 2.0+
需要异步状态处理Flink 2.0+
稳定性优先Flink 1.18+
使用 DataSet APIFlink 1.x(需尽快迁移)

Flink 是目前最强大的流处理引擎,特别适合对延迟和精确性要求高的实时场景。在实际项目中,Flink 常与 Kafka、HBase、Elasticsearch 等组件配合使用。随着 Flink 2.0 的发布,Flink 在云原生和 AI 时代的竞争力进一步增强。

参考资源