跳到主要内容

Flink 流处理引擎

Apache Flink是一个分布式流处理引擎,提供高吞吐、低延迟、精确一次语义的流处理能力。与Spark的微批处理不同,Flink是真正的流处理引擎,每个事件到来时立即处理。

Flink是Apache顶级项目,最初由德国柏林工业大学的研究项目Stratosphere发展而来。Flink的核心设计理念是"流优先",将批处理视为流处理的特殊情况。

Flink的核心特点:

  1. 真正的流处理:事件级别处理,毫秒级延迟
  2. 精确一次语义:保证数据不丢失、不重复
  3. 状态管理:支持有状态计算,状态可持久化
  4. 事件时间:支持基于事件时间的处理
  5. 高可用:支持故障恢复和状态回滚
维度Spark StreamingFlink
处理模式微批处理事件级流处理
延迟秒级毫秒级
时间语义处理时间为主事件时间为主
状态管理有限支持完善的状态管理
精确一次需要额外配置原生支持
窗口操作相对简单非常丰富
场景说明
实时ETL数据清洗、转换、同步
实时监控系统监控、业务监控
实时推荐个性化推荐、广告投放
实时风控欺诈检测、异常识别
实时报表实时大屏、指标统计

系统架构

Flink采用主从架构,主要组件包括:

JobManager

JobManager是Flink的主节点,负责:

  • 作业调度:将作业分解为任务并调度执行
  • 资源管理:向ResourceManager申请资源
  • 检查点协调:协调检查点的生成
  • 故障恢复:处理任务失败和恢复

TaskManager

TaskManager是Flink的工作节点,负责:

  • 任务执行:执行具体的计算任务
  • 数据交换:与其他TaskManager交换数据
  • 状态存储:存储和管理状态数据
  • 资源汇报:向JobManager汇报状态

ResourceManager

ResourceManager负责资源管理:

  • 资源分配:为作业分配TaskManager资源
  • 资源回收:回收空闲资源
  • 资源适配:支持YARN、Kubernetes等资源管理器

并行度与槽位

并行度(Parallelism)

并行度决定了一个算子同时有多少个任务实例执行:

// 设置全局并行度
env.setParallelism(4);

// 设置单个算子并行度
dataStream.map(...).setParallelism(2);

槽位(Slot)

Slot是TaskManager的资源单位,每个Slot可以执行一个任务链:

  • 资源隔离:不同Slot之间内存隔离
  • 任务共享:同一作业的任务可以共享Slot

作业执行流程

Flink作业的执行流程:

  1. 编译:将DataStream API转换为StreamGraph
  2. 优化:将StreamGraph优化为JobGraph
  3. 提交:将JobGraph提交给JobManager
  4. 调度:JobManager将任务调度到TaskManager
  5. 执行:TaskManager执行任务并返回结果

DataStream API

DataStream API是Flink流处理的核心API,提供了丰富的算子操作。

程序结构

一个Flink程序的基本结构:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 添加数据源
DataStream<String> text = env.socketTextStream("localhost", 9999);

// 3. 数据转换
DataStream<String> result = text
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) {
for (String word : value.split(" ")) {
out.collect(word);
}
}
});

// 4. 输出结果
result.print();

// 5. 执行程序
env.execute("WordCount");

数据源(Source)

Flink支持多种数据源:

// 从集合读取
DataStream<String> fromCollection = env.fromCollection(Arrays.asList("a", "b", "c"));

// 从文件读取
DataStream<String> fromFile = env.readTextFile("/path/to/file");

// 从Socket读取
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);

// 从Kafka读取
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> fromKafka = env
.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));

// 自定义Source
DataStream<Event> customSource = env.addSource(new MySourceFunction());

转换操作(Transformation)

基本转换

// map:一对一转换
DataStream<Integer> mapped = stream.map(x -> x * 2);

// filter:过滤
DataStream<Integer> filtered = stream.filter(x -> x > 0);

