跳到主要内容

MapReduce 基础

MapReduce 是 Hadoop 的分布式计算框架,采用分而治之的思想处理大规模数据。本章将深入讲解 MapReduce 的编程模型和工作原理。

MapReduce 概述

什么是 MapReduce?

MapReduce 是一种分布式计算编程模型,由 Google 在 2004 年提出。它的核心思想是将复杂的分布式计算分解为 Map(映射)和 Reduce(归约)两个阶段。

设计目标:

  1. 简化分布式编程:开发者只需关注业务逻辑,无需处理分布式细节
  2. 自动并行化:框架自动将任务分配到集群节点执行
  3. 容错处理:自动处理节点故障和任务重试
  4. 数据本地化:计算任务在数据所在节点执行

适用场景

MapReduce 适合以下场景:

  • 离线批处理:处理大规模历史数据
  • ETL 数据处理:数据抽取、转换、加载
  • 日志分析:服务器日志、应用日志分析
  • 数据统计:大规模数据聚合统计
  • 机器学习:特征提取、模型训练

不适合的场景:

  • 实时计算
  • 迭代计算(如机器学习迭代训练)
  • 交互式查询

编程模型

Map 和 Reduce

MapReduce 将计算过程抽象为两个阶段:

Map 阶段

Map 阶段负责数据的映射和初步处理:

  • 输入:键值对 (K1, V1)
  • 处理:用户定义的 Map 函数对每个输入进行处理
  • 输出:中间结果 (K2, V2)
// Map 函数签名
map(K1 key, V1 value, Context context)

Reduce 阶段

Reduce 阶段负责数据的汇总和最终处理:

  • 输入:分组后的中间结果 (K2, List<V2>)
  • 处理:用户定义的 Reduce 函数对每组数据进行汇总
  • 输出:最终结果 (K3, V3)
// Reduce 函数签名
reduce(K2 key, Iterable<V2> values, Context context)

数据流转

WordCount 示例

WordCount 是 MapReduce 的经典入门示例,统计文本中单词出现的次数:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCount {

public static class WordCountMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split("\\s+");
for (String w : words) {
if (!w.isEmpty()) {
word.set(w);
context.write(word, one);
}
}
}
}

public static class WordCountReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");

job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

执行过程:

输入文件内容:
hello world hello hadoop
world hadoop spark

Map 阶段输出:
(hello, 1), (world, 1), (hello, 1), (hadoop, 1)
(world, 1), (hadoop, 1), (spark, 1)

Shuffle 后:
(hadoop, [1, 1])
(hello, [1, 1])
(spark, [1])
(world, [1, 1])

Reduce 阶段输出:
hadoop 2
hello 2
spark 1
world 2

MapReduce 执行流程

完整执行流程

详细步骤

1. 输入分片(Input Split)

MapReduce 首先将输入文件切分成多个分片:

  • 每个分片默认大小等于 HDFS 块大小(128MB)
  • 每个分片启动一个 Map 任务处理
  • 分片是逻辑概念,不实际切分文件
// 自定义分片大小
conf.set("mapreduce.input.fileinputformat.split.maxsize", "67108864"); // 64MB

2. Map 执行

每个 Map 任务处理一个分片:

  1. 读取分片数据,调用 Map 函数
  2. Map 输出写入环形缓冲区
  3. 缓冲区达到阈值后溢写到磁盘

3. Shuffle 过程

Shuffle 是 MapReduce 最核心的过程,连接 Map 和 Reduce:

Map 端 Shuffle:

  • 环形缓冲区:Map 输出先写入内存缓冲区(默认 100MB)
  • 分区:根据 Key 的哈希值分配到不同 Reduce
  • 排序:每个分区内按 Key 排序
  • 溢写:缓冲区达到阈值(80%)时写入磁盘
  • 合并:多个溢写文件合并成一个

Reduce 端 Shuffle:

  • 拉取数据:Reduce 从 Map 节点拉取属于自己的分区数据
  • 合并排序:将拉取的数据合并排序
  • 分组:相同 Key 的数据分到一组

4. Reduce 执行

Reduce 任务处理分组后的数据:

  1. 对每组数据调用 Reduce 函数
  2. 输出结果写入 HDFS

Shuffle 配置优化

<!-- Map 端缓冲区大小 -->
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>100</value>
</property>

<!-- 溢写阈值 -->
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
</property>

<!-- 合并文件数 -->
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
</property>

<!-- Reduce 拉取并行数 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
</property>

<!-- Reduce 缓冲区比例 -->
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>

MapReduce 数据类型

Writable 接口

