跳到主要内容

Spark 内存计算框架

Apache Spark 是一个快速、通用、可扩展的大数据分析引擎。相比 Hadoop MapReduce,Spark 基于内存计算,速度可以快 100 倍,已成为目前最流行的大数据处理框架。

Spark 概述

什么是 Spark?

Spark 是 UC Berkeley AMP 实验室开源的分布式计算框架,后来成为 Apache 顶级项目。它提供了统一的编程模型,支持批处理、流处理、机器学习、图计算等多种计算场景。

Spark 的核心特点:

  1. 速度快:基于内存计算,比 MapReduce 快 10-100 倍
  2. 易用性:支持 Java、Scala、Python、R 多种语言
  3. 通用性:统一的 API 支持多种计算场景
  4. 兼容性:可运行在 YARN、Mesos、Kubernetes 上

Spark vs MapReduce

维度MapReduceSpark
计算模式磁盘计算内存计算
速度较慢快 10-100 倍
编程模型Map/Reduce 两阶段RDD 丰富的算子
适用场景离线批处理批处理、流处理、机器学习
迭代计算每次迭代写磁盘内存中迭代

MapReduce 的最大问题是每次计算结果都要写入磁盘,下次计算再从磁盘读取。对于需要多次迭代的算法(如机器学习),这会导致大量的磁盘 I/O。Spark 通过将中间结果保存在内存中,避免了这个问题,从而大幅提升了性能。

Spark 生态组件

Spark 提供了一整套生态系统:

组件功能说明
Spark Core核心引擎RDD 抽象、任务调度
Spark SQL结构化数据处理SQL 查询、DataFrame API
Spark Streaming流处理微批处理流数据
MLlib机器学习常用机器学习算法
GraphX图计算图数据处理

这些组件共享统一的 API,可以在同一个应用中无缝切换使用。例如,你可以用 Spark SQL 读取数据,用 MLlib 训练模型,最后用 Spark Streaming 进行实时预测。

Spark 版本演进

版本发布时间重要特性
Spark 1.x2014引入 DataFrame API
Spark 2.x2016Structured Streaming、Dataset API
Spark 3.x2020Adaptive Query Execution、动态分区裁剪
Spark 4.02025VARIANT 数据类型、SQL UDF 增强、Python 自定义数据源、Spark Connect 增强、声明式管道

Spark 架构

运行架构

Spark 采用主从架构,主要组件包括:

Driver(驱动程序)

Driver 是 Spark 应用的主进程,负责:

  • 解析代码:将用户代码转换为任务
  • 创建 SparkContext:初始化 Spark 应用
  • 任务调度:将任务分配给 Executor 执行
  • 结果收集:收集和汇总任务结果

Driver 是 Spark 应用的"大脑",它维护了整个应用的执行计划,并协调所有 Executor 的工作。如果 Driver 发生故障,整个应用就会失败。

Executor(执行器)

Executor 是工作节点上的进程,负责:

  • 任务执行:执行 Driver 分配的任务
  • 数据存储:缓存 RDD 数据
  • 结果返回:将计算结果返回给 Driver

每个 Executor 是一个独立的 JVM 进程,可以并行执行多个任务。Executor 的数量和资源配置直接影响 Spark 应用的性能。

Cluster Manager(集群管理器)

Cluster Manager 负责资源管理:

  • Standalone:Spark 自带的资源管理器
  • YARN:Hadoop 的资源管理器
  • Mesos:通用的集群管理器
  • Kubernetes:容器编排平台

选择哪种 Cluster Manager 取决于你的基础设施。如果是 Hadoop 生态系统,通常选择 YARN;如果是云原生环境,Kubernetes 是更好的选择。

Spark 执行模型

理解 Spark 的执行模型对于编写高效的 Spark 应用至关重要。Spark 的执行过程可以分为以下几个层次:

应用(Application)→ 作业(Job)→ 阶段(Stage)→ 任务(Task)

Job(作业)

每次遇到 Action 算子时,Spark 就会创建一个 Job。一个 Spark 应用可能包含多个 Job,每个 Job 由多个 Stage 组成。

Stage(阶段)