// flatMap:一对多转换
DataStream<String> flatMapped = stream.flatMap((value, out) -> {
for (String word : value.split(" ")) {
out.collect(word);
}
});

KeyedStream 操作

// keyBy:按键分组
KeyedStream<Event, String> keyed = stream.keyBy(event -> event.getUserId());

// 滚动聚合
DataStream<Long> counts = keyed.map(event -> 1L).sum("count");

// reduce:自定义聚合
DataStream<Event> reduced = keyed.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});

// 聚合函数
DataStream<Tuple2<String, Integer>> aggregated = keyed
.aggregate(new AggregateFunction<Event, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> createAccumulator() {
return Tuple2.of("", 0);
}

@Override
public Tuple2<String, Integer> add(Event value, Tuple2<String, Integer> acc) {
return Tuple2.of(value.getUserId(), acc.f1 + 1);
}

@Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
return acc;
}

@Override
public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
return Tuple2.of(a.f0, a.f1 + b.f1);
}
});

多流转换

// union:合并多个流
DataStream<String> unioned = stream1.union(stream2, stream3);

// connect:连接两个流(可以不同类型)
ConnectedStreams<String, Integer> connected = stream1.connect(stream2);

// CoMap:对连接流分别处理
DataStream<String> coMapped = connected
.map(new CoMapFunction<String, Integer, String>() {
@Override
public String map1(String value) {
return "String: " + value;
}

@Override
public String map2(Integer value) {
return "Integer: " + value;
}
});

// join:流Join
DataStream<Tuple2<String, String>> joined = stream1
.join(stream2)
.where(e -> e.getKey())
.equalTo(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Event1, Event2, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> join(Event1 e1, Event2 e2) {
return Tuple2.of(e1.getName(), e2.getName());
}
});

数据输出(Sink)

// 输出到控制台
stream.print();

// 输出到文件
stream.writeAsText("/path/to/output");

// 输出到Kafka
stream.addSink(new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), props));

// 输出到数据库
stream.addSink(new JdbcSink());

// 自定义Sink
stream.addSink(new MySinkFunction());

窗口操作

窗口是Flink处理无界流的核心概念,将无限的数据流划分为有限的数据块。

窗口类型

滚动窗口(Tumbling Window)

滚动窗口大小固定,窗口之间不重叠:

// 滚动事件时间窗口,大小为5秒
DataStream<T> result = stream
.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count");

// 滚动处理时间窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

滑动窗口(Sliding Window)

滑动窗口大小固定,窗口之间有重叠:

// 滑动窗口,大小10秒,滑动步长5秒
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

会话窗口(Session Window)

会话窗口根据数据活跃度划分,无数据时窗口关闭:

// 会话窗口,间隔10秒
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))

全局窗口(Global Window)

全局窗口包含所有数据,需要自定义触发器:

.window(GlobalWindows.create())
.trigger(new MyTrigger())

窗口函数

增量聚合函数

// reduce
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((a, b) -> {
a.setCount(a.getCount() + b.getCount());
return a;
});

// aggregate
.aggregate(new AggregateFunction<Event, Long, Long>() {
@Override
public Long createAccumulator() { return 0L; }

@Override
public Long add(Event value, Long acc) { return acc + 1; }

@Override
public Long getResult(Long acc) { return acc; }

@Override
public Long merge(Long a, Long b) { return a + b; }
});

全窗口函数

// process:可以获取窗口上下文信息
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, Result, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<Event> events, Collector<Result> out) {
long count = 0;
for (Event e : events) {
count++;
}
out.collect(new Result(key, count, ctx.window().getStart(), ctx.window().getEnd()));
}
});

时间语义

Flink支持三种时间语义:

处理时间(Processing Time)

处理时间是事件被处理时的机器时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// 或在窗口中指定
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

事件时间(Event Time)

事件时间是事件实际发生的时间,需要从数据中提取:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 提取时间戳和水位线
DataStream<Event> withTimestamps = stream
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);

