Spark 内存计算框架
Apache Spark 是一个快速、通用、可扩展的大数据分析引擎。相比 Hadoop MapReduce,Spark 基于内存计算,速度可以快 100 倍,已成为目前最流行的大数据处理框架。
Spark 概述
什么是 Spark?
Spark 是 UC Berkeley AMP 实验室开源的分布式计算框架,后来成为 Apache 顶级项目。它提供了统一的编程模型,支持批处理、流处理、机器学习、图计算等多种计算场景。
Spark 的核心特点:
- 速度快:基于内存计算,比 MapReduce 快 10-100 倍
- 易用性:支持 Java、Scala、Python、R 多种语言
- 通用性:统一的 API 支持多种计算场景
- 兼容性:可运行在 YARN、Mesos、Kubernetes 上
Spark vs MapReduce
| 维度 | MapReduce | Spark |
|---|---|---|
| 计算模式 | 磁盘计算 | 内存计算 |
| 速度 | 较慢 | 快 10-100 倍 |
| 编程模型 | Map/Reduce 两阶段 | RDD 丰富的算子 |
| 适用场景 | 离线批处理 | 批处理、流处理、机器学习 |
| 迭代计算 | 每次迭代写磁盘 | 内存中迭代 |
MapReduce 的最大问题是每次计算结果都要写入磁盘,下次计算再从磁盘读取。对于需要多次迭代的算法(如机器学习),这会导致大量的磁盘 I/O。Spark 通过将中间结果保存在内存中,避免了这个问题,从而大幅提升了性能。
Spark 生态组件
Spark 提供了一整套生态系统:
| 组件 | 功能 | 说明 |
|---|---|---|
| Spark Core | 核心引擎 | RDD 抽象、任务调度 |
| Spark SQL | 结构化数据处理 | SQL 查询、DataFrame API |
| Spark Streaming | 流处理 | 微批处理流数据 |
| MLlib | 机器学习 | 常用机器学习算法 |
| GraphX | 图计算 | 图数据处理 |
这些组件共享统一的 API,可以在同一个应用中无缝切换使用。例如,你可以用 Spark SQL 读取数据,用 MLlib 训练模型,最后用 Spark Streaming 进行实时预测。
Spark 版本演进
| 版本 | 发布时间 | 重要特性 |
|---|---|---|
| Spark 1.x | 2014 | 引入 DataFrame API |
| Spark 2.x | 2016 | Structured Streaming、Dataset API |
| Spark 3.x | 2020 | Adaptive Query Execution、动态分区裁剪 |
| Spark 4.0 | 2025 | VARIANT 数据类型、SQL UDF 增强、Python 自定义数据源、Spark Connect 增强、声明式管道 |
Spark 架构
运行架构
Spark 采用主从架构,主要组件包括:
Driver(驱动程序)
Driver 是 Spark 应用的主进程,负责:
- 解析代码:将用户代码转换为任务
- 创建 SparkContext:初始化 Spark 应用
- 任务调度:将任务分配给 Executor 执行
- 结果收集:收集和汇总任务结果
Driver 是 Spark 应用的"大脑",它维护了整个应用的执行计划,并协调所有 Executor 的工作。如果 Driver 发生故障,整个应用就会失败。
Executor(执行器)
Executor 是工作节点上的进程,负责:
- 任务执行:执行 Driver 分配的任务
- 数据存储:缓存 RDD 数据
- 结果返回:将计算结果返回给 Driver
每个 Executor 是一个独立的 JVM 进程,可以并行执行多个任务。Executor 的数量和资源配置直接影响 Spark 应用的性能。
Cluster Manager(集群管理器)
Cluster Manager 负责资源管理:
- Standalone:Spark 自带的资源管理器
- YARN:Hadoop 的资源管理器
- Mesos:通用的集群管理器
- Kubernetes:容器编排平台
选择哪种 Cluster Manager 取决于你的基础设施。如果是 Hadoop 生态系统,通常选择 YARN;如果是云原生环境,Kubernetes 是更好的选择。
Spark 执行模型
理解 Spark 的执行模型对于编写高效的 Spark 应用至关重要。Spark 的执行过程可以分为以下几个层次:
应用(Application)→ 作业(Job)→ 阶段(Stage)→ 任务(Task)
Job(作业)
每次遇到 Action 算子时,Spark 就会创建一个 Job。一个 Spark 应用可能包含多个 Job,每个 Job 由多个 Stage 组成。
Stage(阶段)
Stage 是 Job 的子集,由一组可以并行执行的任务组成。Stage 的划分依据是 Shuffle(数据重分发)操作:
- Shuffle Map Stage:中间阶段,输出数据需要 Shuffle
- Result Stage:最终阶段,直接输出结果
Task(任务)
Task 是 Spark 执行的最小单元,每个 Task 处理一个分区的数据。一个 Stage 内的所有 Task 可以并行执行。
宽依赖与窄依赖
RDD 之间的依赖关系决定了 Stage 的划分:
窄依赖(Narrow Dependency)
窄依赖指父 RDD 的每个分区最多被子 RDD 的一个分区使用:
- 特点:不涉及 Shuffle,数据在同一个节点内处理
- 算子:map、filter、flatMap、union 等
- 优势:可以进行流水线优化,减少中间结果
父RDD分区 子RDD分区
[P1] → [P1']
[P2] → [P2']
[P3] → [P3']
宽依赖(Wide Dependency)
宽依赖指父 RDD 的每个分区被子 RDD 的多个分区使用:
- 特点:涉及 Shuffle,需要跨节点传输数据
- 算子:groupByKey、reduceByKey、join、repartition 等
- 代价:Shuffle 是 Spark 中最耗时的操作
父RDD分区 子RDD分区
[P1] ─┬──→ [P1'](需要P1,P2,P3的数据)
[P2] ─┼──→ [P2'](需要P1,P2,P3的数据)
[P3] ─┘──→ [P3'](需要P1,P2,P3的数据)
DAG 与 Stage 划分
DAG(有向无环图)是 Spark 执行计划的表示。Spark 通过以下步骤将用户代码转换为执行计划:
- 构建逻辑计划:分析 RDD 的依赖关系
- 划分 Stage:遇到宽依赖就切分 Stage
- 生成物理计划:将 Stage 转换为 TaskSet
RDD依赖图示例:
[A] ─map→ [B] ─filter→ [C] ─reduceByKey→ [D] ─map→ [E] ─saveAsTextFile
│ │ │
└──── Stage 1 ───────────────┘ Stage 2 ────────┘
(窄依赖) (宽依赖后)
Stage 划分的规则是:从 Action 算子开始向前回溯,遇到宽依赖就切分成新的 Stage。这样做的目的是让同一个 Stage 内的所有算子可以流水线式执行,减少中间结果的落盘。
Shuffle 机制详解
Shuffle 是 Spark 中最关键也最耗时的操作,理解 Shuffle 对于性能调优至关重要。
Shuffle 的本质
Shuffle 的本质是数据的重新分发。当某个算子需要跨分区聚合数据时(如 groupByKey),就必须把相同 Key 的数据汇总到同一个分区,这个过程就是 Shuffle。
Shuffle 的过程
一个完整的 Shuffle 过程包括:
- Map 阶段:每个 Map 任务处理输入数据,输出中间结果
- 排序:对中间结果按 Key 排序
- 溢写:内存不足时将数据写入磁盘
- 合并:合并多个溢写文件
- Reduce 阶段:Reduce 任务拉取属于自己的数据
Shuffle Manager
Spark 提供了多种 Shuffle Manager:
| Manager | 说明 | 适用场景 |
|---|---|---|
| Hash Shuffle | 早期版本,每个 Map 输出多个文件 | 已废弃 |
| Sort Shuffle | 默认方式,每个 Map 输出少量文件 | 大多数场景 |
| Tungsten Sort | 内存优化的 Sort Shuffle | 内存充足时 |
Sort Shuffle 是目前默认的 Shuffle 方式,它通过排序和合并减少了中间文件的数量,大大降低了内存和磁盘的压力。
Shuffle 调优
// 增加 Shuffle 分区数,减少每个任务处理的数据量
spark.conf.set("spark.sql.shuffle.partitions", "200")
// 启用 Shuffle 服务,提高 Fetch 效率
spark.conf.set("spark.shuffle.service.enabled", "true")
// 压缩 Shuffle 输出
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")
RDD 弹性分布式数据集
RDD(Resilient Distributed Dataset)是 Spark 的核心抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD 五大特性
理解 RDD 的五大特性是掌握 Spark 的关键:
1. 分区列表(Partition List)
RDD 由多个分区组成,每个分区是一个数据片段。分区是 Spark 并行计算的基本单位:
- 分区数决定了并行度
- 每个分区由一个 Task 处理
- 分区可以自定义或自动设置
// 查看分区数
val rdd = sc.parallelize(1 to 100)
println(rdd.partitions.length) // 默认分区数
// 指定分区数
val rdd2 = sc.parallelize(1 to 100, 10) // 10个分区
2. 计算函数(Compute Function)
每个分区都有一个计算函数,定义了如何从父 RDD 计算得到当前分区的数据:
- 计算是惰性的,只有在 Action 触发时才执行
- 每个分区的计算是独立的,可以并行执行
- 计算函数是 RDD 的核心,决定了数据的转换逻辑
3. 依赖关系(Dependency)
RDD 之间通过依赖关系形成 DAG:
- 窄依赖:一个父分区对应一个子分区
- 宽依赖:一个父分区对应多个子分区
依赖关系是 Spark 容错机制的基础。如果某个分区的数据丢失,Spark 可以根据依赖关系重新计算该分区。
4. 分区器(Partitioner)
分区器决定 Key-Value 类型 RDD 的分区策略:
- HashPartitioner:默认分区器,根据 Key 的哈希值分区
- RangePartitioner:按范围分区,适用于排序场景
// 自定义分区器
class MyPartitioner(numPartitions: Int) extends Partitioner {
override def numPartitions: Int = numPartitions
override def getPartition(key: Any): Int = {
// 自定义分区逻辑
key.hashCode % numPartitions
}
}
// 使用自定义分区器
val partitionedRDD = pairRDD.partitionBy(new MyPartitioner(4))
5. 最佳位置(Preferred Locations)
数据本地性偏好,指明分区数据最好在哪些节点上计算:
- PROCESS_LOCAL:数据在同一 JVM 进程中
- NODE_LOCAL:数据在同一节点上
- RACK_LOCAL:数据在同一机架上
- ANY:数据在其他地方
Spark 调度器会优先将任务分配到数据所在节点,减少网络传输。
RDD 创建方式
// 方式1:从集合创建
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
// 指定分区数
val rdd2 = sc.parallelize(Seq(1, 2, 3, 4, 5), 3)
// 方式2:从外部存储创建
val rdd3 = sc.textFile("hdfs://path/to/file")
// 默认每个 HDFS Block 对应一个分区(128MB)
val rdd4 = sc.textFile("hdfs://path/to/file", 10) // 指定最小分区数
// 方式3:从其他 RDD 转换
val rdd5 = rdd1.map(_ * 2)
从集合创建时,建议根据集群 CPU 核数设置分区数,一般每个 CPU 核对应 2-4 个分区。从 HDFS 读取时,Spark 会根据 Block 数量自动设置分区数,通常不需要手动指定。
RDD 操作类型
RDD 支持两种操作类型:
转换操作(Transformation)
转换操作是懒执行的,返回新的 RDD:
基本转换:
// map:一对一转换
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val mapped = rdd.map(x => x * 2)
// 结果:2, 4, 6, 8, 10
// map 会对每个元素应用函数,生成新的 RDD
// filter:过滤元素
val filtered = rdd.filter(x => x > 2)
// 结果:3, 4, 5
// filter 保留满足条件的元素
// flatMap:一对多转换并展平
val textRdd = sc.parallelize(Seq("hello world", "spark rdd"))
val words = textRdd.flatMap(line => line.split(" "))
// 结果:hello, world, spark, rdd
// flatMap 先 map 后 flatten,适合文本分词等场景
// mapPartitions:按分区处理
val partitionMapped = rdd.mapPartitions(iter => {
// iter 是一个分区的所有数据迭代器
// 可以在这里初始化资源(如数据库连接)
iter.map(x => x * 2)
})
// mapPartitions 适合需要批量处理数据的场景,如数据库写入
// mapPartitionsWithIndex:带分区索引
val indexed = rdd.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x))
})
// 可以知道每个元素属于哪个分区
// distinct:去重
val dupRdd = sc.parallelize(Seq(1, 1, 2, 2, 3))
val distinct = dupRdd.distinct()
// 结果:1, 2, 3
Key-Value 转换:
// 创建 PairRDD(键值对 RDD)
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// reduceByKey:按键聚合(推荐)
val reduced = pairs.reduceByKey(_ + _)
// 结果:(a, 4), (b, 2)
// reduceByKey 在 Map 端进行预聚合,减少 Shuffle 数据量
// groupByKey:按键分组(慎用)
val grouped = pairs.groupByKey()
// 结果:(a, [1, 3]), (b, [2])
// groupByKey 不进行预聚合,可能导致内存溢出
// mapValues:只对值进行转换
val mappedValues = pairs.mapValues(_ * 2)
// 结果:(a, 2), (b, 4), (a, 6)
// mapValues 保持 Key 不变,只转换 Value
// flatMapValues:对值展平
val flattened = sc.parallelize(Seq(("a", "1,2,3"), ("b", "4,5")))
.flatMapValues(v => v.split(","))
// 结果:(a,1), (a,2), (a,3), (b,4), (b,5)
// sortByKey:按键排序
val sorted = pairs.sortByKey()
// join:内连接
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y")))
val joined = rdd1.join(rdd2)
// 结果:(a, (1, x)), (b, (2, y))
// leftOuterJoin:左外连接
val leftJoined = rdd1.leftOuterJoin(rdd2)
// 结果:(a, (1, Some(x))), (b, (2, Some(y)))
// cogroup:协同分组
val cogrouped = rdd1.cogroup(rdd2)
// 结果:(a, ([1], [x])), (b, ([2], [y]))
// subtractByKey:按键差集
val subtracted = rdd1.subtractByKey(rdd2)
// 返回 rdd1 中有但 rdd2 中没有的 Key
行动操作(Action)
行动操作触发计算,返回结果:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
// collect:收集所有元素到 Driver
val arr = rdd.collect()
// 结果:Array(1, 2, 3, 4, 5)
// 注意:数据量大时会导致 Driver OOM
// count:计数
val count = rdd.count()
// 结果:5
// reduce:聚合
val sum = rdd.reduce(_ + _)
// 结果:15
// reduce 使用指定的函数聚合所有元素
// aggregate:自定义聚合(更灵活)
// 参数:(初始值)(分区内聚合函数)(分区间聚合函数)
val result = rdd.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1), // 分区内聚合
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 分区间聚合
)
// 结果:(15, 5),即总和和个数
// aggregate 可以返回与输入类型不同的结果
// first:取第一个元素
val first = rdd.first()
// 结果:1
// take:取前 n 个元素
val top3 = rdd.take(3)
// 结果:Array(1, 2, 3)
// takeOrdered:取最小的 n 个元素
val smallest3 = rdd.takeOrdered(3)
// 结果:Array(1, 2, 3)
// top:取最大的 n 个元素
val largest3 = rdd.top(3)
// 结果:Array(5, 4, 3)
// foreach:遍历每个元素
rdd.foreach(x => println(x))
// foreachPartition:遍历每个分区
rdd.foreachPartition(iter => {
// 适合在这里初始化资源(如数据库连接)
iter.foreach(x => println(x))
})
// saveAsTextFile:保存到文件
rdd.saveAsTextFile("hdfs://path/output")
// countByKey:按键计数
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.countByKey()
// 结果:Map(a -> 2, b -> 1)
// countByValue:按值计数
val values = sc.parallelize(Seq(1, 1, 2, 2, 2, 3))
val valueCounts = values.countByValue()
// 结果:Map(1 -> 2, 2 -> 3, 3 -> 1)
RDD 持久化
Spark 支持将 RDD 缓存到内存或磁盘,避免重复计算:
为什么需要持久化
当同一个 RDD 被多次使用时,如果不持久化,每次都会从源头重新计算:
val rdd = sc.textFile("hdfs://path/data").map(...)
// 不持久化:以下操作会触发两次完整计算
rdd.count() // 第一次计算
rdd.filter(...).count() // 第二次计算
// 持久化:只计算一次
rdd.cache() // 或 rdd.persist()
rdd.count() // 第一次计算,并缓存结果
rdd.filter(...).count() // 直接使用缓存
存储级别
import org.apache.spark.storage.StorageLevel
// 缓存到内存
rdd.cache() // 等同于 MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)
// 内存不足时存磁盘
rdd.persist(StorageLevel.MEMORY_AND_DISK)
// 只存磁盘
rdd.persist(StorageLevel.DISK_ONLY)
// 内存中序列化存储(节省空间)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
// 多副本存储
rdd.persist(StorageLevel.MEMORY_ONLY_2)
存储级别选择建议:
| 存储级别 | 适用场景 |
|---|---|
| MEMORY_ONLY | 数据量不大,可完全放入内存 |
| MEMORY_AND_DISK | 数据量较大,部分放磁盘 |
| MEMORY_ONLY_SER | 内存有限,可接受序列化开销 |
| DISK_ONLY | 内存严重不足,不介意计算开销 |
持久化最佳实践
// 1. 及时释放不再使用的缓存
rdd.unpersist()
// 2. 在 Shuffle 后持久化
val shuffled = rdd.reduceByKey(_ + _).persist(StorageLevel.MEMORY_AND_DISK)
// 3. 监控缓存使用情况
// 通过 Spark UI 的 Storage 页面查看缓存状态
// 4. 避免缓存太大的 RDD
// 如果 RDD 太大,序列化后仍然占用大量空间,不如不缓存
共享变量
Spark 提供两种共享变量,解决闭包捕获变量的效率问题:
广播变量
广播变量将只读变量发送到所有节点,避免重复传输:
// 场景:大表 join 小表
// 假设 smallTable 是一个小表(几十MB)
val smallTable = Map(
"001" -> "产品A",
"002" -> "产品B",
"003" -> "产品C"
)
// 不使用广播变量:每个任务都会收到一份小表的拷贝
val rdd = sc.parallelize(Seq("001", "002", "003"))
val result1 = rdd.map(id => (id, smallTable(id)))
// 问题:如果有 100 个任务,小表会被传输 100 次
// 使用广播变量:小表只传输一次到每个 Executor
val broadcastTable = sc.broadcast(smallTable)
val result2 = rdd.map(id => (id, broadcastTable.value(id)))
// 优势:每个 Executor 只保存一份小表,节省网络和内存
// 释放广播变量
broadcastTable.unpersist()
广播变量原理:
- Driver 将广播变量分块存储到 BlockManager
- Executor 从 Driver 或其他 Executor 拉取数据块
- 类似 BT 下载,Executor 之间可以互相传输数据
- 每个 Executor 只保存一份广播变量
适用场景:
- 大表 join 小表(Map-side Join)
- 共享配置信息
- 机器学习模型参数
累加器
累加器用于聚合所有节点的值:
// 场景:统计错误记录数
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// 创建累加器
val errorCount = sc.longAccumulator("errorCount")
rdd.foreach(x => {
if (x > 5) {
errorCount.add(1) // 累加
}
})
// 在 Driver 中获取值
println(s"错误数量: ${errorCount.value}")
// 结果:错误数量: 5
累加器注意事项:
// 问题:在 Transformation 中使用累加器可能导致不确定结果
val acc = sc.longAccumulator("acc")
val mapped = rdd.map(x => {
acc.add(1) // 如果 RDD 被多次计算,累加器会被多次累加
x * 2
})
mapped.count() // 累加器值 = 10
mapped.count() // 累加器值 = 20(而不是 10)
// 解决方案:先持久化再使用
mapped.cache()
mapped.count() // 累加器值 = 10
mapped.count() // 累加器值 = 10(不变)
// 或者只在 Action 操作中使用累加器
rdd.foreach(x => acc.add(1)) // 每次执行都会正确累加
自定义累加器:
import org.apache.spark.util.AccumulatorV2
class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
private val set = scala.collection.mutable.Set.empty[T]
override def isZero: Boolean = set.isEmpty
override def copy(): AccumulatorV2[T, Set[T]] = new SetAccumulator[T]
override def reset(): Unit = set.clear()
override def add(v: T): Unit = set += v
override def merge(other: AccumulatorV2[T, Set[T]]): Unit = set ++= other.value
override def value: Set[T] = set.toSet
}
// 使用自定义累加器收集唯一值
val acc = new SetAccumulator[Int]
sc.register(acc, "uniqueValues")
rdd.foreach(x => acc.add(x))
println(s"唯一值: ${acc.value}")
Spark SQL
Spark SQL 提供了结构化数据处理能力,支持 SQL 查询和 DataFrame API。
DataFrame 和 Dataset
DataFrame 是带有 Schema 的分布式 Row 集合,类似数据库表:
import org.apache.spark.sql.SparkSession
// 创建 SparkSession(Spark 2.0+ 的入口)
val spark = SparkSession.builder()
.appName("SparkSQLDemo")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 从集合创建 DataFrame
val df = Seq((1, "Alice", 25), (2, "Bob", 30))
.toDF("id", "name", "age")
// 查看 Schema
df.printSchema()
// root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)
// 查看数据
df.show()
// +---+-----+---+
// | id| name|age|
// +---+-----+---+
// | 1|Alice| 25|
// | 2| Bob| 30|
// +---+-----+---+
// 从文件读取
val csvDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data.csv")
val jsonDF = spark.read.json("data.json")
val parquetDF = spark.read.parquet("data.parquet")
// 从 JDBC 读取
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mydb")
.option("dbtable", "users")
.option("user", "root")
.option("password", "password")
.load()
DataFrame vs RDD
| 维度 | RDD | DataFrame |
|---|---|---|
| 类型安全 | 编译时检查 | 运行时检查 |
| 优化 | 无自动优化 | Catalyst 优化器 |
| 性能 | 较低 | 较高 |
| 易用性 | 需要了解底层 API | SQL 风格,更易用 |
DataFrame 由 Catalyst 优化器自动优化执行计划,通常比手动优化的 RDD 代码更快。
SQL 查询
Spark SQL 支持标准 SQL 语法:
// 注册临时视图
df.createOrReplaceTempView("people")
// 执行 SQL 查询
val result = spark.sql("""
SELECT
name,
age,
CASE WHEN age >= 30 THEN 'Senior' ELSE 'Junior' END as level
FROM people
WHERE age > 20
ORDER BY age DESC
""")
result.show()
DataFrame API
import org.apache.spark.sql.functions._
// 选择列
df.select("name", "age").show()
// 过滤
df.filter($"age" > 25).show()
df.filter("age > 25").show()
// 分组聚合
df.groupBy("age")
.agg(
count("*").as("count"),
avg("salary").as("avg_salary")
)
.show()
// 排序
df.orderBy($"age".desc).show()
// 连接
val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
val df2 = Seq((1, 25), (2, 30)).toDF("id", "age")
// 内连接
df1.join(df2, "id").show()
// 左连接
df1.join(df2, Seq("id"), "left").show()
// 条件连接
df1.join(df2, df1("id") === df2("id")).show()
常用内置函数
import org.apache.spark.sql.functions._
// 字符串函数
df.select(
col("name"),
upper(col("name")).as("upper_name"), // 转大写
lower(col("name")).as("lower_name"), // 转小写
length(col("name")).as("name_len"), // 字符串长度
concat(col("name"), lit("_"), col("age")).as("concat_str") // 字符串拼接
).show()
// 日期函数
df.select(
col("date"),
year(col("date")).as("year"),
month(col("date")).as("month"),
dayofmonth(col("date")).as("day"),
date_add(col("date"), 7).as("date_plus_7"), // 日期加7天
datediff(col("end_date"), col("start_date")).as("days_diff") // 日期差
).show()
// 数学函数
df.select(
col("value"),
round(col("value"), 2).as("rounded"), // 四舍五入
abs(col("value")).as("absolute"), // 绝对值
sqrt(col("value")).as("sqrt") // 平方根
).show()
// 条件函数
df.select(
col("name"),
when(col("age") > 30, "Senior")
.when(col("age") > 20, "Junior")
.otherwise("Entry")
.as("level")
).show()
// 空值处理
df.select(
col("name"),
coalesce(col("nickname"), col("name")).as("display_name"), // 返回第一个非空值
na.fill(0) // 空值填充为 0
).show()
窗口函数
窗口函数在不减少行数的情况下进行聚合计算:
import org.apache.spark.sql.expressions.Window
// 定义窗口
val window = Window.partitionBy("department").orderBy($"salary".desc)
// 排名函数
df.withColumn("rank", rank().over(window)) // 排名,相同值排名相同,跳过后续排名
.withColumn("dense_rank", dense_rank().over(window)) // 排名,相同值排名相同,不跳过
.withColumn("row_number", row_number().over(window)) // 行号,连续递增
.show()
// 聚合窗口
val aggWindow = Window.partitionBy("department")
.orderBy("date")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("running_total", sum("amount").over(aggWindow))
.withColumn("running_avg", avg("amount").over(aggWindow))
.show()
// LAG/LEAD:访问前后行
df.withColumn("prev_salary", lag("salary", 1).over(window))
.withColumn("next_salary", lead("salary", 1).over(window))
.show()
UDF 用户自定义函数
// 定义 UDF
val upperCase = udf((s: String) => s.toUpperCase)
// 注册 UDF(可在 SQL 中使用)
spark.udf.register("upperCase", upperCase)
// 在 SQL 中使用
spark.sql("SELECT upperCase(name) FROM people").show()
// 在 DataFrame API 中使用
df.withColumn("upper_name", upperCase(col("name"))).show()
UDF 性能注意事项:
UDF 是黑盒操作,Catalyst 优化器无法对其进行优化。在可能的情况下,优先使用内置函数:
// 不推荐:使用 UDF
val myUpper = udf((s: String) => s.toUpperCase)
df.select(myUpper(col("name")))
// 推荐:使用内置函数
df.select(upper(col("name")))
Spark Streaming
Spark Streaming 是 Spark 的流处理组件,采用微批处理模式。
DStream 编程模型
DStream(Discretized Stream)是 Spark Streaming 的基本抽象,代表连续的数据流:
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(5)) // 批处理间隔 5 秒
// 从 Socket 创建 DStream
val lines = ssc.socketTextStream("localhost", 9999)
// 处理数据
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// 输出结果
wordCounts.print()
// 启动流处理
ssc.start()
ssc.awaitTermination()
Structured Streaming
Structured Streaming 是基于 Spark SQL 的流处理 API,更加易用:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
// 从 Socket 读取数据
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 处理数据
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
// 输出到控制台
val query = wordCounts.writeStream
.outputMode("complete") // 输出模式
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds")) // 触发间隔
.start()
query.awaitTermination()
窗口操作
// 窗口聚合
val windowCounts = lines
.as[String]
.flatMap(_.split(" "))
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"), // 窗口大小 10 分钟,滑动步长 5 分钟
col("value")
)
.count()
// 水印:处理迟到数据
val withWatermark = lines
.withWatermark("timestamp", "10 minutes") // 允许数据迟到 10 分钟
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
)
.count()
输出模式
| 模式 | 说明 | 适用场景 |
|---|---|---|
| Append | 只输出新数据 | 确定性场景,不允许修改历史数据 |
| Complete | 输出所有结果 | 聚合查询,结果集较小 |
| Update | 只输出更新的结果 | 聚合查询,需要区分新旧数据 |
与 Kafka 集成
// 从 Kafka 读取
val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
// 解析消息
val values = kafkaDF.select(
col("key").cast("string"),
col("value").cast("string"),
col("timestamp")
)
// 写入 Kafka
val query = result.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output")
.option("checkpointLocation", "/path/to/checkpoint")
.start()
Spark 性能调优
内存管理
Spark 的内存管理是性能调优的核心:
内存模型
Spark Executor 的堆内存分为三个区域:
┌─────────────────────────────────────────────────────┐
│ Executor Heap │
├─────────────────────┬───────────────────────────────┤
│ Reserved Memory │ Spark Memory (M) │
│ (300MB) │ ┌───────────┬──────────────┐ │
│ 用户数据结构、 │ │ Storage │ Execution │ │
│ 内部元数据 │ │ Memory │ Memory │ │
│ │ │ (R) │ (M-R) │ │
│ │ └───────────┴──────────────┘ │
└─────────────────────┴───────────────────────────────┘
- Reserved Memory:固定 300MB,用于用户数据结构和内部元数据
- Spark Memory:用于执行和存储
- Storage Memory:缓存数据、广播变量
- Execution Memory:Shuffle、Join、聚合等操作的内存
关键配置
// 内存配置
spark.executor.memory // Executor 总内存
spark.executor.memoryOverhead // 堆外内存(防止 OOM)
spark.driver.memory // Driver 内存
// 内存比例
spark.memory.fraction // Spark Memory 占比,默认 0.6
spark.memory.storageFraction // Storage Memory 占 Spark Memory 的比例,默认 0.5
内存调优建议
// 1. 如果缓存数据多,增加 Storage Memory 比例
spark.conf.set("spark.memory.storageFraction", "0.6")
// 2. 如果 Shuffle/Join 多,增加 Execution Memory 比例
spark.conf.set("spark.memory.storageFraction", "0.3")
// 3. 使用序列化存储减少内存占用
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
// 4. 使用 Kryo 序列化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 5. 注册自定义类以提高序列化效率
spark.conf.registerKryoClasses(Array(classOf[MyClass]))
并行度调优
// 设置默认并行度
spark.conf.set("spark.default.parallelism", "200")
// 设置 Shuffle 分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")
// 代码中调整
val df = spark.read.parquet("data")
.repartition(100) // 增加分区
.coalesce(10) // 减少分区(不触发 Shuffle)
// 分区数建议:集群总 CPU 核数的 2-3 倍
数据倾斜处理
数据倾斜是 Spark 应用最常见的性能问题:
症状
- 少数任务执行时间远超其他任务
- 某个 Executor 内存溢出
- 整体任务卡在 99% 完成
解决方案
// 方案1:增加 Shuffle 分区数
spark.conf.set("spark.sql.shuffle.partitions", "1000")
// 方案2:广播 Join(适合大表 join 小表)
val smallTable = spark.table("small_table").cache()
val bigTable = spark.table("big_table")
val result = bigTable.join(broadcast(smallTable), "key")
// 方案3:加盐处理(适合大表 join 大表)
import org.apache.spark.sql.functions._
// 对倾斜 Key 加盐
val salted = df.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat(col("key"), col("salt")))
// 扩展小表
val expanded = smallTable
.crossJoin(spark.range(0, 10).toDF("salt"))
.withColumn("salted_key", concat(col("key"), col("salt")))
// Join
val joined = salted.join(expanded, "salted_key")
// 方案4:AQE 自适应查询执行(Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
广播 Join 优化
当 Join 的一方数据量较小时,使用广播 Join 可以避免 Shuffle:
// 自动广播阈值(默认 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")
// 手动广播
val smallDF = spark.table("small_table")
val bigDF = spark.table("big_table")
val result = bigDF.join(broadcast(smallDF), "key")
Spark UI 与监控
Spark UI 是性能分析和调优的重要工具:
关键页面
- Jobs:查看所有 Job 的执行时间
- Stages:查看 Stage 详情和任务分布
- Storage:查看缓存使用情况
- Environment:查看配置信息
- Executors:查看 Executor 状态
关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| Task Duration | 任务执行时间 | 个别任务远超平均值 |
| Shuffle Read/Write | Shuffle 数据量 | 过大说明数据倾斜 |
| GC Time | GC 时间 | 超过任务时间的 10% |
| Spill to Disk | 内存溢写到磁盘 | 频繁溢写说明内存不足 |
Spark 提交作业
spark-submit 命令
spark-submit \
--class com.example.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 10 \
--driver-memory 2G \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.executor.memoryOverhead=1G \
myapp.jar \
input output
常用参数
| 参数 | 说明 | 示例 |
|---|---|---|
| --master | 集群地址 | yarn、spark://host:7077、local[*] |
| --deploy-mode | 部署模式 | client、cluster |
| --executor-memory | Executor 内存 | 4G |
| --executor-cores | Executor 核心数 | 2 |
| --num-executors | Executor 数量 | 10 |
| --driver-memory | Driver 内存 | 2G |
| --conf | 配置参数 | spark.sql.shuffle.partitions=200 |
部署模式
| 模式 | 说明 | 适用场景 |
|---|---|---|
| client | Driver 运行在提交任务的机器 | 开发调试 |
| cluster | Driver 运行在集群中 | 生产环境 |
小结
本章介绍了 Spark 的核心概念和使用方法:
- Spark 架构:Driver、Executor、Cluster Manager 三组件协作
- 执行模型:Job、Stage、Task 的层次关系,宽窄依赖与 Stage 划分
- RDD 编程:转换操作和行动操作,持久化与共享变量
- Spark SQL:DataFrame API 和 SQL 查询,窗口函数和 UDF
- Spark Streaming:DStream 和 Structured Streaming 流处理
- 性能调优:内存管理、并行度、数据倾斜处理
Spark 是目前最流行的大数据处理框架,掌握 Spark 对于大数据开发至关重要。在实际项目中,Spark 常与 Hive、Kafka、HBase 等组件配合使用。