Stage 是 Job 的子集,由一组可以并行执行的任务组成。Stage 的划分依据是 Shuffle(数据重分发)操作:

  • Shuffle Map Stage:中间阶段,输出数据需要 Shuffle
  • Result Stage:最终阶段,直接输出结果

Task(任务)

Task 是 Spark 执行的最小单元,每个 Task 处理一个分区的数据。一个 Stage 内的所有 Task 可以并行执行。

宽依赖与窄依赖

RDD 之间的依赖关系决定了 Stage 的划分:

窄依赖(Narrow Dependency)

窄依赖指父 RDD 的每个分区最多被子 RDD 的一个分区使用:

  • 特点:不涉及 Shuffle,数据在同一个节点内处理
  • 算子:map、filter、flatMap、union 等
  • 优势:可以进行流水线优化,减少中间结果
父RDD分区        子RDD分区
[P1] → [P1']
[P2] → [P2']
[P3] → [P3']

宽依赖(Wide Dependency)

宽依赖指父 RDD 的每个分区被子 RDD 的多个分区使用:

  • 特点:涉及 Shuffle,需要跨节点传输数据
  • 算子:groupByKey、reduceByKey、join、repartition 等
  • 代价:Shuffle 是 Spark 中最耗时的操作
父RDD分区              子RDD分区
[P1] ─┬──→ [P1'](需要P1,P2,P3的数据)
[P2] ─┼──→ [P2'](需要P1,P2,P3的数据)
[P3] ─┘──→ [P3'](需要P1,P2,P3的数据)

DAG 与 Stage 划分

DAG(有向无环图)是 Spark 执行计划的表示。Spark 通过以下步骤将用户代码转换为执行计划:

  1. 构建逻辑计划:分析 RDD 的依赖关系
  2. 划分 Stage:遇到宽依赖就切分 Stage
  3. 生成物理计划:将 Stage 转换为 TaskSet
RDD依赖图示例:

[A] ─map→ [B] ─filter→ [C] ─reduceByKey→ [D] ─map→ [E] ─saveAsTextFile
│ │ │
└──── Stage 1 ───────────────┘ Stage 2 ────────┘
(窄依赖) (宽依赖后)

Stage 划分的规则是:从 Action 算子开始向前回溯,遇到宽依赖就切分成新的 Stage。这样做的目的是让同一个 Stage 内的所有算子可以流水线式执行,减少中间结果的落盘。

Shuffle 机制详解

Shuffle 是 Spark 中最关键也最耗时的操作,理解 Shuffle 对于性能调优至关重要。

Shuffle 的本质

Shuffle 的本质是数据的重新分发。当某个算子需要跨分区聚合数据时(如 groupByKey),就必须把相同 Key 的数据汇总到同一个分区,这个过程就是 Shuffle。

Shuffle 的过程

一个完整的 Shuffle 过程包括:

  1. Map 阶段:每个 Map 任务处理输入数据,输出中间结果
  2. 排序:对中间结果按 Key 排序
  3. 溢写:内存不足时将数据写入磁盘
  4. 合并:合并多个溢写文件
  5. Reduce 阶段:Reduce 任务拉取属于自己的数据

Shuffle Manager

Spark 提供了多种 Shuffle Manager:

Manager说明适用场景
Hash Shuffle早期版本,每个 Map 输出多个文件已废弃
Sort Shuffle默认方式,每个 Map 输出少量文件大多数场景
Tungsten Sort内存优化的 Sort Shuffle内存充足时

Sort Shuffle 是目前默认的 Shuffle 方式,它通过排序和合并减少了中间文件的数量,大大降低了内存和磁盘的压力。

Shuffle 调优

// 增加 Shuffle 分区数,减少每个任务处理的数据量
spark.conf.set("spark.sql.shuffle.partitions", "200")

// 启用 Shuffle 服务,提高 Fetch 效率
spark.conf.set("spark.shuffle.service.enabled", "true")

// 压缩 Shuffle 输出
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

RDD 弹性分布式数据集

RDD(Resilient Distributed Dataset)是 Spark 的核心抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD 五大特性

理解 RDD 的五大特性是掌握 Spark 的关键:

1. 分区列表(Partition List)

RDD 由多个分区组成,每个分区是一个数据片段。分区是 Spark 并行计算的基本单位:

  • 分区数决定了并行度
  • 每个分区由一个 Task 处理
  • 分区可以自定义或自动设置
// 查看分区数
val rdd = sc.parallelize(1 to 100)
println(rdd.partitions.length) // 默认分区数

// 指定分区数
val rdd2 = sc.parallelize(1 to 100, 10) // 10个分区

2. 计算函数(Compute Function)

每个分区都有一个计算函数,定义了如何从父 RDD 计算得到当前分区的数据:

  • 计算是惰性的,只有在 Action 触发时才执行
  • 每个分区的计算是独立的,可以并行执行
  • 计算函数是 RDD 的核心,决定了数据的转换逻辑

3. 依赖关系(Dependency)

RDD 之间通过依赖关系形成 DAG:

  • 窄依赖:一个父分区对应一个子分区
  • 宽依赖:一个父分区对应多个子分区

依赖关系是 Spark 容错机制的基础。如果某个分区的数据丢失,Spark 可以根据依赖关系重新计算该分区。

4. 分区器(Partitioner)

分区器决定 Key-Value 类型 RDD 的分区策略:

  • HashPartitioner:默认分区器,根据 Key 的哈希值分区
  • RangePartitioner:按范围分区,适用于排序场景
// 自定义分区器
class MyPartitioner(numPartitions: Int) extends Partitioner {
override def numPartitions: Int = numPartitions
override def getPartition(key: Any): Int = {
// 自定义分区逻辑
key.hashCode % numPartitions
}
}

// 使用自定义分区器
val partitionedRDD = pairRDD.partitionBy(new MyPartitioner(4))

5. 最佳位置(Preferred Locations)

数据本地性偏好,指明分区数据最好在哪些节点上计算:

  • PROCESS_LOCAL:数据在同一 JVM 进程中
  • NODE_LOCAL:数据在同一节点上
  • RACK_LOCAL:数据在同一机架上
  • ANY:数据在其他地方

Spark 调度器会优先将任务分配到数据所在节点,减少网络传输。

RDD 创建方式

// 方式1:从集合创建
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
// 指定分区数
val rdd2 = sc.parallelize(Seq(1, 2, 3, 4, 5), 3)

// 方式2:从外部存储创建
val rdd3 = sc.textFile("hdfs://path/to/file")
// 默认每个 HDFS Block 对应一个分区(128MB)
val rdd4 = sc.textFile("hdfs://path/to/file", 10) // 指定最小分区数

// 方式3:从其他 RDD 转换
val rdd5 = rdd1.map(_ * 2)

从集合创建时,建议根据集群 CPU 核数设置分区数,一般每个 CPU 核对应 2-4 个分区。从 HDFS 读取时,Spark 会根据 Block 数量自动设置分区数,通常不需要手动指定。

RDD 操作类型

RDD 支持两种操作类型:

转换操作(Transformation)

转换操作是懒执行的,返回新的 RDD:

基本转换

// map:一对一转换
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val mapped = rdd.map(x => x * 2)
// 结果:2, 4, 6, 8, 10
// map 会对每个元素应用函数,生成新的 RDD

// filter:过滤元素
val filtered = rdd.filter(x => x > 2)
// 结果:3, 4, 5
// filter 保留满足条件的元素

// flatMap:一对多转换并展平
val textRdd = sc.parallelize(Seq("hello world", "spark rdd"))
val words = textRdd.flatMap(line => line.split(" "))
// 结果:hello, world, spark, rdd
// flatMap 先 map 后 flatten,适合文本分词等场景

// mapPartitions:按分区处理
val partitionMapped = rdd.mapPartitions(iter => {
// iter 是一个分区的所有数据迭代器
// 可以在这里初始化资源(如数据库连接)
iter.map(x => x * 2)
})
// mapPartitions 适合需要批量处理数据的场景,如数据库写入

// mapPartitionsWithIndex:带分区索引
val indexed = rdd.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x))
})
// 可以知道每个元素属于哪个分区

// distinct:去重
val dupRdd = sc.parallelize(Seq(1, 1, 2, 2, 3))
val distinct = dupRdd.distinct()
// 结果:1, 2, 3

Key-Value 转换

// 创建 PairRDD(键值对 RDD)
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey:按键聚合(推荐)
val reduced = pairs.reduceByKey(_ + _)
// 结果:(a, 4), (b, 2)
// reduceByKey 在 Map 端进行预聚合,减少 Shuffle 数据量

// groupByKey:按键分组(慎用)
val grouped = pairs.groupByKey()
// 结果:(a, [1, 3]), (b, [2])
// groupByKey 不进行预聚合,可能导致内存溢出

// mapValues:只对值进行转换
val mappedValues = pairs.mapValues(_ * 2)
// 结果:(a, 2), (b, 4), (a, 6)
// mapValues 保持 Key 不变,只转换 Value

// flatMapValues:对值展平
val flattened = sc.parallelize(Seq(("a", "1,2,3"), ("b", "4,5")))
.flatMapValues(v => v.split(","))
// 结果:(a,1), (a,2), (a,3), (b,4), (b,5)

// sortByKey:按键排序
val sorted = pairs.sortByKey()

// join:内连接
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y")))
val joined = rdd1.join(rdd2)
// 结果:(a, (1, x)), (b, (2, y))

// leftOuterJoin:左外连接
val leftJoined = rdd1.leftOuterJoin(rdd2)
// 结果:(a, (1, Some(x))), (b, (2, Some(y)))

// cogroup:协同分组
val cogrouped = rdd1.cogroup(rdd2)
// 结果:(a, ([1], [x])), (b, ([2], [y]))

// subtractByKey:按键差集
val subtracted = rdd1.subtractByKey(rdd2)
// 返回 rdd1 中有但 rdd2 中没有的 Key

行动操作(Action)

行动操作触发计算,返回结果:

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))

