大数据生态系统
大数据生态系统是一个庞大而复杂的技术体系,由众多组件协同工作,共同完成从数据采集到价值输出的全链路处理。理解这些组件的定位、原理和协作关系,是构建数据平台的基础。
本章将从实际应用角度出发,详细介绍大数据生态中的核心组件,帮助你在技术选型时做出正确决策。
生态系统全景
分层架构
一个完整的大数据平台通常采用分层架构设计,每一层专注于特定的数据处理任务。这种分层设计带来的好处是:各层之间松耦合,可以独立演进和替换。
┌─────────────────────────────────────────────────────────────┐
│ 数据应用层 │
│ BI报表 / 数据服务API / 机器学习平台 │
├─────────────────────────────────────────────────────────────┤
│ 数据仓库层 │
│ Hive / ClickHouse / Doris / StarRocks │
├─────────────────────────────────────────────────────────────┤
│ 计算引擎层 │
│ Spark / Flink / MapReduce / Presto │
├─────────────────────────────────────────────────────────────┤
│ 数据存储层 │
│ HDFS / HBase / Kudu / 对象存储 / 数据湖 │
├─────────────────────────────────────────────────────────────┤
│ 数据传输层 │
│ Kafka / Pulsar / RocketMQ │
├─────────────────────────────────────────────────────────────┤
│ 数据采集层 │
│ Flume / Sqoop / Canal / DataX / Debezium │
├─────────────────────────────────────────────────────────────┤
│ 基础设施层 │
│ YARN / Kubernetes / ZooKeeper / 调度系统 │
└─────────────────────────────────────────────────────────────┘
各层职责
| 层次 | 核心组件 | 核心职责 | 关键指标 |
|---|---|---|---|
| 数据采集层 | Flume、Sqoop、Canal、DataX | 从异构数据源收集数据 | 采集延迟、数据完整性 |
| 数据传输层 | Kafka、Pulsar | 消息缓冲、数据分发、削峰填谷 | 吞吐量、延迟、消息堆积 |
| 数据存储层 | HDFS、HBase、Kudu | 海量数据持久化存储 | 存储容量、读写延迟 |
| 数据仓库层 | Hive、ClickHouse、Doris | 结构化数据管理与分析 | 查询延迟、并发能力 |
| 计算引擎层 | Spark、Flink | 数据处理与分析计算 | 计算吞吐、资源利用率 |
| 资源调度层 | YARN、Kubernetes | 集群资源管理与调度 | 资源利用率、调度延迟 |
| 协调服务层 | ZooKeeper | 分布式协调与一致性 | 可用性、一致性 |
数据采集组件
数据采集是大数据处理的第一步,也是数据质量保障的关键环节。不同类型的数据源需要选用不同的采集工具。
Flume 日志采集
Flume 是 Apache 顶级项目,最初由 Cloudera 开发,是业界最广泛使用的日志采集系统之一。
核心架构
Flume 采用三层架构设计,数据从 Source 流向 Channel,再流向 Sink:
┌──────────────────────────────────────────────────────────────┐
│ Flume Agent │
│ │
│ ┌────────┐ ┌─────────┐ ┌────────┐ │
│ │ Source │ ───▶ │ Channel │ ───▶ │ Sink │ │
│ └────────┘ └─────────┘ └────────┘ │
│ ▲ │ │ │
│ │ │ │ │
│ 数据来源 临时存储 数据目的地 │
└──────────────────────────────────────────────────────────────┘
各组件职责如下:
- Source:负责接收数据,可以是 Avro、Thrift、Exec、Spooling Directory、Kafka 等多种类型
- Channel:负责暂存数据,提供事务支持,保证数据不丢失,可选 Memory、File、Kafka 等
- Sink:负责发送数据到目的地,如 HDFS、HBase、Kafka、Elasticsearch 等
事务机制
Flume 的事务机制是其可靠性的核心保障。在 Source 将数据写入 Channel 时,以及 Sink 从 Channel 读取数据时,都会开启事务:
Source阶段:
1. Source 从数据源读取事件
2. 开启 Channel 事务
3. 批量写入 Channel
4. 提交事务
5. 向数据源确认
Sink阶段:
1. Sink 开启 Channel 事务
2. 批量从 Channel 读取事件
3. 写入目标存储
4. 提交事务
5. 从 Channel 移除事件
这种两阶段事务机制确保了数据在传输过程中即使发生故障也不会丢失。如果 Sink 写入失败,事务会回滚,数据保留在 Channel 中等待重试。
常用 Source 配置
Exec Source 用于实时采集日志文件的增量内容:
# 定义 Agent 名称和组件
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1
# Exec Source 配置:实时采集日志
agent1.sources.src1.type = exec
agent1.sources.src1.command = tail -F /var/log/application.log
agent1.sources.src1.shell = /bin/bash -c
agent1.sources.src1.batchSize = 20
agent1.sources.src1.batchTimeout = 3000
# 注意:Exec Source 无法保证数据不丢失
# 如果 Channel 满了,tail 命令继续读取的数据可能丢失
Spooling Directory Source 提供可靠的数据采集,适合采集已完成写入的日志文件:
agent1.sources.src1.type = spooldir
agent1.sources.src1.spoolDir = /data/logs/incoming
agent1.sources.src1.fileSuffix = .COMPLETED
agent1.sources.src1.deletePolicy = never
agent1.sources.src1.fileHeader = true
agent1.sources.src1.fileHeaderKey = file
agent1.sources.src1.batchSize = 1000
# 只处理匹配的文件
agent1.sources.src1.includePattern = ^.*\.log$
# 忽略临时文件
agent1.sources.src1.ignorePattern = ^.*\.tmp$
Kafka Source 从 Kafka 消费数据:
agent1.sources.src1.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.src1.kafka.bootstrap.servers = kafka1:9092,kafka2:9092
agent1.sources.src1.kafka.topics = app-logs
agent1.sources.src1.kafka.consumer.group.id = flume-consumer
agent1.sources.src1.batchSize = 1000
agent1.sources.src1.batchDurationMillis = 1000
Channel 选择策略
| Channel 类型 | 特点 | 适用场景 | 可靠性 |
|---|---|---|---|
| Memory Channel | 速度快,存储在内存 | 允许少量数据丢失的场景 | 低 |
| File Channel | 持久化到磁盘,支持恢复 | 需要高可靠性的场景 | 高 |
| Kafka Channel | 使用 Kafka 作为存储 | 已有 Kafka 集群的场景 | 高 |
File Channel 配置示例:
agent1.channels.ch1.type = file
agent1.channels.ch1.checkpointDir = /data/flume/checkpoint
agent1.channels.ch1.dataDirs = /data/flume/data
agent1.channels.ch1.capacity = 1000000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.checkpointInterval = 30000
agent1.channels.ch1.maxFileSize = 2146435071
HDFS Sink 配置
HDFS Sink 是大数据场景下最常用的 Sink 之一,支持按时间、大小滚动文件:
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://namenode:8020/data/logs/%Y-%m-%d
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text
# 滚动策略:文件大小达到 128MB 滚动
agent1.sinks.sink1.hdfs.rollSize = 134217728
# 滚动策略:文件事件数达到 10000 滚动
agent1.sinks.sink1.hdfs.rollCount = 10000
# 滚动策略:每 3600 秒滚动一次
agent1.sinks.sink1.hdfs.rollInterval = 3600
# 空闲超时:30秒无数据写入则滚动
agent1.sinks.sink1.hdfs.idleTimeout = 30
# 使用本地时间戳
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
多路复用与扇出
Flume 支持将一个 Source 的数据分发到多个 Channel,有两种模式:
Replicating 模式(默认):数据复制到所有 Channel
agent1.sources.src1.type = avro
agent1.sources.src1.bind = 0.0.0.0
agent1.sources.src1.port = 4141
agent1.sources.src1.channels = ch1 ch2
agent1.sources.src1.selector.type = replicating
Multiplexing 模式:根据 Header 值路由到不同 Channel
agent1.sources.src1.selector.type = multiplexing
agent1.sources.src1.selector.header = eventType
agent1.sources.src1.selector.mapping.ORDER = ch1
agent1.sources.src1.selector.mapping.PAYMENT = ch2
agent1.sources.src1.selector.mapping.LOG = ch1 ch2
agent1.sources.src1.selector.default = ch1
Sqoop 数据迁移
Sqoop(SQL-to-Hadoop)是 Apache 项目,专门用于在关系数据库和 Hadoop 之间批量传输数据。它利用 MapReduce 实现并行传输,具有高吞吐量和容错能力。
核心原理
Sqoop 的工作原理是将数据传输任务转换为 MapReduce 作业:
┌─────────────┐ ┌─────────────┐
│ MySQL │ │ HDFS │
│ 关系数据库 │ │ /user/data │
└──────┬──────┘ └──────┬──────┘
│ │
│ MapReduce 作业 │
│ ┌──────────────────────────┐ │
└──│ Map 1: SELECT ... LIMIT │───┘
│ Map 2: SELECT ... LIMIT │
│ Map 3: SELECT ... LIMIT │
│ Map 4: SELECT ... LIMIT │
└──────────────────────────┘
并行读取,并行写入
导入时,Sqoop 会根据指定的并行度(-m 参数)将数据分片,每个 Map 任务负责读取一部分数据。分片依据是主键或指定的分片列。
导入数据到 HDFS
从 MySQL 导入数据到 HDFS:
sqoop import \
--connect jdbc:mysql://mysql-host:3306/mydb \
--username root \
--password-file /user/sqoop/.password \
--table users \
--target-dir /data/users \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--null-string '\\N' \
--null-non-string '\\N' \
--m 4
参数说明:
--connect:数据库 JDBC 连接字符串--password-file:密码文件路径(比--password更安全)--table:要导入的表名--target-dir:HDFS 目标目录--fields-terminated-by:字段分隔符--m:并行 Map 任务数
导入数据到 Hive
直接导入到 Hive 表:
sqoop import \
--connect jdbc:mysql://mysql-host:3306/mydb \
--username root \
--password-file /user/sqoop/.password \
--table orders \
--hive-import \
--hive-database dw \
--hive-table orders \
--create-hive-table \
--hive-overwrite \
--m 4
增量导入
Sqoop 支持增量导入,只导入新增或变更的数据:
sqoop import \
--connect jdbc:mysql://mysql-host:3306/mydb \
--username root \
--password-file /user/sqoop/.password \
--table users \
--target-dir /data/users \
--incremental append \
--check-column id \
--last-value 10000 \
--m 4
增量导入模式:
| 模式 | 说明 | 适用场景 |
|---|---|---|
append | 追加新增数据,基于自增列 | 只有插入操作的表 |
lastmodified | 追加或更新数据,基于时间戳 | 有插入和更新的表 |
导出数据
从 HDFS 导出到关系数据库:
sqoop export \
--connect jdbc:mysql://mysql-host:3306/mydb \
--username root \
--password-file /user/sqoop/.password \
--table users_export \
--export-dir /data/users \
--input-fields-terminated-by '\t' \
--input-lines-terminated-by '\n' \
--m 4
导出模式:
| 模式 | 说明 |
|---|---|
insert | 默认模式,插入所有记录 |
update | 更新已存在的记录 |
call | 调用存储过程处理每条记录 |
Canal 数据同步
Canal 是阿里巴巴开源的 MySQL binlog 增量订阅和消费组件。它模拟 MySQL Slave 的行为,实时获取 MySQL 的数据变更。
工作原理
Canal 的工作原理基于 MySQL 的主从复制机制:
MySQL 主从复制流程:
MySQL Master MySQL Slave
│ │
│ 1. 写入 binlog │
├──────────────────────────────────▶ │
│ │ 2. 请求 binlog
│ │
│ 3. 发送 binlog │
├──────────────────────────────────▶ │
│ │ 4. 解析并应用
│ │
Canal 工作流程:
MySQL Master Canal Server
│ │
│ │ 1. 伪装成 Slave
│ 2. 发送 binlog │
├──────────────────────────────────▶ │
│ │ 3. 解析 binlog
│ │
│ │ 4. 发送到下游
│ ├──────────▶ Kafka/HBase/ES
MySQL 主从复制的三个步骤:
- Master 记录 binlog:MySQL Master 将数据变更写入二进制日志(binlog)
- Slave 请求 binlog:Slave 连接 Master,请求指定位置之后的 binlog
- Master 发送 binlog:Master 将 binlog 发送给 Slave,Slave 解析并执行
Canal 正是利用了这个机制,伪装成 MySQL Slave,从而实时获取数据变更。
MySQL 配置要求
使用 Canal 前,需要对 MySQL 进行配置:
# my.cnf 配置
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式(必须)
binlog-format=ROW
# Server ID(与其他实例不冲突)
server-id=1
ROW 模式是必须的,因为只有 ROW 模式的 binlog 包含完整的数据变更信息。
Canal Server 配置
Canal Server 的配置文件 canal.properties:
# Canal Server 绑定地址
canal.ip = 0.0.0.0
canal.port = 11111
# 存储模式
canal.serverMode = kafka
# Kafka 配置(如果使用 Kafka 模式)
canal.mq.servers = kafka1:9092,kafka2:9092
canal.mq.batchSize = 16384
canal.mq.maxRetries = 3
# ZooKeeper 配置(用于高可用)
canal.zkServers = zk1:2181,zk2:2181,zk3:2181
Instance 配置(每个 MySQL 实例对应一个 Instance):
# instance.properties
# 源 MySQL 信息
canal.instance.master.address = mysql-host:3306
canal.instance.master.journal.name =
canal.instance.master.position =
# 数据库账号
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
# 过滤规则
canal.instance.filter.regex = mydb\\..*
canal.instance.filter.black.regex = mydb\\.sys_log
Canal Client 示例
使用 Canal Client 消费数据变更:
// 创建 CanalConnector
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("canal-server", 11111),
"example", // destination
"", // username
"" // password
);
try {
connector.connect();
// 订阅所有表
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
List<CanalEntry.Entry> entries = message.getEntries();
if (batchId != -1 && entries.size() > 0) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
// 处理不同类型的数据变更
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.INSERT) {
// 处理插入
handleInsert(rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.UPDATE) {
// 处理更新
handleUpdate(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.DELETE) {
// 处理删除
handleDelete(rowData.getBeforeColumnsList());
}
}
}
}
}
// 确认消息
connector.ack(batchId);
}
} finally {
connector.disconnect();
}
与 Kafka 集成
Canal 可以直接将数据发送到 Kafka,实现与流处理系统的无缝对接:
# canal.properties
canal.serverMode = kafka
canal.mq.servers = kafka1:9092
canal.mq.topic = canal-topic
# 发送模式
canal.mq.partition = 0
# 或者按表名分区
canal.mq.partitionsNum = 3
canal.mq.partitionHash = .*\\..*:$pk$
发送到 Kafka 的消息格式(JSON):
{
"data": [
{
"id": "1",
"name": "张三",
"age": "25"
}
],
"database": "mydb",
"es": 1589335836000,
"id": 7,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"name": "varchar(50)",
"age": "int(11)"
},
"old": [
{
"age": "24"
}
],
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"age": 4
},
"table": "users",
"ts": 1589335836234,
"type": "UPDATE"
}
DataX 数据同步
DataX 是阿里巴巴开源的离线数据同步工具,采用 Framework + Plugin 架构,支持多种异构数据源之间的数据同步。
架构设计
DataX 采用独特的「Reader-Channel-Writer」架构:
┌─────────────────────────────────────────────────────────────┐
│ DataX Job │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Reader │ ───▶ │ Channel │ ───▶ │ Writer │ │
│ │ (数据读取) │ │ (数据缓冲) │ │ (数据写入) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 从源端读取 内存缓冲交换 写入目标端 │
│ MySQL/Oracle 无磁盘 IO HDFS/Hive │
│ PostgreSQL 高吞吐量 HBase/ES │
└─────────────────────────────────────────────────────────────┘
核心特点:
- 无磁盘 IO:数据在内存中直接从 Reader 传输到 Writer
- 插件化架构:支持自定义 Reader 和 Writer 插件
- 流量控制:支持限速、并发控制
- 数据质量:支持脏数据收集和统计
支持的数据源
| 类型 | Reader(读) | Writer(写) |
|---|---|---|
| RDBMS | MySQL、Oracle、SQL Server、PostgreSQL | MySQL、Oracle、SQL Server、PostgreSQL |
| 大数据存储 | HDFS、Hive、HBase | HDFS、Hive、HBase |
| NoSQL | MongoDB、Elasticsearch | MongoDB、Elasticsearch |
| 数据仓库 | ClickHouse、Doris | ClickHouse、Doris |
| 文件 | TXT、CSV、Excel | TXT、CSV、Excel |
| 流式 | Kafka | Kafka |
配置文件示例
DataX 使用 JSON 格式的配置文件:
{
"job": {
"setting": {
"speed": {
"channel": 4,
"byte": 10485760,
"record": 10000
},
"errorLimit": {
"record": 100,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "password",
"column": ["id", "name", "age", "create_time"],
"splitPk": "id",
"connection": [
{
"table": ["users"],
"jdbcUrl": ["jdbc:mysql://mysql-host:3306/mydb"]
}
],
"where": "create_time >= DATE_SUB(NOW(), INTERVAL 1 DAY)"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://namenode:8020",
"fileType": "text",
"path": "/data/users",
"fileName": "users",
"column": [
{"name": "id", "type": "BIGINT"},
{"name": "name", "type": "STRING"},
{"name": "age", "type": "INT"},
{"name": "create_time", "type": "STRING"}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress": "gzip"
}
}
}
]
}
}
执行同步任务
# 执行同步任务
python datax.py job.json
# 查看 job 执行结果
# DataX 会输出详细的统计信息:
# - 总记录数
# - 总字节数
# - 同步速度
# - 错误记录数
与 Sqoop 对比
| 维度 | DataX | Sqoop |
|---|---|---|
| 架构 | 单机多线程 | MapReduce 分布式 |
| 依赖 | 无依赖 | 依赖 Hadoop |
| 数据源 | 更丰富(支持 ES、MongoDB 等) | 主要支持 RDBMS |
| 性能 | 单机性能有限 | 分布式高吞吐 |
| 配置 | JSON 配置文件 | 命令行参数 |
| 适用场景 | 中小规模数据同步 | 大规模批量迁移 |
数据湖技术
数据湖是一种存储企业各种原始数据的架构,数据以原始格式存储,不需要预先定义 Schema。近年来,数据湖技术快速发展,引入了表格式(Table Format)概念,使数据湖具备了事务能力。
什么是表格式
传统数据湖(如 HDFS 上的 Parquet 文件)存在以下问题:
- 缺乏事务支持:无法保证 ACID,并发写入可能导致数据不一致
- 小文件问题:频繁写入产生大量小文件,影响查询性能
- 缺乏元数据管理:无法追踪数据变更历史
- Schema 演进困难:修改表结构需要重写数据
表格式在文件之上增加了一层元数据管理,解决了这些问题:
┌─────────────────────────────────────────────────────────────┐
│ 查询引擎层 │
│ Spark / Flink / Trino / Presto │
├─────────────────────────────────────────────────────────────┤
│ 表格式层 │
│ Delta Lake / Iceberg / Hudi │
│ ┌─────────────────────────────────────────────┐ │
│ │ 元数据(事务日志、Schema、分区) │ │
│ └─────────────────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ 文件格式层 │
│ Parquet / ORC / Avro │
├─────────────────────────────────────────────────────────────┤
│ 存储层 │
│ HDFS / S3 / ADLS / GCS / OSS │
└─────────────────────────────────────────────────────────────┘
Delta Lake
Delta Lake 是 Databricks 开源的表格式,与 Spark 生态深度集成。
核心特性
- ACID 事务:支持序列化隔离级别,保证读写一致性
- 时间旅行:可以查询历史版本的数据
- Schema 演进:支持自动添加列、列类型变更
- UPSERT/DELETE:支持 Merge、Update、Delete 操作
- 流批一体:同一张表既可以作为批处理源,也可以作为流处理源和目的地
核心架构
Delta Lake 的核心是事务日志(_delta_log 目录):
/user/hive/warehouse/events/
├── _delta_log/
│ ├── 00000000000000000000.json # 初始版本
│ ├── 00000000000000000001.json # 第一次变更
│ ├── 00000000000000000002.json # 第二次变更
│ └── 00000000000000000003.checkpoint # 检查点文件
├── part-00000-xxx.parquet
├── part-00001-xxx.parquet
└── part-00002-xxx.parquet
事务日志记录了每次操作的元数据:
{
"commitInfo": {
"timestamp": 1589335836000,
"operation": "WRITE",
"operationParameters": {"mode": "Append"}
},
"add": {
"path": "part-00000-xxx.parquet",
"size": 1024,
"partitionValues": {"date": "2024-01-01"},
"modificationTime": 1589335836000
}
}
Spark 使用示例
// 创建 Delta 表
spark.sql("""
CREATE TABLE events (
event_id LONG,
event_type STRING,
event_time TIMESTAMP,
user_id LONG
) USING DELTA
PARTITIONED BY (date DATE)
""")
// 写入数据
val df = spark.read.json("/data/events.json")
df.write.format("delta")
.mode("append")
.partitionBy("date")
.save("/delta/events")
// 读取数据
val events = spark.read.format("delta").load("/delta/events")
// 时间旅行:查询历史版本
val historyVersion = spark.read.format("delta")
.option("versionAsOf", 0)
.load("/delta/events")
val historyTime = spark.read.format("delta")
.option("timestampAsOf", "2024-01-01")
.load("/delta/events")
// Merge 操作(UPSERT)
spark.sql("""
MERGE INTO events AS target
USING updates AS source
ON target.event_id = source.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
// 查看表历史
spark.sql("DESCRIBE HISTORY events").show()
与 Flink 集成
Delta Lake 也支持 Flink 的流式读写:
// Flink CDC 到 Delta Lake
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> stream = env.addSource(
new FlinkKafkaConsumer<>("topic", new JsonRowDeserializationSchema(), properties)
);
DeltaSink<Row> deltaSink = DeltaSink
.forRowData(new Path("/delta/events"), new Configuration(), schema)
.build();
stream.sinkTo(deltaSink);
Apache Iceberg
Apache Iceberg 是一个开源的表格式,设计目标是支持大规模数据分析工作负载。它的特点是引擎无关性,支持 Spark、Flink、Trino、Hive 等多种引擎。
核心架构
Iceberg 的元数据分为三层:
┌─────────────────────────────────────────────────────────────┐
│ Snapshot(快照) │
│ 每次写入产生一个快照,包含文件列表和统计信息 │
├─────────────────────────────────────────────────────────────┤
│ Manifest List(清单列表) │
│ 快照包含的 Manifest 文件列表 │
├─────────────────────────────────────────────────────────────┤
│ Manifest(清单文件) │
│ 数据文件列表,包含分区信息、统计信息 │
├─────────────────────────────────────────────────────────────┤
│ Data Files(数据文件) │
│ 实际存储的数据文件(Parquet/ORC/Avro) │
└─────────────────────────────────────────────────────────────┘
这种分层架构带来的好处:
- 高效的文件过滤:通过 Manifest 中的统计信息(min/max 值)跳过不需要扫描的文件
- 快照隔离:读取时获取一致的快照视图
- 增量读取:可以只读取两个快照之间的变更
核心特性
| 特性 | 说明 |
|---|---|
| Schema 演进 | 支持添加、删除、重命名、修改列类型,无需迁移数据 |
| 分区演进 | 支持修改分区策略,历史数据保持原有分区 |
| 隐藏分区 | 分区字段不需要在表中存在,如按日期分区但表中只有时间戳 |
| 快照隔离 | 读写互不阻塞 |
| 增量读取 | 支持读取两个快照之间的变更 |
Spark 使用示例
// 创建 Iceberg 表
spark.sql("""
CREATE TABLE db.events (
event_id LONG,
event_type STRING,
event_time TIMESTAMP,
user_id LONG
) USING iceberg
PARTITIONED BY (days(event_time))
""")
// 写入数据
df.writeTo("db.events").append()
// 读取数据
val events = spark.table("db.events")
// 增量读取
val increment = spark.read
.format("iceberg")
.option("start-snapshot-id", "123456789")
.option("end-snapshot-id", "987654321")
.load("db.events")
// 查看快照
spark.sql("SELECT * FROM db.events.snapshots").show()
// 回滚到历史版本
spark.sql("CALL system.rollback_to_snapshot('db.events', 123456789)")
多引擎支持
Iceberg 的一个重要优势是多引擎支持:
-- Spark 创建表
CREATE TABLE db.events USING iceberg PARTITIONED BY (days(event_time));
-- Trino 查询
SELECT * FROM iceberg.db.events WHERE event_time > CURRENT_TIMESTAMP - INTERVAL '1' DAY;
-- Flink 流式写入
INSERT INTO iceberg.db.events SELECT * FROM kafka_events;
Apache Hudi
Apache Hudi(Hadoop Upserts Deletes and Incrementals)是 Uber 开发的表格式,专注于实时数据入湖和增量处理场景。
表类型
Hudi 支持两种表类型:
Copy On Write (COW):
- 写时复制,每次更新重写包含该记录的文件
- 适合读多写少的场景
- 写入延迟较高,但读取性能最好
更新前:
File1.parquet: [record1, record2, record3]
更新 record2:
File1.parquet: [record1, record2_new, record3] // 重写整个文件
Merge On Read (MOR):
- 读时合并,更新写入增量日志文件(Avro 格式)
- 适合写多读少的场景
- 写入延迟低,读取时需要合并
更新前:
File1.parquet: [record1, record2, record3]
更新 record2:
File1.parquet: [record1, record2, record3] // 不变
File1.log: [record2_new] // 增量日志
读取时合并:
Result: [record1, record2_new, record3]
核心特性
| 特性 | 说明 |
|---|---|
| UPSERT | 原生支持主键更新,自动去重 |
| 增量查询 | 提供数据变更流,支持 CDC 场景 |
| 时间旅行 | 支持查询历史版本 |
| 小文件自动合并 | 自动合并小文件,优化存储 |
| 索引 | 支持 Bloom Filter、HBase 等索引加速更新 |
Spark 使用示例
// 创建 Hudi 表
val hudiOptions = Map(
"hoodie.table.name" -> "events",
"hoodie.datasource.write.recordkey.field" -> "event_id",
"hoodie.datasource.write.partitionpath.field" -> "event_date",
"hoodie.datasource.write.table.name" -> "events",
"hoodie.datasource.write.operation" -> "upsert",
"hoodie.datasource.write.precombine.field" -> "event_time",
"hoodie.upsert.shuffle.parallelism" -> "200",
"hoodie.insert.shuffle.parallelism" -> "200"
)
// 写入数据(UPSERT)
df.write
.format("org.apache.hudi")
.options(hudiOptions)
.mode(SaveMode.Append)
.save("/hudi/events")
// 增量查询
val incDF = spark.read
.format("org.apache.hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", "20240101000000")
.load("/hudi/events")
// 时间旅行
val historyDF = spark.read
.format("org.apache.hudi")
.option("as.of.instant", "20240101000000")
.load("/hudi/events")
CDC 数据入湖
Hudi 特别适合 CDC(Change Data Capture)场景:
// 从 Kafka 读取 CDC 数据写入 Hudi
val cdcDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "cdc-events")
.load()
cdcDF.writeStream
.format("org.apache.hudi")
.options(hudiOptions)
.option("checkpointLocation", "/checkpoints/hudi")
.start("/hudi/events")
表格式对比
| 维度 | Delta Lake | Iceberg | Hudi |
|---|---|---|---|
| 开发者 | Databricks | Apache | Apache (Uber) |
| 核心引擎 | Spark | 引擎无关 | Spark/Flink |
| UPSERT | 支持 | 支持 | 原生支持,性能最好 |
| 增量查询 | 支持 | 支持 | 原生支持,功能最强 |
| CDC 集成 | 一般 | 一般 | 优秀 |
| 小文件治理 | 手动优化 | 手动优化 | 自动合并 |
| 学习曲线 | 简单 | 中等 | 复杂 |
| 适用场景 | Spark 生态 | 多引擎混合分析 | 实时入湖、CDC |
选型建议:
- 如果主要使用 Spark,选择 Delta Lake 最简单
- 如果需要多引擎支持(Spark + Trino + Flink),选择 Iceberg
- 如果有大量实时更新和 CDC 需求,选择 Hudi
调度系统
调度系统负责定时触发数据处理任务、管理任务依赖、处理失败重试等。一个完善的调度系统是数据平台稳定运行的基础。
Apache Airflow
Airflow 是 Apache 顶级项目,是目前最流行的工作流调度平台之一。它使用 Python 代码定义工作流,具有极高的灵活性。
核心架构
Airflow 采用分布式架构,主要组件包括:
┌─────────────────────────────────────────────────────────────┐
│ Airflow 架构 │
│ │
│ ┌─────────────┐ │
│ │ Webserver │ ◀─── 用户通过 UI 查看和操作 │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Scheduler │ ──▶ │ Metadata │ ◀── │ Worker │ │
│ │ (调度器) │ │ Database │ │ (执行器) │ │
│ └─────────────┘ │ (元数据库) │ └─────────────┘ │
│ │ └─────────────┘ │ │
│ │ ▲ │ │
│ ▼ │ ▼ │
│ ┌─────────────┐ │ ┌─────────────┐ │
│ │ DAG Files │ ────────────┘ │ Executor │ │
│ │ (工作流定义) │ │ (执行引擎) │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
各组件职责:
| 组件 | 职责 |
|---|---|
| Webserver | 提供 Web UI,展示 DAG 状态、任务日志,支持手动触发 |
| Scheduler | 扫描 DAG 文件,调度任务执行 |
| Worker | 执行具体的任务 |
| Metadata Database | 存储任务状态、DAG 元数据(通常使用 PostgreSQL 或 MySQL) |
| DAG Files | Python 文件,定义工作流逻辑 |
Executor 类型
Executor 决定了任务的执行方式:
| Executor | 说明 | 适用场景 |
|---|---|---|
| SequentialExecutor | 单进程顺序执行 | 开发测试 |
| LocalExecutor | 本地多进程执行 | 小规模生产 |
| CeleryExecutor | 分布式执行,需要消息队列 | 大规模生产 |
| KubernetesExecutor | K8s 环境执行,每个任务一个 Pod | 云原生环境 |
DAG 定义
DAG(有向无环图)定义了任务的执行顺序和依赖关系:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
# 定义默认参数
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
# 定义 DAG
with DAG(
dag_id='etl_pipeline',
default_args=default_args,
description='每日 ETL 处理流程',
schedule_interval='0 2 * * *', # 每天凌晨 2 点执行
start_date=datetime(2024, 1, 1),
catchup=False, # 不执行历史补跑
tags=['etl', 'daily'],
) as dag:
# 任务定义
extract_data = BashOperator(
task_id='extract_data',
bash_command='python /scripts/extract.py',
)
transform_data = SparkSubmitOperator(
task_id='transform_data',
application='/spark/transform.py',
conn_id='spark_default',
conf={
'spark.executor.memory': '4g',
'spark.executor.cores': '2',
},
)
load_data = PythonOperator(
task_id='load_data',
python_callable=load_to_doris,
op_kwargs={'table': 'ods_events'},
)
quality_check = PythonOperator(
task_id='quality_check',
python_callable=check_data_quality,
)
# 定义任务依赖
extract_data >> transform_data >> load_data >> quality_check
TaskFlow API
Airflow 2.0 引入了 TaskFlow API,简化了 DAG 定义:
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
catchup=False,
)
def etl_pipeline():
@task
def extract():
# 提取数据
data = fetch_from_source()
return data
@task
def transform(data):
# 转换数据
return process(data)
@task
def load(data):
# 加载数据
save_to_target(data)
# 自动处理依赖关系和数据传递
data = extract()
processed = transform(data)
load(processed)
dag = etl_pipeline()
任务依赖关系
Airflow 支持灵活的任务依赖定义:
# 线性依赖
task1 >> task2 >> task3
# 分支依赖
task1 >> [task2, task3] >> task4
# 复杂依赖
task1 >> task2 >> task5
task1 >> task3 >> task5
task1 >> task4 >> task5
# 使用 set_downstream/set_upstream
task1.set_downstream([task2, task3])
task4.set_upstream([task2, task3])
DolphinScheduler
DolphinScheduler 是 Apache 顶级项目,由易观数据开源,是一个分布式易扩展的可视化工作流调度平台。
核心架构
DolphinScheduler 采用去中心化设计:
┌─────────────────────────────────────────────────────────────┐
│ DolphinScheduler 架构 │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Master │ │ Master │ │ Master │ │
│ │ Server 1 │ │ Server 2 │ │ Server 3 │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ ZooKeeper │ │
│ │ (分布式协调) │ │
│ └─────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ Worker │ │ Worker │ │ Worker │ │
│ │ Server 1 │ │ Server 2 │ │ Server 3 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
各组件职责:
| 组件 | 职责 |
|---|---|
| Master Server | 负责 DAG 分发、任务调度、状态监控 |
| Worker Server | 负责具体任务执行 |
| ZooKeeper | 集群管理、Master 选举、分布式锁 |
| API Server | 提供 REST API |
| Alert Server | 告警通知 |
核心特性
- 可视化 DAG:拖拽式定义工作流,无需编码
- 多租户支持:支持多租户资源隔离
- 丰富的任务类型:支持 Shell、SQL、Spark、Flink、Python 等多种任务类型
- 高可用:Master 和 Worker 都支持多实例
- 告警机制:支持邮件、钉钉、企业微信等多种告警方式
支持的任务类型
| 类型 | 说明 |
|---|---|
| SHELL | Shell 脚本 |
| SQL | SQL 脚本(支持 MySQL、Hive、Spark SQL 等) |
| SPARK | Spark 应用 |
| FLINK | Flink 应用 |
| PYTHON | Python 脚本 |
| MR | MapReduce 任务 |
| PROCEDURE | 存储过程 |
| SUB_PROCESS | 子流程 |
| DATAX | DataX 同步任务 |
| SQOOP | Sqoop 任务 |
| HTTP | HTTP 请求 |
| DATASOURCE | 数据源操作 |
与 Airflow 对比
| 维度 | Airflow | DolphinScheduler |
|---|---|---|
| 定义方式 | Python 代码 | 可视化 DAG |
| 学习曲线 | 需要编程基础 | 可视化操作,上手快 |
| 灵活性 | 极高,可编程控制 | 中等,受限于可视化组件 |
| 大数据集成 | 通过 Operator | 原生支持多种大数据任务 |
| 高可用 | 需要配置 CeleryExecutor | 原生支持 |
| 多租户 | 不支持 | 支持 |
| 告警 | 需要配置 | 内置多种告警方式 |
选型建议:
- 团队有编程能力、需要复杂逻辑控制,选择 Airflow
- 团队偏向运维、需要快速上手,选择 DolphinScheduler
实时数仓架构
实时数仓是近年来数据架构的重要发展方向,旨在提供低延迟的数据分析能力。
Lambda 架构
Lambda 架构是 Nathan Marz 提出的经典实时架构,采用批流分离的策略:
┌─────────────┐
│ 数据源 │
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐
│ 批处理层 │ │ 加速层 │
│ (Batch) │ │ (Speed) │
└────┬────┘ └────┬────┘
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│批处理视图│ │实时视图 │
└────┬────┘ └────┬────┘
│ │
└──────────┬───────────────┘
│
▼
┌───────────┐
│ 服务层 │
│ (Serving) │
│ 合并查询 │
└───────────┘
三层架构详解:
批处理层(Batch Layer):
- 存储全量历史数据
- 定期(如每天)执行批处理计算
- 保证数据的准确性和完整性
- 使用 Hive、Spark 等批处理引擎
加速层(Speed Layer):
- 处理实时增量数据
- 提供低延迟的实时视图
- 使用 Flink、Spark Streaming 等流处理引擎
- 数据只保留短期(如最近 24 小时)
服务层(Serving Layer):
- 合并批处理层和加速层的结果
- 提供统一的查询接口
- 典型实现:Druid、ClickHouse、Doris
Lambda 架构的优缺点:
| 优点 | 缺点 |
|---|---|
| 容错性强,可重算 | 需要维护两套代码 |
| 数据准确性有保障 | 架构复杂,运维成本高 |
| 支持历史数据分析 | 数据一致性问题 |
Kappa 架构
Kappa 架构由 Kafka 创始人 Jay Kreps 提出,简化了 Lambda 架构:
┌─────────────┐
│ 数据源 │
└──────┬──────┘
│
▼
┌─────────────┐
│ Kafka │
│ (消息队列) │
│ 长期存储 │
└──────┬──────┘
│
▼
┌─────────────┐
│ 流处理层 │
│ (Flink) │
└──────┬──────┘
│
▼
┌─────────────┐
│ 服务层 │
│ (ClickHouse)│
└─────────────┘
核心思想:
- 只保留流处理层,移除批处理层
- Kafka 作为数据湖,长期保留数据(如 7 天以上)
- 需要重算时,从头消费 Kafka 数据
Kappa 架构的优缺点:
| 优点 | 缺点 |
|---|---|
| 架构简单,一套代码 | 依赖 Kafka 数据保留能力 |
| 数据一致性好 | 重算资源开销大 |
| 运维成本低 | 不适合需要全量扫描的场景 |
实时数仓分层
实时数仓通常采用与离线数仓类似的分层架构:
┌─────────────────────────────────────────────────────────────┐
│ ADS(应用数据层) │
│ 指标汇总、报表数据、业务应用数据 │
├─────────────────────────────────────────────────────────────┤
│ DWS(汇总数据层) │
│ 轻度汇总、主题宽表、面向业务主题 │
├─────────────────────────────────────────────────────────────┤
│ DWD(明细数据层) │
│ 清洗转换、标准化、明细数据 │
├─────────────────────────────────────────────────────────────┤
│ ODS(原始数据层) │
│ 原始日志、数据库变更记录、保留原始格式 │
└─────────────────────────────────────────────────────────────┘
典型数据流:
MySQL 业务库
│
▼ Canal
Kafka ODS(binlog 数据)
│
▼ Flink 实时清洗
Kafka DWD(清洗后明细)
│
▼ Flink 实时聚合
Kafka DWS(汇总数据)
│
▼ Flink 写入
ClickHouse ADS(应用数据)
│
▼
BI 报表 / 数据服务
数据治理
数据治理是数据平台建设的重要组成部分,包括元数据管理、数据血缘、数据质量等方面。
元数据管理
元数据是描述数据的数据,是数据治理的基础。元数据管理平台帮助组织发现、理解和管理数据资产。
Apache Atlas
Apache Atlas 是 Apache 顶级项目,提供企业级元数据管理和治理能力。
核心功能:
- 数据资产目录:自动发现和注册数据资产
- 数据血缘:追踪数据来源和去向
- 分类与标签:对数据进行分类和打标
- 安全策略:与 Ranger 集成,基于元数据的访问控制
架构组成:
┌─────────────────────────────────────────────────────────────┐
│ Apache Atlas 架构 │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Ingestion │ │ Storage │ │ Index │ │
│ │ Bridge │ │ (HBase) │ │ (Solr) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ Atlas Core │ │
│ │ Type System │ │
│ │ Graph Engine │ │
│ └─────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ Hive Hook │ │ Spark Hook │ │ Storm Hook │ │
│ │ 自动采集 │ │ 自动采集 │ │ 自动采集 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
通过 Hook 机制,Atlas 可以自动采集 Hive、Spark、Storm 等组件的血缘信息。
数据血缘
数据血缘记录数据从源头到目的地的完整流转过程,是数据治理的核心能力。
数据血缘的价值:
- 问题溯源:快速定位数据问题来源
- 影响分析:评估变更的影响范围
- 合规审计:满足监管要求
- 数据资产评估:识别核心数据资产
血缘采集方式:
| 方式 | 说明 | 优点 | 缺点 |
|---|---|---|---|
| 静态解析 | 解析 SQL、代码获取血缘 | 无侵入性 | 不够准确 |
| 动态采集 | 运行时采集血缘 | 准确度高 | 需要埋点 |
| 混合模式 | 静态 + 动态 | 综合优势 | 实现复杂 |
数据质量
数据质量监控保障数据的准确性、完整性和一致性。
数据质量维度:
| 维度 | 说明 | 检测方法 |
|---|---|---|
| 完整性 | 数据是否完整 | 空值检测、记录数检查 |
| 准确性 | 数据是否正确 | 格式校验、范围检查 |
| 一致性 | 数据是否一致 | 跨表比对、历史对比 |
| 及时性 | 数据是否及时 | 产出时间检查 |
| 唯一性 | 数据是否唯一 | 主键重复检查 |
常用数据质量工具:
- Apache Griffin:eBay 开源的大数据质量监控平台
- Great Expectations:Python 数据质量框架
- Deequ:AWS 开源的数据质量库(基于 Spark)
运维监控
监控指标
大数据集群监控的核心指标按组件划分:
| 组件 | 关键指标 | 说明 |
|---|---|---|
| HDFS | NameNode 堆内存、块池状态、副本健康度 | 存储健康状况 |
| YARN | 队列资源使用率、Container 状态、应用执行时间 | 资源利用情况 |
| Hive | 查询延迟、并发数、Metastore 状态 | 查询性能 |
| Kafka | 消息堆积、生产/消费延迟、Broker 状态 | 消息处理状况 |
| Spark | Executor 内存使用、GC 时间、Shuffle 数据量 | 计算性能 |
| Flink | Checkpoint 时长、背压、状态大小 | 流处理状况 |
监控工具
| 工具 | 说明 | 适用场景 |
|---|---|---|
| Prometheus + Grafana | 指标采集和可视化 | 通用监控方案 |
| Zabbix | 传统监控系统 | 传统运维环境 |
| Datadog | 云监控平台 | 云原生环境 |
日志管理
集中式日志管理架构:
应用日志 ──▶ Filebeat ──▶ Kafka ──▶ Logstash ──▶ Elasticsearch ──▶ Kibana
│
▼
日志处理规则
- 解析
- 过滤
- 格式化
技术选型指南
按数据规模选择
| 数据规模 | 存储选型 | 计算选型 | 典型场景 |
|---|---|---|---|
| GB 级 | MySQL、PostgreSQL | 单机脚本 | 小型业务系统 |
| TB 级 | HDFS、Hive | Spark | 中型企业数仓 |
| PB 级 | 数据湖、湖仓一体 | Spark + Flink | 大型互联网平台 |
按实时性要求选择
| 延迟要求 | 架构选型 | 典型技术 |
|---|---|---|
| 天级 | Hive 离线数仓 | Hive + Spark |
| 小时级 | Spark 批处理 | Spark SQL |
| 分钟级 | Spark Streaming | Spark Streaming |
| 秒级 | Flink 实时计算 | Flink + Kafka |
| 毫秒级 | Flink + 状态优化 | Flink + RocksDB |
按查询类型选择
| 查询类型 | 推荐引擎 | 特点 |
|---|---|---|
| 离线分析 | Hive、Spark SQL | 高吞吐,低并发 |
| 交互式查询 | Trino、ClickHouse | 低延迟,高并发 |
| 实时查询 | Doris、Druid | 秒级响应,支持更新 |
| 全文检索 | Elasticsearch | 倒排索引,全文搜索 |
小结
本章系统介绍了大数据生态系统的核心组件:
-
数据采集:Flume 适合日志采集,Sqoop 适合数据库迁移,Canal 适合实时同步,DataX 适合异构数据源同步
-
数据湖技术:Delta Lake 适合 Spark 生态,Iceberg 适合多引擎场景,Hudi 适合实时入湖和 CDC
-
调度系统:Airflow 灵活强大,适合技术团队;DolphinScheduler 可视化友好,适合运维团队
-
实时架构:Lambda 架构稳定但复杂,Kappa 架构简洁但依赖消息队列能力
-
数据治理:元数据管理是基础,数据血缘是核心,数据质量是保障
-
运维监控:完善的监控和日志体系是平台稳定运行的保障
大数据技术选型没有银弹,需要根据业务场景、团队能力、成本预算等因素综合考虑。理解各组件的定位和适用场景,才能构建出合适的数据平台。