跳到主要内容

Spark 内存计算框架

Apache Spark是一个快速、通用、可扩展的大数据分析引擎。相比Hadoop MapReduce,Spark基于内存计算,速度可以快100倍,已成为目前最流行的大数据处理框架。

Spark 概述

什么是 Spark?

Spark是UC Berkeley AMP实验室开源的分布式计算框架,后来成为Apache顶级项目。它提供了统一的编程模型,支持批处理、流处理、机器学习、图计算等多种计算场景。

Spark的核心特点:

  1. 速度快:基于内存计算,比MapReduce快10-100倍
  2. 易用性:支持Java、Scala、Python、R多种语言
  3. 通用性:统一的API支持多种计算场景
  4. 兼容性:可运行在YARN、Mesos、Kubernetes上

Spark vs MapReduce

维度MapReduceSpark
计算模式磁盘计算内存计算
速度较慢快10-100倍
编程模型Map/Reduce两阶段RDD丰富的算子
适用场景离线批处理批处理、流处理、机器学习
迭代计算每次迭代写磁盘内存中迭代

Spark 生态组件

Spark提供了一整套生态系统:

组件功能说明
Spark Core核心引擎RDD抽象、任务调度
Spark SQL结构化数据处理SQL查询、DataFrame API
Spark Streaming流处理微批处理流数据
MLlib机器学习常用机器学习算法
GraphX图计算图数据处理

Spark 架构

运行架构

Spark采用主从架构,主要组件包括:

Driver(驱动程序)

Driver是Spark应用的主进程,负责:

  • 解析代码:将用户代码转换为任务
  • 创建SparkContext:初始化Spark应用
  • 任务调度:将任务分配给Executor执行
  • 结果收集:收集和汇总任务结果

Executor(执行器)

Executor是工作节点上的进程,负责:

  • 任务执行:执行Driver分配的任务
  • 数据存储:缓存RDD数据
  • 结果返回:将计算结果返回给Driver

Cluster Manager(集群管理器)

Cluster Manager负责资源管理:

  • Standalone:Spark自带的资源管理器
  • YARN:Hadoop的资源管理器
  • Mesos:通用的集群管理器
  • Kubernetes:容器编排平台

RDD 弹性分布式数据集

RDD(Resilient Distributed Dataset)是Spark的核心抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD 五大特性

  1. 分区列表:数据集被切分为多个分区
  2. 计算函数:每个分区都有一个计算函数
  3. 依赖关系:RDD之间有依赖关系
  4. 分区器:可选的分区策略
  5. 最佳位置:数据本地性偏好

RDD 创建方式

// 方式1:从集合创建
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))

// 方式2:从外部存储创建
val rdd2 = sc.textFile("hdfs://path/to/file")

// 方式3:从其他RDD转换
val rdd3 = rdd1.map(_ * 2)

RDD 操作类型

RDD支持两种操作类型:

转换操作(Transformation):懒执行,返回新RDD

操作说明
map对每个元素应用函数
filter过滤满足条件的元素
flatMap映射后展平
groupByKey按键分组
reduceByKey按键归约
join两个RDD连接
union合并两个RDD

行动操作(Action):触发计算,返回结果

操作说明
collect返回所有元素
count返回元素个数
reduce聚合所有元素
saveAsTextFile保存到文件
foreach对每个元素执行操作

Spark 执行流程

Spark应用的执行流程如下:

  1. 构建RDD血缘关系图(DAG)
  2. DAGScheduler将DAG划分为多个Stage
  3. TaskScheduler将Task提交给Executor
  4. Executor执行Task并返回结果

Spark Core 编程

SparkContext 初始化

SparkContext是Spark程序的入口:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
.setAppName("MyApp")
.setMaster("local[*]") // 本地模式

val sc = new SparkContext(conf)

常用转换操作

// map:一对一转换
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val mapped = rdd.map(x => x * 2)
// 结果:2, 4, 6, 8, 10

// filter:过滤
val filtered = rdd.filter(x => x > 2)
// 结果:3, 4, 5

// flatMap:一对多转换并展平
val textRdd = sc.parallelize(Seq("hello world", "spark rdd"))
val words = textRdd.flatMap(line => line.split(" "))
// 结果:hello, world, spark, rdd

