跳到主要内容

MapReduce 高级特性

本章将深入讲解 MapReduce 的高级特性,包括 Shuffle 机制优化、自定义组件、计数器、以及性能调优技巧。

Shuffle 机制深入

Map 端 Shuffle 详解

Map 端 Shuffle 是 MapReduce 性能的关键因素:

环形缓冲区

Map 输出首先写入内存中的环形缓冲区:

配置项默认值说明
mapreduce.task.io.sort.mb100MB缓冲区大小
mapreduce.map.sort.spill.percent0.80溢写阈值

环形缓冲区的工作原理:

  1. 缓冲区同时存储数据和索引
  2. 数据从一端写入,索引从另一端写入
  3. 当数据区域达到阈值时开始溢写

优化建议:

  • 增大缓冲区可以减少溢写次数
  • 但不要超过 JVM 堆内存的合理比例
  • 一般设置为 100-256MB

溢写过程

当缓冲区达到阈值时:

  1. 锁定缓冲区:阻止新数据写入
  2. 分区排序:按分区和 Key 排序
  3. 可选 Combiner:执行本地合并
  4. 写入磁盘:生成溢写文件
  5. 释放缓冲区:继续接收新数据

文件合并

Map 任务完成后,将所有溢写文件合并:

<!-- 合并因子,一次合并的文件数 -->
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
</property>

Reduce 端 Shuffle 详解

Reduce 端 Shuffle 从 Map 节点拉取数据:

数据拉取

Reduce 启动多个线程并行拉取数据:

<!-- 拉取线程数 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
</property>

<!-- 拉取超时时间 -->
<property>
<name>mapreduce.reduce.shuffle.read.timeout</name>
<value>180000</value>
</property>

内存管理

Reduce 端内存分配:

<!-- 拉取数据占用的内存比例 -->
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>

<!-- 内存中合并的阈值 -->
<property>
<name>mapreduce.reduce.shuffle.merge.percent</name>
<value>0.66</value>
</property>

<!-- Reduce 计算占用内存比例 -->
<property>
<name>mapreduce.reduce.input.buffer.percent</name>
<value>0.0</value>
</property>

自定义组件

自定义 InputFormat

当需要读取特殊格式的输入数据时,可以自定义 InputFormat:

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}

@Override
public RecordReader<Text, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new WholeFileRecordReader();
}
}

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private Text key = new Text();
private BytesWritable value = new BytesWritable();
private boolean processed = false;

@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
key.set(file.getName());
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}

@Override
public Text getCurrentKey() { return key; }

@Override
public BytesWritable getCurrentValue() { return value; }

@Override
public float getProgress() { return processed ? 1.0f : 0.0f; }

@Override
public void close() throws IOException { }
}

自定义 OutputFormat

当需要输出特殊格式时,可以自定义 OutputFormat:

public class MySQLOutputFormat extends OutputFormat<Text, Text> {

@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) {
return new MySQLRecordWriter();
}

@Override
public void checkOutputSpecs(JobContext context) { }

@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
return new FileOutputCommitter(null, context);
}

static class MySQLRecordWriter extends RecordWriter<Text, Text> {
private Connection connection;
private PreparedStatement statement;

public MySQLRecordWriter() {
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost/db", "user", "pass");
statement = connection.prepareStatement("INSERT INTO output VALUES (?, ?)");
} catch (SQLException e) {
e.printStackTrace();
}
}

@Override
public void write(Text key, Text value) throws IOException {
try {
statement.setString(1, key.toString());
statement.setString(2, value.toString());
statement.executeUpdate();
} catch (SQLException e) {
throw new IOException(e);
}
}

@Override
public void close(TaskAttemptContext context) throws IOException {
try {
statement.close();
connection.close();
} catch (SQLException e) {
throw new IOException(e);
}
}
}
}

自定义 Partitioner

实现自定义分区逻辑:

public class DomainPartitioner extends Partitioner<Text, Text> {

@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
String domain = extractDomain(key.toString());
return (domain.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

private String extractDomain(String url) {
try {
URL u = new URL(url);
return u.getHost();
} catch (Exception e) {
return "unknown";
}
}
}

自定义 Combiner

Combiner 可以显著减少网络传输:

public class MaxTemperatureCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}

Combiner 使用注意事项:

  • Combiner 输入输出类型必须一致
  • 操作必须满足结合律(如求和、求最大值)
  • 求平均值等操作不能使用 Combiner

自定义 WritableComparator

自定义排序和分组逻辑:

public class TextComparator extends WritableComparator {

protected TextComparator() {
super(Text.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
Text t1 = (Text) a;
Text t2 = (Text) b;
return t2.compareTo(t1);
}
}

public class FirstGroupingComparator extends WritableComparator {

protected FirstGroupingComparator() {
super(TextPair.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
TextPair tp1 = (TextPair) a;
TextPair tp2 = (TextPair) b;
return tp1.getFirst().compareTo(tp2.getFirst());
}
}

计数器

内置计数器

MapReduce 提供了丰富的内置计数器:

计数器组计数器说明
MAP_INPUT_RECORDS输入记录数Map 读取的记录数
MAP_OUTPUT_RECORDS输出记录数Map 输出的记录数
MAP_OUTPUT_BYTES输出字节数Map 输出的字节数
REDUCE_INPUT_RECORDS输入记录数Reduce 读取的记录数
REDUCE_OUTPUT_RECORDS输出记录数Reduce 输出的记录数
SPILLED_RECORDS溢写记录数溢写到磁盘的记录数

自定义计数器

public enum MyCounters {
TOTAL_RECORDS,
INVALID_RECORDS,
MISSING_FIELDS
}

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MyCounters.TOTAL_RECORDS).increment(1);

String[] fields = value.toString().split(",");
if (fields.length < 3) {
context.getCounter(MyCounters.MISSING_FIELDS).increment(1);
return;
}

if (!isValid(fields)) {
context.getCounter(MyCounters.INVALID_RECORDS).increment(1);
return;
}

// 正常处理
}
}