// collect:收集所有元素到 Driver
val arr = rdd.collect()
// 结果:Array(1, 2, 3, 4, 5)
// 注意:数据量大时会导致 Driver OOM

// count:计数
val count = rdd.count()
// 结果:5

// reduce:聚合
val sum = rdd.reduce(_ + _)
// 结果:15
// reduce 使用指定的函数聚合所有元素

// aggregate:自定义聚合(更灵活)
// 参数:(初始值)(分区内聚合函数)(分区间聚合函数)
val result = rdd.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1), // 分区内聚合
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 分区间聚合
)
// 结果:(15, 5),即总和和个数
// aggregate 可以返回与输入类型不同的结果

// first:取第一个元素
val first = rdd.first()
// 结果:1

// take:取前 n 个元素
val top3 = rdd.take(3)
// 结果:Array(1, 2, 3)

// takeOrdered:取最小的 n 个元素
val smallest3 = rdd.takeOrdered(3)
// 结果:Array(1, 2, 3)

// top:取最大的 n 个元素
val largest3 = rdd.top(3)
// 结果:Array(5, 4, 3)

// foreach:遍历每个元素
rdd.foreach(x => println(x))

// foreachPartition:遍历每个分区
rdd.foreachPartition(iter => {
// 适合在这里初始化资源(如数据库连接)
iter.foreach(x => println(x))
})

