Spark 内存计算框架
Apache Spark是一个快速、通用、可扩展的大数据分析引擎。相比Hadoop MapReduce,Spark基于内存计算,速度可以快100倍,已成为目前最流行的大数据处理框架。
Spark 概述
什么是 Spark?
Spark是UC Berkeley AMP实验室开源的分布式计算框架,后来成为Apache顶级项目。它提供了统一的编程模型,支持批处理、流处理、机器学习、图计算等多种计算场景。
Spark的核心特点:
- 速度快:基于内存计算,比MapReduce快10-100倍
- 易用性:支持Java、Scala、Python、R多种语言
- 通用性:统一的API支持多种计算场景
- 兼容性:可运行在YARN、Mesos、Kubernetes上
Spark vs MapReduce
| 维度 | MapReduce | Spark |
|---|---|---|
| 计算模式 | 磁盘计算 | 内存计算 |
| 速度 | 较慢 | 快10-100倍 |
| 编程模型 | Map/Reduce两阶段 | RDD丰富的算子 |
| 适用场景 | 离线批处理 | 批处理、流处理、机器学习 |
| 迭代计算 | 每次迭代写磁盘 | 内存中迭代 |
Spark 生态组件
Spark提供了一整套生态系统:
| 组件 | 功能 | 说明 |
|---|---|---|
| Spark Core | 核心引擎 | RDD抽象、任务调度 |
| Spark SQL | 结构化数据处理 | SQL查询、DataFrame API |
| Spark Streaming | 流处理 | 微批处理流数据 |
| MLlib | 机器学习 | 常用机器学习算法 |
| GraphX | 图计算 | 图数据处理 |
Spark 架构
运行架构
Spark采用主从架构,主要组件包括:
Driver(驱动程序)
Driver是Spark应用的主进程,负责:
- 解析代码:将用户代码转换为任务
- 创建SparkContext:初始化Spark应用
- 任务调度:将任务分配给Executor执行
- 结果收集:收集和汇总任务结果
Executor(执行器)
Executor是工作节点上的进程,负责:
- 任务执行:执行Driver分配的任务
- 数据存储:缓存RDD数据
- 结果返回:将计算结果返回给Driver
Cluster Manager(集群管理器)
Cluster Manager负责资源管理:
- Standalone:Spark自带的资源管理器
- YARN:Hadoop的资源管理器
- Mesos:通用的集群管理器
- Kubernetes:容器编排平台
RDD 弹性分布式数据集
RDD(Resilient Distributed Dataset)是Spark的核心抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
RDD 五大特性
- 分区列表:数据集被切分为多个分区
- 计算函数:每个分区都有一个计算函数
- 依赖关系:RDD之间有依赖关系
- 分区器:可选的分区策略
- 最佳位置:数据本地性偏好
RDD 创建方式
// 方式1:从集合创建
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
// 方式2:从外部存储创建
val rdd2 = sc.textFile("hdfs://path/to/file")
// 方式3:从其他RDD转换
val rdd3 = rdd1.map(_ * 2)
RDD 操作类型
RDD支持两种操作类型:
转换操作(Transformation):懒执行,返回新RDD
| 操作 | 说明 |
|---|---|
| map | 对每个元素应用函数 |
| filter | 过滤满足条件的元素 |
| flatMap | 映射后展平 |
| groupByKey | 按键分组 |
| reduceByKey | 按键归约 |
| join | 两个RDD连接 |
| union | 合并两个RDD |
行动操作(Action):触发计算,返回结果
| 操作 | 说明 |
|---|---|
| collect | 返回所有元素 |
| count | 返回元素个数 |
| reduce | 聚合所有元素 |
| saveAsTextFile | 保存到文件 |
| foreach | 对每个元素执行操作 |
Spark 执行流程
Spark应用的执行流程如下:
- 构建RDD血缘关系图(DAG)
- DAGScheduler将DAG划分为多个Stage
- TaskScheduler将Task提交给Executor
- Executor执行Task并返回结果
Spark Core 编程
SparkContext 初始化
SparkContext是Spark程序的入口:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
.setAppName("MyApp")
.setMaster("local[*]") // 本地模式
val sc = new SparkContext(conf)
常用转换操作
// map:一对一转换
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val mapped = rdd.map(x => x * 2)
// 结果:2, 4, 6, 8, 10
// filter:过滤
val filtered = rdd.filter(x => x > 2)
// 结果:3, 4, 5
// flatMap:一对多转换并展平
val textRdd = sc.parallelize(Seq("hello world", "spark rdd"))
val words = textRdd.flatMap(line => line.split(" "))
// 结果:hello, world, spark, rdd
// mapPartitions:按分区处理
val partitionMapped = rdd.mapPartitions(iter => {
iter.map(x => x * 2)
})
// distinct:去重
val dupRdd = sc.parallelize(Seq(1, 1, 2, 2, 3))
val distinct = dupRdd.distinct()
// 结果:1, 2, 3
Key-Value 操作
// 创建PairRDD
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// reduceByKey:按键聚合
val reduced = pairs.reduceByKey(_ + _)
// 结果:(a, 4), (b, 2)
// groupByKey:按键分组
val grouped = pairs.groupByKey()
// 结果:(a, [1, 3]), (b, [2])
// mapValues:只对值进行转换
val mappedValues = pairs.mapValues(_ * 2)
// 结果:(a, 2), (b, 4), (a, 6)
// sortByKey:按键排序
val sorted = pairs.sortByKey()
// join:内连接
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y")))
val joined = rdd1.join(rdd2)
// 结果:(a, (1, x)), (b, (2, y))
// leftOuterJoin:左外连接
val leftJoined = rdd1.leftOuterJoin(rdd2)
// cogroup:协同分组
val cogrouped = rdd1.cogroup(rdd2)
常用行动操作
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
// collect:收集所有元素
val arr = rdd.collect()
// 结果:Array(1, 2, 3, 4, 5)
// count:计数
val count = rdd.count()
// 结果:5
// reduce:聚合
val sum = rdd.reduce(_ + _)
// 结果:15
// aggregate:自定义聚合
val result = rdd.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// 结果:(15, 5),即总和和个数
// first:取第一个元素
val first = rdd.first()
// 结果:1
// take:取前n个元素
val top3 = rdd.take(3)
// 结果:Array(1, 2, 3)
// foreach:遍历
rdd.foreach(println)
RDD 持久化
Spark支持将RDD缓存到内存或磁盘,避免重复计算:
// 缓存到内存
rdd.cache()
// 指定存储级别
import org.apache.spark.storage.StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK)
// 常用存储级别
StorageLevel.MEMORY_ONLY // 只存内存
StorageLevel.MEMORY_AND_DISK // 内存不足时存磁盘
StorageLevel.DISK_ONLY // 只存磁盘
StorageLevel.MEMORY_ONLY_SER // 内存中序列化存储
// 移除缓存
rdd.unpersist()
共享变量
Spark提供两种共享变量:
广播变量
广播变量将只读变量发送到所有节点,避免重复传输:
val broadcastVar = sc.broadcast(Array(1, 2, 3))
// 在Executor中使用
val value = broadcastVar.value
// 释放广播变量
broadcastVar.unpersist()
累加器
累加器用于聚合所有节点的值:
val acc = sc.longAccumulator("myAccumulator")
rdd.foreach(x => acc.add(x))
// 在Driver中获取值
val total = acc.value
Spark SQL
Spark SQL提供了结构化数据处理能力,支持SQL查询和DataFrame API。
DataFrame 和 Dataset
DataFrame是带有Schema的分布式Row集合,类似数据库表:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("SparkSQLDemo")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// 从集合创建DataFrame
val df = Seq((1, "Alice", 25), (2, "Bob", 30))
.toDF("id", "name", "age")
// 查看Schema
df.printSchema()
// 查看数据
df.show()
// 从文件读取
val csvDF = spark.read.csv("data.csv")
val jsonDF = spark.read.json("data.json")
val parquetDF = spark.read.parquet("data.parquet")
SQL 查询
Spark SQL支持标准SQL语法:
// 注册临时视图
df.createOrReplaceTempView("people")
// 执行SQL查询
val result = spark.sql("""
SELECT name, age
FROM people
WHERE age > 25
ORDER BY age DESC
""")
result.show()
DataFrame API
// 选择列
df.select("name", "age").show()
// 过滤
df.filter($"age" > 25).show()
df.filter("age > 25").show()
// 分组聚合
df.groupBy("age")
.count()
.show()
// 排序
df.orderBy($"age".desc).show()
// 连接
val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
val df2 = Seq((1, 25), (2, 30)).toDF("id", "age")
df1.join(df2, "id").show()
// 内置函数
import org.apache.spark.sql.functions._
df.select(
col("name"),
col("age"),
lit(100).as("constant"),
upper(col("name")).as("upper_name"),
when(col("age") > 25, "senior").otherwise("junior").as("level")
).show()
// 窗口函数
import org.apache.spark.sql.expressions.Window
val window = Window.orderBy("age")
df.withColumn("rank", rank().over(window))
.withColumn("row_num", row_number().over(window))
.show()
UDF 用户自定义函数
// 注册UDF
val upperCase = udf((s: String) => s.toUpperCase)
spark.udf.register("upperCase", upperCase)
// 在SQL中使用
spark.sql("SELECT upperCase(name) FROM people").show()
// 在DataFrame API中使用
df.withColumn("upper_name", upperCase(col("name"))).show()
Spark Streaming
Spark Streaming是Spark的流处理组件,支持实时数据处理。
DStream 编程模型
DStream(Discretized Stream)是Spark Streaming的基本抽象,代表连续的数据流:
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(5))
// 从Socket创建DStream
val lines = ssc.socketTextStream("localhost", 9999)
// 处理数据
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// 输出结果
wordCounts.print()
// 启动流处理
ssc.start()
ssc.awaitTermination()
Structured Streaming
Structured Streaming是基于Spark SQL的流处理API,更加易用:
import org.apache.spark.sql.functions._
// 从Socket读取数据
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 处理数据
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
// 输出到控制台
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
窗口操作
// 窗口聚合
val windowCounts = lines
.as[String]
.flatMap(_.split(" "))
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("value")
)
.count()
// 水印:处理迟到数据
val withWatermark = lines
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
)
.count()
输出模式
| 模式 | 说明 |
|---|---|
| Append | 只输出新数据 |
| Complete | 输出所有结果 |
| Update | 只输出更新的结果 |
Spark 性能调优
内存调优
// 调整Executor内存
spark.executor.memory // Executor内存
spark.executor.memoryOverhead // 堆外内存
spark.driver.memory // Driver内存
// 调整内存比例
spark.memory.fraction // 执行和存储内存比例
spark.memory.storageFraction // 存储内存占执行存储内存的比例
并行度调优
// 设置默认并行度
spark.default.parallelism
// 设置Shuffle分区数
spark.sql.shuffle.partitions
// 代码中设置
val df = spark.read.parquet("data")
.repartition(100) // 增加分区
.coalesce(10) // 减少分区
数据倾斜处理
// 方式1:增加Shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")
// 方式2:广播Join
df1.join(broadcast(df2), "key")
// 方式3:加盐处理
val salted = df.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat(col("key"), col("salt")))
广播Join优化
当Join的一方数据量较小时,使用广播Join可以避免Shuffle:
// 自动广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10MB
// 手动广播
df1.join(broadcast(df2), "key")
Spark 提交作业
spark-submit 命令
spark-submit \
--class com.example.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 10 \
--conf spark.sql.shuffle.partitions=200 \
myapp.jar \
input output
常用参数
| 参数 | 说明 |
|---|---|
| --master | 集群地址 |
| --deploy-mode | 部署模式(client/cluster) |
| --executor-memory | Executor内存 |
| --executor-cores | Executor核心数 |
| --num-executors | Executor数量 |
| --driver-memory | Driver内存 |
| --conf | 配置参数 |
小结
本章介绍了Spark的核心概念和使用方法:
- Spark架构:Driver、Executor、Cluster Manager三组件协作
- RDD编程:转换操作和行动操作两大类
- Spark SQL:DataFrame API和SQL查询
- Spark Streaming:DStream和Structured Streaming流处理
- 性能调优:内存、并行度、数据倾斜优化
Spark是目前最流行的大数据处理框架,掌握Spark对于大数据开发至关重要。在实际项目中,Spark常与Hive、Kafka、HBase等组件配合使用。