// mapPartitions:按分区处理
val partitionMapped = rdd.mapPartitions(iter => {
iter.map(x => x * 2)
})

// distinct:去重
val dupRdd = sc.parallelize(Seq(1, 1, 2, 2, 3))
val distinct = dupRdd.distinct()
// 结果:1, 2, 3

Key-Value 操作

// 创建PairRDD
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey:按键聚合
val reduced = pairs.reduceByKey(_ + _)
// 结果:(a, 4), (b, 2)

// groupByKey:按键分组
val grouped = pairs.groupByKey()
// 结果:(a, [1, 3]), (b, [2])

// mapValues:只对值进行转换
val mappedValues = pairs.mapValues(_ * 2)
// 结果:(a, 2), (b, 4), (a, 6)

// sortByKey:按键排序
val sorted = pairs.sortByKey()

// join:内连接
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2)))
val rdd2 = sc.parallelize(Seq(("a", "x"), ("b", "y")))
val joined = rdd1.join(rdd2)
// 结果:(a, (1, x)), (b, (2, y))

// leftOuterJoin:左外连接
val leftJoined = rdd1.leftOuterJoin(rdd2)

// cogroup:协同分组
val cogrouped = rdd1.cogroup(rdd2)

常用行动操作

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))

// collect:收集所有元素
val arr = rdd.collect()
// 结果:Array(1, 2, 3, 4, 5)

// count:计数
val count = rdd.count()
// 结果:5

// reduce:聚合
val sum = rdd.reduce(_ + _)
// 结果:15

// aggregate:自定义聚合
val result = rdd.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// 结果:(15, 5),即总和和个数

// first:取第一个元素
val first = rdd.first()
// 结果:1

// take:取前n个元素
val top3 = rdd.take(3)
// 结果:Array(1, 2, 3)

// foreach:遍历
rdd.foreach(println)

RDD 持久化

Spark支持将RDD缓存到内存或磁盘,避免重复计算:

// 缓存到内存
rdd.cache()

// 指定存储级别
import org.apache.spark.storage.StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// 常用存储级别
StorageLevel.MEMORY_ONLY // 只存内存
StorageLevel.MEMORY_AND_DISK // 内存不足时存磁盘
StorageLevel.DISK_ONLY // 只存磁盘
StorageLevel.MEMORY_ONLY_SER // 内存中序列化存储

// 移除缓存
rdd.unpersist()

共享变量

Spark提供两种共享变量:

广播变量

广播变量将只读变量发送到所有节点,避免重复传输:

val broadcastVar = sc.broadcast(Array(1, 2, 3))

// 在Executor中使用
val value = broadcastVar.value

// 释放广播变量
broadcastVar.unpersist()

累加器

累加器用于聚合所有节点的值:

val acc = sc.longAccumulator("myAccumulator")

rdd.foreach(x => acc.add(x))

// 在Driver中获取值
val total = acc.value

Spark SQL

Spark SQL提供了结构化数据处理能力,支持SQL查询和DataFrame API。

DataFrame 和 Dataset

DataFrame是带有Schema的分布式Row集合,类似数据库表:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.appName("SparkSQLDemo")
.master("local[*]")
.getOrCreate()

import spark.implicits._

// 从集合创建DataFrame
val df = Seq((1, "Alice", 25), (2, "Bob", 30))
.toDF("id", "name", "age")

// 查看Schema
df.printSchema()

// 查看数据
df.show()

// 从文件读取
val csvDF = spark.read.csv("data.csv")
val jsonDF = spark.read.json("data.json")
val parquetDF = spark.read.parquet("data.parquet")

SQL 查询

Spark SQL支持标准SQL语法:

// 注册临时视图
df.createOrReplaceTempView("people")

// 执行SQL查询
val result = spark.sql("""
SELECT name, age
FROM people
WHERE age > 25
ORDER BY age DESC
""")

result.show()

DataFrame API

// 选择列
df.select("name", "age").show()

// 过滤
df.filter($"age" > 25).show()
df.filter("age > 25").show()

// 分组聚合
df.groupBy("age")
.count()
.show()