// saveAsTextFile:保存到文件
rdd.saveAsTextFile("hdfs://path/output")

// countByKey:按键计数
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.countByKey()
// 结果:Map(a -> 2, b -> 1)

// countByValue:按值计数
val values = sc.parallelize(Seq(1, 1, 2, 2, 2, 3))
val valueCounts = values.countByValue()
// 结果:Map(1 -> 2, 2 -> 3, 3 -> 1)

RDD 持久化

Spark 支持将 RDD 缓存到内存或磁盘,避免重复计算:

为什么需要持久化

当同一个 RDD 被多次使用时,如果不持久化,每次都会从源头重新计算:

val rdd = sc.textFile("hdfs://path/data").map(...)

// 不持久化:以下操作会触发两次完整计算
rdd.count() // 第一次计算
rdd.filter(...).count() // 第二次计算

// 持久化:只计算一次
rdd.cache() // 或 rdd.persist()
rdd.count() // 第一次计算,并缓存结果
rdd.filter(...).count() // 直接使用缓存

存储级别

import org.apache.spark.storage.StorageLevel

// 缓存到内存
rdd.cache() // 等同于 MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)

// 内存不足时存磁盘
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// 只存磁盘
rdd.persist(StorageLevel.DISK_ONLY)

// 内存中序列化存储(节省空间)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// 多副本存储
rdd.persist(StorageLevel.MEMORY_ONLY_2)

