MapReduce 基础
MapReduce 是 Hadoop 的分布式计算框架,采用分而治之的思想处理大规模数据。本章将深入讲解 MapReduce 的编程模型和工作原理。
MapReduce 概述
什么是 MapReduce?
MapReduce 是一种分布式计算编程模型,由 Google 在 2004 年提出。它的核心思想是将复杂的分布式计算分解为 Map(映射)和 Reduce(归约)两个阶段。
设计目标:
- 简化分布式编程:开发者只需关注业务逻辑,无需处理分布式细节
- 自动并行化:框架自动将任务分配到集群节点执行
- 容错处理:自动处理节点故障和任务重试
- 数据本地化:计算任务在数据所在节点执行
适用场景
MapReduce 适合以下场景:
- 离线批处理:处理大规模历史数据
- ETL 数据处理:数据抽取、转换、加载
- 日志分析:服务器日志、应用日志分析
- 数据统计:大规模数据聚合统计
- 机器学习:特征提取、模型训练
不适合的场景:
- 实时计算
- 迭代计算(如机器学习迭代训练)
- 交互式查询
编程模型
Map 和 Reduce
MapReduce 将计算过程抽象为两个阶段:
Map 阶段
Map 阶段负责数据的映射和初步处理:
- 输入:键值对
(K1, V1) - 处理:用户定义的 Map 函数对每个输入进行处理
- 输出:中间结果
(K2, V2)
// Map 函数签名
map(K1 key, V1 value, Context context)
Reduce 阶段
Reduce 阶段负责数据的汇总和最终处理:
- 输入:分组后的中间结果
(K2, List<V2>) - 处理:用户定义的 Reduce 函数对每组数据进行汇总
- 输出:最终结果
(K3, V3)
// Reduce 函数签名
reduce(K2 key, Iterable<V2> values, Context context)
数据流转
WordCount 示例
WordCount 是 MapReduce 的经典入门示例,统计文本中单词出现的次数:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount {
public static class WordCountMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("\\s+");
for (String w : words) {
if (!w.isEmpty()) {
word.set(w);
context.write(word, one);
}
}
}
}
public static class WordCountReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
执行过程:
输入文件内容:
hello world hello hadoop
world hadoop spark
Map 阶段输出:
(hello, 1), (world, 1), (hello, 1), (hadoop, 1)
(world, 1), (hadoop, 1), (spark, 1)
Shuffle 后:
(hadoop, [1, 1])
(hello, [1, 1])
(spark, [1])
(world, [1, 1])
Reduce 阶段输出:
hadoop 2
hello 2
spark 1
world 2
MapReduce 执行流程
完整执行流程
详细步骤
1. 输入分片(Input Split)
MapReduce 首先将输入文件切分成多个分片:
- 每个分片默认大小等于 HDFS 块大小(128MB)
- 每个分片启动一个 Map 任务处理
- 分片是逻辑概念,不实际切分文件
// 自定义分片大小
conf.set("mapreduce.input.fileinputformat.split.maxsize", "67108864"); // 64MB
2. Map 执行
每个 Map 任务处理一个分片:
- 读取分片数据,调用 Map 函数
- Map 输出写入环形缓冲区
- 缓冲区达到阈值后溢写到磁盘
3. Shuffle 过程
Shuffle 是 MapReduce 最核心的过程,连接 Map 和 Reduce:
Map 端 Shuffle:
- 环形缓冲区:Map 输出先写入内存缓冲区(默认 100MB)
- 分区:根据 Key 的哈希值分配到不同 Reduce
- 排序:每个分区内按 Key 排序
- 溢写:缓冲区达到阈值(80%)时写入磁盘
- 合并:多个溢写文件合并成一个
Reduce 端 Shuffle:
- 拉取数据:Reduce 从 Map 节点拉取属于自己的分区数据
- 合并排序:将拉取的数据合并排序
- 分组:相同 Key 的数据分到一组
4. Reduce 执行
Reduce 任务处理分组后的数据:
- 对每组数据调用 Reduce 函数
- 输出结果写入 HDFS
Shuffle 配置优化
<!-- Map 端缓冲区大小 -->
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>100</value>
</property>
<!-- 溢写阈值 -->
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
</property>
<!-- 合并文件数 -->
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
</property>
<!-- Reduce 拉取并行数 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
</property>
<!-- Reduce 缓冲区比例 -->
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>
MapReduce 数据类型
Writable 接口
MapReduce 使用 Writable 接口实现序列化:
| Hadoop 类型 | Java 类型 | 说明 |
|---|---|---|
| IntWritable | int | 整型 |
| LongWritable | long | 长整型 |
| FloatWritable | float | 浮点型 |
| DoubleWritable | double | 双精度 |
| Text | String | 文本 |
| BooleanWritable | boolean | 布尔型 |
| NullWritable | null | 空值 |
| BytesWritable | byte[] | 字节数组 |
自定义 Writable 类型
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Person implements Writable {
private String name;
private int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
}
@Override
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
age = in.readInt();
}
@Override
public String toString() {
return name + "\t" + age;
}
}
自定义比较器
实现 WritableComparable 接口可以自定义排序:
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class IntPair implements WritableComparable<IntPair> {
private int first;
private int second;
public IntPair() {}
public IntPair(int first, int second) {
this.first = first;
this.second = second;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(first);
out.writeInt(second);
}
@Override
public void readFields(DataInput in) throws IOException {
first = in.readInt();
second = in.readInt();
}
@Override
public int compareTo(IntPair o) {
if (first != o.first) {
return first - o.first;
}
return second - o.second;
}
}
InputFormat 和 OutputFormat
InputFormat
InputFormat 负责读取输入数据:
| InputFormat | 说明 |
|---|---|
| TextInputFormat | 默认,每行一条记录 |
| KeyValueTextInputFormat | 每行解析为键值对 |
| NLineInputFormat | 每个分片固定行数 |
| SequenceFileInputFormat | 读取 SequenceFile |
| CombineFileInputFormat | 合并小文件 |
自定义 InputFormat:
public class CustomInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new CustomRecordReader();
}
}
public class CustomRecordReader extends RecordReader<Text, Text> {
private Text key = new Text();
private Text value = new Text();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
// 初始化
}
@Override
public boolean nextKeyValue() throws IOException {
// 读取下一条记录
return false;
}
@Override
public Text getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
@Override
public float getProgress() {
return 0;
}
@Override
public void close() throws IOException {
// 关闭资源
}
}
OutputFormat
OutputFormat 负责写入输出数据:
| OutputFormat | 说明 |
|---|---|
| TextOutputFormat | 默认,输出文本文件 |
| SequenceFileOutputFormat | 输出 SequenceFile |
| NullOutputFormat | 不输出 |
Partitioner、Combiner 和 GroupingComparator
Partitioner(分区器)
Partitioner 决定 Map 输出由哪个 Reduce 处理:
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
// 根据 key 决定分区
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
// 设置自定义分区器
job.setPartitionerClass(CustomPartitioner.class);
Combiner(本地合并)
Combiner 在 Map 端进行本地合并,减少网络传输:
// Combiner 通常与 Reducer 相同
job.setCombinerClass(WordCountReducer.class);
注意:Combiner 的输入输出类型必须一致,且操作必须满足结合律。
GroupingComparator(分组比较器)
GroupingComparator 决定哪些数据分为一组:
public class CustomGroupingComparator extends WritableComparator {
protected CustomGroupingComparator() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 自定义分组逻辑
Text t1 = (Text) a;
Text t2 = (Text) b;
return t1.compareTo(t2);
}
}
// 设置分组比较器
job.setGroupingComparatorClass(CustomGroupingComparator.class);
运行 MapReduce 作业
打包提交
# 编译打包
javac -classpath $(hadoop classpath) WordCount.java
jar -cvf wordcount.jar WordCount*.class
# 提交作业
hadoop jar wordcount.jar WordCount /input /output
通过 YARN 提交
# 指定队列
hadoop jar wordcount.jar WordCount -D mapreduce.job.queuename=default /input /output
# 指定 Map 和 Reduce 数量
hadoop jar wordcount.jar WordCount \
-D mapreduce.job.maps=10 \
-D mapreduce.job.reduces=5 \
/input /output
查看作业状态
# 查看作业列表
yarn application -list
# 查看作业详情
yarn application -status application_1234567890_0001
# 查看作业日志
yarn logs -applicationId application_1234567890_0001
# 杀死作业
yarn application -kill application_1234567890_0001
小结
本章介绍了 MapReduce 的基础知识:
-
编程模型:Map 和 Reduce 两阶段编程模型,简单易用。
-
执行流程:理解输入分片、Map 执行、Shuffle、Reduce 执行的完整流程。
-
数据类型:Writable 接口和自定义类型。
-
核心组件:Partitioner、Combiner、GroupingComparator 的作用和使用。
MapReduce 虽然编程模型简单,但可以处理大规模数据。下一章将介绍 MapReduce 的高级特性和性能优化。