动态计数器

context.getCounter("GroupName", "CounterName").increment(1);

查看计数器

# 通过命令行查看
hadoop job -history /output/_logs/history/job_xxx

# 通过 Web UI 查看
# 访问 http://resourcemanager:8088

性能调优

Map 任务调优

内存配置

<!-- Map 任务 JVM 堆内存 -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>

<!-- Map 任务 JVM 参数 -->
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1638m</value>
</property>

环形缓冲区

<!-- 增大缓冲区减少溢写 -->
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>256</value>
</property>

<!-- 调整溢写阈值 -->
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
</property>

小文件优化

<!-- 合并小文件 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>134217728</value>
</property>

<!-- 使用 CombineFileInputFormat -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>268435456</value>
</property>

Reduce 任务调优

内存配置

<!-- Reduce 任务 JVM 堆内存 -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>

<!-- Reduce 任务 JVM 参数 -->
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx3276m</value>
</property>

Shuffle 优化

<!-- 增加拉取并行度 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>10</value>
</property>

<!-- 调整内存比例 -->
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>

<!-- 增加 Reduce 计算缓冲 -->
<property>
<name>mapreduce.reduce.input.buffer.percent</name>
<value>0.30</value>
</property>

任务数量优化

Map 任务数量

Map 任务数量由输入分片决定:

  • 分片大小 = max(minSize, min(maxSize, blockSize))
  • 默认分片大小 = HDFS 块大小
<!-- 增大分片,减少 Map 数量 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>268435456</value>
</property>

<!-- 减小分片,增加 Map 数量 -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>67108864</value>
</property>

Reduce 任务数量

<!-- 设置 Reduce 数量 -->
<property>
<name>mapreduce.job.reduces</name>
<value>10</value>
</property>

Reduce 数量选择原则:

  • 每个 Reduce 处理 1-5GB 数据为宜
  • Reduce 数量不宜过多,会增加 Shuffle 开销
  • Reduce 数量不宜过少,会降低并行度

数据压缩

启用压缩可以减少磁盘 IO 和网络传输:

<!-- 启用 Map 输出压缩 -->
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>

<!-- Map 输出压缩编码器 -->
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

<!-- 启用 Reduce 输出压缩 -->
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>

<!-- Reduce 输出压缩编码器 -->
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

压缩编码器选择:

编码器压缩比压缩速度是否可分割适用场景
Gzip最终输出
SnappyMap 输出
LZO大文件
Bzip2最高最慢归档存储

推测执行

当任务执行缓慢时,可以启动备份任务:

<!-- 启用 Map 推测执行 -->
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property>

<!-- 启用 Reduce 推测执行 -->
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
</property>

注意:Reduce 推测执行可能导致数据重复,谨慎使用。

JVM 重用

在 YARN 中配置 JVM 重用:

<!-- 每个 JVM 执行的任务数 -->
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>-1</value>
</property>

常见问题排查

数据倾斜

现象:大部分 Reduce 很快完成,个别 Reduce 执行很慢。

解决方案:

  1. 增加 Reduce 数量:分散数据
  2. 自定义 Partitioner:均匀分布数据
  3. 局部聚合:先 Map 端聚合,再 Reduce 聚合
public class SkewPartitioner extends Partitioner<Text, IntWritable> {
private static final String[] PREFIXES = {"a-m", "n-z"};

@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
char first = Character.toLowerCase(key.toString().charAt(0));
if (first >= 'a' && first <= 'm') {
return 0 % numReduceTasks;
} else {
return 1 % numReduceTasks;
}
}
}

内存溢出

现象:任务失败,日志显示 OOM 错误。

解决方案:

  1. 增加任务内存配置
  2. 优化代码,减少内存占用
  3. 调整缓冲区大小

小文件过多

现象:Map 任务数量过多,执行效率低。

解决方案:

  1. 使用 CombineFileInputFormat 合并小文件
  2. 预处理合并小文件
  3. 使用 SequenceFile 存储小文件

小结

本章介绍了 MapReduce 的高级特性:

  1. Shuffle 优化:理解 Map 端和 Reduce 端 Shuffle 机制,合理配置参数。

  2. 自定义组件:自定义 InputFormat、OutputFormat、Partitioner 等组件。

  3. 计数器:使用计数器监控任务执行情况。

  4. 性能调优:内存配置、压缩、推测执行等优化技巧。

  5. 问题排查:数据倾斜、内存溢出、小文件等常见问题的解决方案。

掌握这些高级特性,可以更好地优化 MapReduce 作业性能,处理复杂的数据处理需求。