存储级别选择建议

存储级别适用场景
MEMORY_ONLY数据量不大,可完全放入内存
MEMORY_AND_DISK数据量较大,部分放磁盘
MEMORY_ONLY_SER内存有限,可接受序列化开销
DISK_ONLY内存严重不足,不介意计算开销

持久化最佳实践

// 1. 及时释放不再使用的缓存
rdd.unpersist()

// 2. 在 Shuffle 后持久化
val shuffled = rdd.reduceByKey(_ + _).persist(StorageLevel.MEMORY_AND_DISK)

// 3. 监控缓存使用情况
// 通过 Spark UI 的 Storage 页面查看缓存状态

// 4. 避免缓存太大的 RDD
// 如果 RDD 太大,序列化后仍然占用大量空间,不如不缓存

共享变量

Spark 提供两种共享变量,解决闭包捕获变量的效率问题:

广播变量

广播变量将只读变量发送到所有节点,避免重复传输:

// 场景:大表 join 小表
// 假设 smallTable 是一个小表(几十MB)
val smallTable = Map(
"001" -> "产品A",
"002" -> "产品B",
"003" -> "产品C"
)

// 不使用广播变量:每个任务都会收到一份小表的拷贝
val rdd = sc.parallelize(Seq("001", "002", "003"))
val result1 = rdd.map(id => (id, smallTable(id)))
// 问题:如果有 100 个任务,小表会被传输 100 次

// 使用广播变量:小表只传输一次到每个 Executor
val broadcastTable = sc.broadcast(smallTable)
val result2 = rdd.map(id => (id, broadcastTable.value(id)))
// 优势:每个 Executor 只保存一份小表,节省网络和内存

// 释放广播变量
broadcastTable.unpersist()

广播变量原理

  1. Driver 将广播变量分块存储到 BlockManager
  2. Executor 从 Driver 或其他 Executor 拉取数据块
  3. 类似 BT 下载,Executor 之间可以互相传输数据
  4. 每个 Executor 只保存一份广播变量

适用场景

  • 大表 join 小表(Map-side Join)
  • 共享配置信息
  • 机器学习模型参数

累加器

累加器用于聚合所有节点的值:

// 场景:统计错误记录数
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

// 创建累加器
val errorCount = sc.longAccumulator("errorCount")

rdd.foreach(x => {
if (x > 5) {
errorCount.add(1) // 累加
}
})

// 在 Driver 中获取值
println(s"错误数量: ${errorCount.value}")
// 结果:错误数量: 5

累加器注意事项

// 问题:在 Transformation 中使用累加器可能导致不确定结果
val acc = sc.longAccumulator("acc")
val mapped = rdd.map(x => {
acc.add(1) // 如果 RDD 被多次计算,累加器会被多次累加
x * 2
})

mapped.count() // 累加器值 = 10
mapped.count() // 累加器值 = 20(而不是 10)

// 解决方案:先持久化再使用
mapped.cache()
mapped.count() // 累加器值 = 10
mapped.count() // 累加器值 = 10(不变)

// 或者只在 Action 操作中使用累加器
rdd.foreach(x => acc.add(1)) // 每次执行都会正确累加

自定义累加器

import org.apache.spark.util.AccumulatorV2

class SetAccumulator[T] extends AccumulatorV2[T, Set[T]] {
private val set = scala.collection.mutable.Set.empty[T]

override def isZero: Boolean = set.isEmpty
override def copy(): AccumulatorV2[T, Set[T]] = new SetAccumulator[T]
override def reset(): Unit = set.clear()
override def add(v: T): Unit = set += v
override def merge(other: AccumulatorV2[T, Set[T]]): Unit = set ++= other.value
override def value: Set[T] = set.toSet
}

// 使用自定义累加器收集唯一值
val acc = new SetAccumulator[Int]
sc.register(acc, "uniqueValues")

rdd.foreach(x => acc.add(x))
println(s"唯一值: ${acc.value}")

Spark SQL

Spark SQL 提供了结构化数据处理能力,支持 SQL 查询和 DataFrame API。

DataFrame 和 Dataset

DataFrame 是带有 Schema 的分布式 Row 集合,类似数据库表:

import org.apache.spark.sql.SparkSession