摄入时间(Ingestion Time)

摄入时间是事件进入Flink系统的时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

水位线(Watermark)

水位线用于衡量事件时间的进度,处理乱序数据:

// 有界乱序水位线
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())

// 单调递增水位线(无乱序)
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())

// 自定义水位线
WatermarkStrategy.<Event>forGenerator(ctx -> new MyWatermarkGenerator())

状态管理

状态管理是Flink的核心特性,支持有状态计算。

状态类型

Keyed State

Keyed State绑定到每个Key上,只能用于KeyedStream:

public class CountFunction extends KeyedProcessFunction<String, Event, Result> {

// ValueState:单个值
private ValueState<Long> countState;

// ListState:列表
private ListState<String> listState;

// MapState:映射
private MapState<String, Integer> mapState;

// ReducingState:聚合状态
private ReducingState<Long> reducingState;

@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
}

@Override
public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
Long count = countState.value();
if (count == null) {
count = 0L;
}
count++;
countState.update(count);
out.collect(new Result(value.getKey(), count));
}
}

Operator State

Operator State绑定到每个算子实例上:

public class MySource implements SourceFunction<Event>, CheckpointedFunction {

private ListState<Event> state;
private List<Event> buffered = new ArrayList<>();

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("state", Event.class));

if (context.isRestored()) {
for (Event e : state.get()) {
buffered.add(e);
}
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
state.clear();
state.addAll(buffered);
}

@Override
public void run(SourceContext<Event> ctx) throws Exception {
// 发送数据
}

@Override
public void cancel() {}
}

状态后端

状态后端决定状态的存储方式:

// HashMapStateBackend:状态存内存
env.setStateBackend(new HashMapStateBackend());

// EmbeddedRocksDBStateBackend:状态存RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());

// 配置检查点目录
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");

容错机制

Flink通过检查点机制实现容错,保证精确一次语义。

检查点配置

// 启用检查点,每1秒一次
env.enableCheckpointing(1000);

// 高级配置
CheckpointConfig config = env.getCheckpointConfig();

// 设置模式:精确一次或至少一次
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 设置超时时间
config.setCheckpointTimeout(60000);

// 设置最小间隔
config.setMinPauseBetweenCheckpoints(500);

// 设置最大并发检查点数
config.setMaxConcurrentCheckpoints(1);

// 启用外部持久化检查点
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

保存点(Savepoint)

保存点是手动触发的检查点,用于版本升级和迁移:

# 触发保存点
flink savepoint <job-id> <target-directory>

# 从保存点恢复
flink run -s <savepoint-path> -d <jar-file>

# 取消作业并创建保存点
flink cancel -s <target-directory> <job-id>

Flink SQL提供了更简洁的流处理方式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 注册表
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id INT," +
" user_id INT," +
" amount DECIMAL(10, 2)," +
" order_time TIMESTAMP(3)," +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'orders'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);

// 执行SQL查询
Table result = tableEnv.sqlQuery(
"SELECT " +
" user_id," +
" TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start," +
" SUM(amount) as total_amount " +
"FROM orders " +
"GROUP BY " +
" user_id," +
" TUMBLE(order_time, INTERVAL '1' HOUR)"
);

// 输出结果
tableEnv.toDataStream(result).print();

小结

本章介绍了Flink的核心概念和使用方法:

  1. Flink架构:JobManager、TaskManager、ResourceManager协作
  2. DataStream API:Source、Transformation、Sink三部分
  3. 窗口操作:滚动、滑动、会话、全局四种窗口类型
  4. 时间语义:事件时间、处理时间、摄入时间
  5. 状态管理:Keyed State和Operator State
  6. 容错机制:检查点和保存点

Flink是目前最强大的流处理引擎,特别适合对延迟和精确性要求高的实时场景。在实际项目中,Flink常与Kafka、HBase、Elasticsearch等组件配合使用。