Flink 流处理引擎
Apache Flink是一个分布式流处理引擎,提供高吞吐、低延迟、精确一次语义的流处理能力。与Spark的微批处理不同,Flink是真正的流处理引擎,每个事件到来时立即处理。
Flink 概述
什么是 Flink?
Flink是Apache顶级项目,最初由德国柏林工业大学的研究项目Stratosphere发展而来。Flink的核心设计理念是"流优先",将批处理视为流处理的特殊情况。
Flink的核心特点:
- 真正的流处理:事件级别处理,毫秒级延迟
- 精确一次语义:保证数据不丢失、不重复
- 状态管理:支持有状态计算,状态可持久化
- 事件时间:支持基于事件时间的处理
- 高可用:支持故障恢复和状态回滚
Flink vs Spark Streaming
| 维度 | Spark Streaming | Flink |
|---|---|---|
| 处理模式 | 微批处理 | 事件级流处理 |
| 延迟 | 秒级 | 毫秒级 |
| 时间语义 | 处理时间为主 | 事件时间为主 |
| 状态管理 | 有限支持 | 完善的状态管理 |
| 精确一次 | 需要额外配置 | 原生支持 |
| 窗口操作 | 相对简单 | 非常丰富 |
Flink 应用场景
| 场景 | 说明 |
|---|---|
| 实时ETL | 数据清洗、转换、同步 |
| 实时监控 | 系统监控、业务监控 |
| 实时推荐 | 个性化推荐、广告投放 |
| 实时风控 | 欺诈检测、异常识别 |
| 实时报表 | 实时大屏、指标统计 |
Flink 架构
系统架构
Flink采用主从架构,主要组件包括:
JobManager
JobManager是Flink的主节点,负责:
- 作业调度:将作业分解为任务并调度执行
- 资源管理:向ResourceManager申请资源
- 检查点协调:协调检查点的生成
- 故障恢复:处理任务失败和恢复
TaskManager
TaskManager是Flink的工作节点,负责:
- 任务执行:执行具体的计算任务
- 数据交换:与其他TaskManager交换数据
- 状态存储:存储和管理状态数据
- 资源汇报:向JobManager汇报状态
ResourceManager
ResourceManager负责资源管理:
- 资源分配:为作业分配TaskManager资源
- 资源回收:回收空闲资源
- 资源适配:支持YARN、Kubernetes等资源管理器
并行度与槽位
并行度(Parallelism)
并行度决定了一个算子同时有多少个任务实例执行:
// 设置全局并行度
env.setParallelism(4);
// 设置单个算子并行度
dataStream.map(...).setParallelism(2);
槽位(Slot)
Slot是TaskManager的资源单位,每个Slot可以执行一个任务链:
- 资源隔离:不同Slot之间内存隔离
- 任务共享:同一作业的任务可以共享Slot
作业执行流程
Flink作业的执行流程:
- 编译:将DataStream API转换为StreamGraph
- 优化:将StreamGraph优化为JobGraph
- 提交:将JobGraph提交给JobManager
- 调度:JobManager将任务调度到TaskManager
- 执行:TaskManager执行任务并返回结果
DataStream API
DataStream API是Flink流处理的核心API,提供了丰富的算子操作。
程序结构
一个Flink程序的基本结构:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 添加数据源
DataStream<String> text = env.socketTextStream("localhost", 9999);
// 3. 数据转换
DataStream<String> result = text
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) {
for (String word : value.split(" ")) {
out.collect(word);
}
}
});
// 4. 输出结果
result.print();
// 5. 执行程序
env.execute("WordCount");
数据源(Source)
Flink支持多种数据源:
// 从集合读取
DataStream<String> fromCollection = env.fromCollection(Arrays.asList("a", "b", "c"));
// 从文件读取
DataStream<String> fromFile = env.readTextFile("/path/to/file");
// 从Socket读取
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);
// 从Kafka读取
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> fromKafka = env
.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));
// 自定义Source
DataStream<Event> customSource = env.addSource(new MySourceFunction());
转换操作(Transformation)
基本转换
// map:一对一转换
DataStream<Integer> mapped = stream.map(x -> x * 2);
// filter:过滤
DataStream<Integer> filtered = stream.filter(x -> x > 0);
// flatMap:一对多转换
DataStream<String> flatMapped = stream.flatMap((value, out) -> {
for (String word : value.split(" ")) {
out.collect(word);
}
});
KeyedStream 操作
// keyBy:按键分组
KeyedStream<Event, String> keyed = stream.keyBy(event -> event.getUserId());
// 滚动聚合
DataStream<Long> counts = keyed.map(event -> 1L).sum("count");
// reduce:自定义聚合
DataStream<Event> reduced = keyed.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});
// 聚合函数
DataStream<Tuple2<String, Integer>> aggregated = keyed
.aggregate(new AggregateFunction<Event, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> createAccumulator() {
return Tuple2.of("", 0);
}
@Override
public Tuple2<String, Integer> add(Event value, Tuple2<String, Integer> acc) {
return Tuple2.of(value.getUserId(), acc.f1 + 1);
}
@Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
return acc;
}
@Override
public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
return Tuple2.of(a.f0, a.f1 + b.f1);
}
});
多流转换
// union:合并多个流
DataStream<String> unioned = stream1.union(stream2, stream3);
// connect:连接两个流(可以不同类型)
ConnectedStreams<String, Integer> connected = stream1.connect(stream2);
// CoMap:对连接流分别处理
DataStream<String> coMapped = connected
.map(new CoMapFunction<String, Integer, String>() {
@Override
public String map1(String value) {
return "String: " + value;
}
@Override
public String map2(Integer value) {
return "Integer: " + value;
}
});
// join:流Join
DataStream<Tuple2<String, String>> joined = stream1
.join(stream2)
.where(e -> e.getKey())
.equalTo(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Event1, Event2, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> join(Event1 e1, Event2 e2) {
return Tuple2.of(e1.getName(), e2.getName());
}
});
数据输出(Sink)
// 输出到控制台
stream.print();
// 输出到文件
stream.writeAsText("/path/to/output");
// 输出到Kafka
stream.addSink(new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), props));
// 输出到数据库
stream.addSink(new JdbcSink());
// 自定义Sink
stream.addSink(new MySinkFunction());
窗口操作
窗口是Flink处理无界流的核心概念,将无限的数据流划分为有限的数据块。
窗口类型
滚动窗口(Tumbling Window)
滚动窗口大小固定,窗口之间不重叠:
// 滚动事件时间窗口,大小为5秒
DataStream<T> result = stream
.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count");
// 滚动处理时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
滑动窗口(Sliding Window)
滑动窗口大小固定,窗口之间有重叠:
// 滑动窗口,大小10秒,滑动步长5秒
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
会话窗口(Session Window)
会话窗口根据数据活跃度划分,无数据时窗口关闭:
// 会话窗口,间隔10秒
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
全局窗口(Global Window)
全局窗口包含所有数据,需要自定义触发器:
.window(GlobalWindows.create())
.trigger(new MyTrigger())
窗口函数
增量聚合函数
// reduce
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});
// aggregate
.aggregate(new AggregateFunction<Event, Long, Long>() {
@Override
public Long createAccumulator() { return 0L; }
@Override
public Long add(Event value, Long acc) { return acc + 1; }
@Override
public Long getResult(Long acc) { return acc; }
@Override
public Long merge(Long a, Long b) { return a + b; }
});
全窗口函数
// process:可以获取窗口上下文信息
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<Event> events, Collector<Result> out) {
long count = 0;
for (Event e : events) {
count++;
}
out.collect(new Result(key, count, ctx.window().getStart(), ctx.window().getEnd()));
}
});
时间语义
Flink支持三种时间语义:
处理时间(Processing Time)
处理时间是事件被处理时的机器时间:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 或在窗口中指定
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
事件时间(Event Time)
事件时间是事件实际发生的时间,需要从数据中提取:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 提取时间戳和水位线
DataStream<Event> withTimestamps = stream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
摄入时间(Ingestion Time)
摄入时间是事件进入Flink系统的时间:
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
水位线(Watermark)
水位线用于衡量事件时间的进度,处理乱序数据:
// 有界乱序水位线
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
// 单调递增水位线(无乱序)
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
// 自定义水位线
WatermarkStrategy.<Event>forGenerator(ctx -> new MyWatermarkGenerator())
状态管理
状态管理是Flink的核心特性,支持有状态计算。
状态类型
Keyed State
Keyed State绑定到每个Key上,只能用于KeyedStream:
public class CountFunction extends KeyedProcessFunction<String, Event, Result> {
// ValueState:单个值
private ValueState<Long> countState;
// ListState:列表
private ListState<String> listState;
// MapState:映射
private MapState<String, Integer> mapState;
// ReducingState:聚合状态
private ReducingState<Long> reducingState;
@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
}
@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
Long count = countState.value();
if (count == null) {
count = 0L;
}
count++;
countState.update(count);
out.collect(new Result(value.getKey(), count));
}
}
Operator State
Operator State绑定到每个算子实例上:
public class MySource implements SourceFunction<Event>, CheckpointedFunction {
private ListState<Event> state;
private List<Event> buffered = new ArrayList<>();
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("state", Event.class));
if (context.isRestored()) {
for (Event e : state.get()) {
buffered.add(e);
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
state.addAll(buffered);
}
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// 发送数据
}
@Override
public void cancel() {}
}
状态后端
状态后端决定状态的存储方式:
// HashMapStateBackend:状态存内存
env.setStateBackend(new HashMapStateBackend());
// EmbeddedRocksDBStateBackend:状态存RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// 配置检查点目录
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
容错机制
Flink通过检查点机制实现容错,保证精确一次语义。
检查点配置
// 启用检查点,每1秒一次
env.enableCheckpointing(1000);
// 高级配置
CheckpointConfig config = env.getCheckpointConfig();
// 设置模式:精确一次或至少一次
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置超时时间
config.setCheckpointTimeout(60000);
// 设置最小间隔
config.setMinPauseBetweenCheckpoints(500);
// 设置最大并发检查点数
config.setMaxConcurrentCheckpoints(1);
// 启用外部持久化检查点
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
保存点(Savepoint)
保存点是手动触发的检查点,用于版本升级和迁移:
# 触发保存点
flink savepoint <job-id> <target-directory>
# 从保存点恢复
flink run -s <savepoint-path> -d <jar-file>
# 取消作业并创建保存点
flink cancel -s <target-directory> <job-id>
Flink SQL
Flink SQL提供了更简洁的流处理方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 注册表
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id INT," +
" user_id INT," +
" amount DECIMAL(10, 2)," +
" order_time TIMESTAMP(3)," +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'orders'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// 执行SQL查询
Table result = tableEnv.sqlQuery(
"SELECT " +
" user_id," +
" TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start," +
" SUM(amount) as total_amount " +
"FROM orders " +
"GROUP BY " +
" user_id," +
" TUMBLE(order_time, INTERVAL '1' HOUR)"
);
// 输出结果
tableEnv.toDataStream(result).print();
小结
本章介绍了Flink的核心概念和使用方法:
- Flink架构:JobManager、TaskManager、ResourceManager协作
- DataStream API:Source、Transformation、Sink三部分
- 窗口操作:滚动、滑动、会话、全局四种窗口类型
- 时间语义:事件时间、处理时间、摄入时间
- 状态管理:Keyed State和Operator State
- 容错机制:检查点和保存点
Flink是目前最强大的流处理引擎,特别适合对延迟和精确性要求高的实时场景。在实际项目中,Flink常与Kafka、HBase、Elasticsearch等组件配合使用。