// 排序
df.orderBy($"age".desc).show()

// 连接
val df1 = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
val df2 = Seq((1, 25), (2, 30)).toDF("id", "age")

df1.join(df2, "id").show()

// 内置函数
import org.apache.spark.sql.functions._

df.select(
col("name"),
col("age"),
lit(100).as("constant"),
upper(col("name")).as("upper_name"),
when(col("age") > 25, "senior").otherwise("junior").as("level")
).show()

// 窗口函数
import org.apache.spark.sql.expressions.Window

val window = Window.orderBy("age")
df.withColumn("rank", rank().over(window))
.withColumn("row_num", row_number().over(window))
.show()

UDF 用户自定义函数

// 注册UDF
val upperCase = udf((s: String) => s.toUpperCase)
spark.udf.register("upperCase", upperCase)

// 在SQL中使用
spark.sql("SELECT upperCase(name) FROM people").show()

// 在DataFrame API中使用
df.withColumn("upper_name", upperCase(col("name"))).show()

Spark Streaming

Spark Streaming是Spark的流处理组件,支持实时数据处理。

DStream 编程模型

DStream(Discretized Stream)是Spark Streaming的基本抽象,代表连续的数据流:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))

// 从Socket创建DStream
val lines = ssc.socketTextStream("localhost", 9999)

// 处理数据
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// 输出结果
wordCounts.print()

// 启动流处理
ssc.start()
ssc.awaitTermination()

Structured Streaming

Structured Streaming是基于Spark SQL的流处理API,更加易用:

import org.apache.spark.sql.functions._

// 从Socket读取数据
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()

// 处理数据
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// 输出到控制台
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()

query.awaitTermination()

窗口操作

// 窗口聚合
val windowCounts = lines
.as[String]
.flatMap(_.split(" "))
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("value")
)
.count()

// 水印:处理迟到数据
val withWatermark = lines
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "10 minutes"),
col("word")
)
.count()

输出模式

模式说明
Append只输出新数据
Complete输出所有结果
Update只输出更新的结果

Spark 性能调优

内存调优

// 调整Executor内存
spark.executor.memory // Executor内存
spark.executor.memoryOverhead // 堆外内存
spark.driver.memory // Driver内存

// 调整内存比例
spark.memory.fraction // 执行和存储内存比例
spark.memory.storageFraction // 存储内存占执行存储内存的比例

并行度调优

// 设置默认并行度
spark.default.parallelism

// 设置Shuffle分区数
spark.sql.shuffle.partitions

// 代码中设置
val df = spark.read.parquet("data")
.repartition(100) // 增加分区
.coalesce(10) // 减少分区

数据倾斜处理

// 方式1:增加Shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")

// 方式2:广播Join
df1.join(broadcast(df2), "key")

// 方式3:加盐处理
val salted = df.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat(col("key"), col("salt")))

广播Join优化

当Join的一方数据量较小时,使用广播Join可以避免Shuffle:

// 自动广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760") // 10MB

// 手动广播
df1.join(broadcast(df2), "key")

Spark 提交作业

spark-submit 命令

spark-submit \
--class com.example.WordCount \
--master yarn \
--deploy-mode cluster \
--executor-memory 4G \
--executor-cores 2 \
--num-executors 10 \
--conf spark.sql.shuffle.partitions=200 \
myapp.jar \
input output

常用参数

参数说明
--master集群地址
--deploy-mode部署模式(client/cluster)
--executor-memoryExecutor内存
--executor-coresExecutor核心数
--num-executorsExecutor数量
--driver-memoryDriver内存
--conf配置参数

小结

本章介绍了Spark的核心概念和使用方法:

  1. Spark架构:Driver、Executor、Cluster Manager三组件协作
  2. RDD编程:转换操作和行动操作两大类
  3. Spark SQL:DataFrame API和SQL查询
  4. Spark Streaming:DStream和Structured Streaming流处理
  5. 性能调优:内存、并行度、数据倾斜优化

Spark是目前最流行的大数据处理框架,掌握Spark对于大数据开发至关重要。在实际项目中,Spark常与Hive、Kafka、HBase等组件配合使用。