MapReduce 使用 Writable 接口实现序列化:

Hadoop 类型Java 类型说明
IntWritableint整型
LongWritablelong长整型
FloatWritablefloat浮点型
DoubleWritabledouble双精度
TextString文本
BooleanWritableboolean布尔型
NullWritablenull空值
BytesWritablebyte[]字节数组

自定义 Writable 类型

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Person implements Writable {
private String name;
private int age;

public Person() {}

public Person(String name, int age) {
this.name = name;
this.age = age;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
}

@Override
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
age = in.readInt();
}

@Override
public String toString() {
return name + "\t" + age;
}
}

自定义比较器

实现 WritableComparable 接口可以自定义排序:

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class IntPair implements WritableComparable<IntPair> {
private int first;
private int second;

public IntPair() {}

public IntPair(int first, int second) {
this.first = first;
this.second = second;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(first);
out.writeInt(second);
}

@Override
public void readFields(DataInput in) throws IOException {
first = in.readInt();
second = in.readInt();
}

@Override
public int compareTo(IntPair o) {
if (first != o.first) {
return first - o.first;
}
return second - o.second;
}
}

InputFormat 和 OutputFormat

InputFormat

InputFormat 负责读取输入数据:

InputFormat说明
TextInputFormat默认,每行一条记录
KeyValueTextInputFormat每行解析为键值对
NLineInputFormat每个分片固定行数
SequenceFileInputFormat读取 SequenceFile
CombineFileInputFormat合并小文件

自定义 InputFormat:

public class CustomInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new CustomRecordReader();
}
}

public class CustomRecordReader extends RecordReader<Text, Text> {
private Text key = new Text();
private Text value = new Text();

@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
// 初始化
}

@Override
public boolean nextKeyValue() throws IOException {
// 读取下一条记录
return false;
}

@Override
public Text getCurrentKey() {
return key;
}

@Override
public Text getCurrentValue() {
return value;
}

@Override
public float getProgress() {
return 0;
}

@Override
public void close() throws IOException {
// 关闭资源
}
}

OutputFormat

OutputFormat 负责写入输出数据:

OutputFormat说明
TextOutputFormat默认,输出文本文件
SequenceFileOutputFormat输出 SequenceFile
NullOutputFormat不输出

Partitioner、Combiner 和 GroupingComparator

Partitioner(分区器)

Partitioner 决定 Map 输出由哪个 Reduce 处理:

public class CustomPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
// 根据 key 决定分区
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

// 设置自定义分区器
job.setPartitionerClass(CustomPartitioner.class);

Combiner(本地合并)

Combiner 在 Map 端进行本地合并,减少网络传输:

// Combiner 通常与 Reducer 相同
job.setCombinerClass(WordCountReducer.class);

注意:Combiner 的输入输出类型必须一致,且操作必须满足结合律。

GroupingComparator(分组比较器)

GroupingComparator 决定哪些数据分为一组:

public class CustomGroupingComparator extends WritableComparator {
protected CustomGroupingComparator() {
super(Text.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
// 自定义分组逻辑
Text t1 = (Text) a;
Text t2 = (Text) b;
return t1.compareTo(t2);
}
}

// 设置分组比较器
job.setGroupingComparatorClass(CustomGroupingComparator.class);

运行 MapReduce 作业

打包提交

# 编译打包
javac -classpath $(hadoop classpath) WordCount.java
jar -cvf wordcount.jar WordCount*.class

# 提交作业
hadoop jar wordcount.jar WordCount /input /output

通过 YARN 提交

# 指定队列
hadoop jar wordcount.jar WordCount -D mapreduce.job.queuename=default /input /output

# 指定 Map 和 Reduce 数量
hadoop jar wordcount.jar WordCount \
-D mapreduce.job.maps=10 \
-D mapreduce.job.reduces=5 \
/input /output

查看作业状态

# 查看作业列表
yarn application -list

# 查看作业详情
yarn application -status application_1234567890_0001

# 查看作业日志
yarn logs -applicationId application_1234567890_0001

# 杀死作业
yarn application -kill application_1234567890_0001

小结

本章介绍了 MapReduce 的基础知识:

  1. 编程模型:Map 和 Reduce 两阶段编程模型,简单易用。

  2. 执行流程:理解输入分片、Map 执行、Shuffle、Reduce 执行的完整流程。

  3. 数据类型:Writable 接口和自定义类型。

  4. 核心组件:Partitioner、Combiner、GroupingComparator 的作用和使用。

MapReduce 虽然编程模型简单,但可以处理大规模数据。下一章将介绍 MapReduce 的高级特性和性能优化。