// 创建 SparkSession(Spark 2.0+ 的入口)
val spark = SparkSession.builder()
.appName("SparkSQLDemo")
.master("local[*]")
.getOrCreate()

import spark.implicits._

// 从集合创建 DataFrame
val df = Seq((1, "Alice", 25), (2, "Bob", 30))
.toDF("id", "name", "age")

// 查看 Schema
df.printSchema()
// root
// |-- id: integer (nullable = false)
// |-- name: string (nullable = true)
// |-- age: integer (nullable = false)

// 查看数据
df.show()
// +---+-----+---+
// | id| name|age|
// +---+-----+---+
// | 1|Alice| 25|
// | 2| Bob| 30|
// +---+-----+---+

// 从文件读取
val csvDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data.csv")

val jsonDF = spark.read.json("data.json")
val parquetDF = spark.read.parquet("data.parquet")

// 从 JDBC 读取
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mydb")
.option("dbtable", "users")
.option("user", "root")
.option("password", "password")
.load()

DataFrame vs RDD

维度RDDDataFrame
类型安全编译时检查运行时检查
优化无自动优化Catalyst 优化器
性能较低较高
易用性需要了解底层 APISQL 风格,更易用

DataFrame 由 Catalyst 优化器自动优化执行计划,通常比手动优化的 RDD 代码更快。

SQL 查询

Spark SQL 支持标准 SQL 语法:

// 注册临时视图
df.createOrReplaceTempView("people")

// 执行 SQL 查询
val result = spark.sql("""
SELECT
name,
age,
CASE WHEN age >= 30 THEN 'Senior' ELSE 'Junior' END as level
FROM people
WHERE age > 20
ORDER BY age DESC
""")

result.show()

DataFrame API

import org.apache.spark.sql.functions._

// 选择列
df.select("name", "age").show()

// 过滤
df.filter($"age" > 25).show()
df.filter("age > 25").show()

// 分组聚合
df.groupBy("age")
.agg(
count("*").as("count"),
avg("salary").as("avg_salary")
)
.show()

// 排序
df.orderBy($"age".desc).show()

// 连接
val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
val df2 = Seq((1, 25), (2, 30)).toDF("id", "age")

// 内连接
df1.join(df2, "id").show()

// 左连接
df1.join(df2, Seq("id"), "left").show()

// 条件连接
df1.join(df2, df1("id") === df2("id")).show()

常用内置函数

import org.apache.spark.sql.functions._

// 字符串函数
df.select(
col("name"),
upper(col("name")).as("upper_name"), // 转大写
lower(col("name")).as("lower_name"), // 转小写
length(col("name")).as("name_len"), // 字符串长度
concat(col("name"), lit("_"), col("age")).as("concat_str") // 字符串拼接
).show()

// 日期函数
df.select(
col("date"),
year(col("date")).as("year"),
month(col("date")).as("month"),
dayofmonth(col("date")).as("day"),
date_add(col("date"), 7).as("date_plus_7"), // 日期加7天
datediff(col("end_date"), col("start_date")).as("days_diff") // 日期差
).show()

// 数学函数
df.select(
col("value"),
round(col("value"), 2).as("rounded"), // 四舍五入
abs(col("value")).as("absolute"), // 绝对值
sqrt(col("value")).as("sqrt") // 平方根
).show()

// 条件函数
df.select(
col("name"),
when(col("age") > 30, "Senior")
.when(col("age") > 20, "Junior")
.otherwise("Entry")
.as("level")
).show()

// 空值处理
df.select(
col("name"),
coalesce(col("nickname"), col("name")).as("display_name"), // 返回第一个非空值
na.fill(0) // 空值填充为 0
).show()

窗口函数

窗口函数在不减少行数的情况下进行聚合计算:

import org.apache.spark.sql.expressions.Window

// 定义窗口
val window = Window.partitionBy("department").orderBy($"salary".desc)

// 排名函数
df.withColumn("rank", rank().over(window)) // 排名,相同值排名相同,跳过后续排名
.withColumn("dense_rank", dense_rank().over(window)) // 排名,相同值排名相同,不跳过
.withColumn("row_number", row_number().over(window)) // 行号,连续递增
.show()

// 聚合窗口
val aggWindow = Window.partitionBy("department")
.orderBy("date")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("running_total", sum("amount").over(aggWindow))
.withColumn("running_avg", avg("amount").over(aggWindow))
.show()

// LAG/LEAD:访问前后行
df.withColumn("prev_salary", lag("salary", 1).over(window))
.withColumn("next_salary", lead("salary", 1).over(window))
.show()

UDF 用户自定义函数

// 定义 UDF
val upperCase = udf((s: String) => s.toUpperCase)

// 注册 UDF(可在 SQL 中使用)
spark.udf.register("upperCase", upperCase)

// 在 SQL 中使用
spark.sql("SELECT upperCase(name) FROM people").show()

// 在 DataFrame API 中使用
df.withColumn("upper_name", upperCase(col("name"))).show()

UDF 性能注意事项

UDF 是黑盒操作,Catalyst 优化器无法对其进行优化。在可能的情况下,优先使用内置函数:

// 不推荐:使用 UDF
val myUpper = udf((s: String) => s.toUpperCase)
df.select(myUpper(col("name")))

// 推荐:使用内置函数
df.select(upper(col("name")))

Spark Streaming

Spark Streaming 是 Spark 的流处理组件,采用微批处理模式。

DStream 编程模型

DStream(Discretized Stream)是 Spark Streaming 的基本抽象,代表连续的数据流:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// 创建 StreamingContext
val ssc = new StreamingContext(sc, Seconds(5)) // 批处理间隔 5 秒

// 从 Socket 创建 DStream
val lines = ssc.socketTextStream("localhost", 9999)

// 处理数据
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// 输出结果
wordCounts.print()

// 启动流处理
ssc.start()
ssc.awaitTermination()

Structured Streaming

Structured Streaming 是基于 Spark SQL 的流处理 API,更加易用:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

// 从 Socket 读取数据
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()

// 处理数据
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// 输出到控制台
val query = wordCounts.writeStream
.outputMode("complete") // 输出模式
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds")) // 触发间隔
.start()

query.awaitTermination()

窗口操作

// 窗口聚合
val windowCounts = lines
.as[String]
.flatMap(_.split(" "))
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"), // 窗口大小 10 分钟,滑动步长 5 分钟
col("value")
)
.count()

// 水印:处理迟到数据
val withWatermark = lines
.withWatermark("timestamp", "10 minutes") // 允许数据迟到 10 分钟
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
)
.count()

输出模式

模式说明适用场景
Append只输出新数据确定性场景,不允许修改历史数据
Complete输出所有结果聚合查询,结果集较小
Update只输出更新的结果聚合查询,需要区分新旧数据

与 Kafka 集成

// 从 Kafka 读取
val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()

// 解析消息
val values = kafkaDF.select(
col("key").cast("string"),
col("value").cast("string"),
col("timestamp")
)

// 写入 Kafka
val query = result.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output")
.option("checkpointLocation", "/path/to/checkpoint")
.start()

Spark 性能调优

内存管理

Spark 的内存管理是性能调优的核心:

内存模型

Spark Executor 的堆内存分为三个区域:

┌─────────────────────────────────────────────────────┐
│ Executor Heap │
├─────────────────────┬───────────────────────────────┤
│ Reserved Memory │ Spark Memory (M) │
│ (300MB) │ ┌───────────┬──────────────┐ │
│ 用户数据结构、 │ │ Storage │ Execution │ │
│ 内部元数据 │ │ Memory │ Memory │ │
│ │ │ (R) │ (M-R) │ │
│ │ └───────────┴──────────────┘ │
└─────────────────────┴───────────────────────────────┘
  • Reserved Memory:固定 300MB,用于用户数据结构和内部元数据
  • Spark Memory:用于执行和存储
    • Storage Memory:缓存数据、广播变量
    • Execution Memory:Shuffle、Join、聚合等操作的内存

关键配置

// 内存配置
spark.executor.memory // Executor 总内存
spark.executor.memoryOverhead // 堆外内存(防止 OOM)
spark.driver.memory // Driver 内存

// 内存比例
spark.memory.fraction // Spark Memory 占比,默认 0.6
spark.memory.storageFraction // Storage Memory 占 Spark Memory 的比例,默认 0.5

内存调优建议

// 1. 如果缓存数据多,增加 Storage Memory 比例
spark.conf.set("spark.memory.storageFraction", "0.6")

// 2. 如果 Shuffle/Join 多,增加 Execution Memory 比例
spark.conf.set("spark.memory.storageFraction", "0.3")

// 3. 使用序列化存储减少内存占用
rdd.persist(StorageLevel.MEMORY_ONLY_SER)

// 4. 使用 Kryo 序列化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// 5. 注册自定义类以提高序列化效率
spark.conf.registerKryoClasses(Array(classOf[MyClass]))

并行度调优

// 设置默认并行度
spark.conf.set("spark.default.parallelism", "200")

// 设置 Shuffle 分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")

// 代码中调整
val df = spark.read.parquet("data")
.repartition(100) // 增加分区
.coalesce(10) // 减少分区(不触发 Shuffle)

// 分区数建议:集群总 CPU 核数的 2-3 倍

数据倾斜处理

数据倾斜是 Spark 应用最常见的性能问题:

症状

  • 少数任务执行时间远超其他任务
  • 某个 Executor 内存溢出
  • 整体任务卡在 99% 完成

解决方案

// 方案1:增加 Shuffle 分区数
spark.conf.set("spark.sql.shuffle.partitions", "1000")

// 方案2:广播 Join(适合大表 join 小表)
val smallTable = spark.table("small_table").cache()
val bigTable = spark.table("big_table")
val result = bigTable.join(broadcast(smallTable), "key")

// 方案3:加盐处理(适合大表 join 大表)
import org.apache.spark.sql.functions._

// 对倾斜 Key 加盐
val salted = df.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat(col("key"), col("salt")))

// 扩展小表
val expanded = smallTable
.crossJoin(spark.range(0, 10).toDF("salt"))
.withColumn("salted_key", concat(col("key"), col("salt")))

// Join
val joined = salted.join(expanded, "salted_key")

// 方案4:AQE 自适应查询执行(Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

广播 Join 优化

当 Join 的一方数据量较小时,使用广播 Join 可以避免 Shuffle:

// 自动广播阈值(默认 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")

// 手动广播
val smallDF = spark.table("small_table")
val bigDF = spark.table("big_table")
val result = bigDF.join(broadcast(smallDF), "key")

Spark UI 与监控

Spark UI 是性能分析和调优的重要工具:

关键页面

  • Jobs:查看所有 Job 的执行时间
  • Stages:查看 Stage 详情和任务分布
  • Storage:查看缓存使用情况
  • Environment:查看配置信息
  • Executors:查看 Executor 状态

关键指标

指标说明告警阈值
Task Duration任务执行时间个别任务远超平均值
Shuffle Read/WriteShuffle 数据量过大说明数据倾斜
GC TimeGC 时间超过任务时间的 10%
Spill to Disk内存溢写到磁盘频繁溢写说明内存不足

Spark 提交作业

spark-submit 命令

spark-submit \
--class com.example.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 10 \
--driver-memory 2G \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.executor.memoryOverhead=1G \
myapp.jar \
input output

常用参数

参数说明示例
--master集群地址yarn、spark://host:7077、local[*]
--deploy-mode部署模式client、cluster
--executor-memoryExecutor 内存4G
--executor-coresExecutor 核心数2
--num-executorsExecutor 数量10
--driver-memoryDriver 内存2G
--conf配置参数spark.sql.shuffle.partitions=200

部署模式

模式说明适用场景
clientDriver 运行在提交任务的机器开发调试
clusterDriver 运行在集群中生产环境

小结

本章介绍了 Spark 的核心概念和使用方法:

  1. Spark 架构:Driver、Executor、Cluster Manager 三组件协作
  2. 执行模型:Job、Stage、Task 的层次关系,宽窄依赖与 Stage 划分
  3. RDD 编程:转换操作和行动操作,持久化与共享变量
  4. Spark SQL:DataFrame API 和 SQL 查询,窗口函数和 UDF
  5. Spark Streaming:DStream 和 Structured Streaming 流处理
  6. 性能调优:内存管理、并行度、数据倾斜处理

Spark 是目前最流行的大数据处理框架,掌握 Spark 对于大数据开发至关重要。在实际项目中,Spark 常与 Hive、Kafka、HBase 等组件配合使用。

参考资源