MapReduce 高级特性
本章将深入讲解 MapReduce 的高级特性,包括 Shuffle 机制优化、自定义组件、计数器、以及性能调优技巧。
Shuffle 机制深入
Map 端 Shuffle 详解
Map 端 Shuffle 是 MapReduce 性能的关键因素:
环形缓冲区
Map 输出首先写入内存中的环形缓冲区:
| 配置项 | 默认值 | 说明 |
|---|---|---|
mapreduce.task.io.sort.mb | 100MB | 缓冲区大小 |
mapreduce.map.sort.spill.percent | 0.80 | 溢写阈值 |
环形缓冲区的工作原理:
- 缓冲区同时存储数据和索引
- 数据从一端写入,索引从另一端写入
- 当数据区域达到阈值时开始溢写
优化建议:
- 增大缓冲区可以减少溢写次数
- 但不要超过 JVM 堆内存的合理比例
- 一般设置为 100-256MB
溢写过程
当缓冲区达到阈值时:
- 锁定缓冲区:阻止新数据写入
- 分区排序:按分区和 Key 排序
- 可选 Combiner:执行本地合并
- 写入磁盘:生成溢写文件
- 释放缓冲区:继续接收新数据
文件合并
Map 任务完成后,将所有溢写文件合并:
<!-- 合并因子,一次合并的文件数 -->
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
</property>
Reduce 端 Shuffle 详解
Reduce 端 Shuffle 从 Map 节点拉取数据:
数据拉取
Reduce 启动多个线程并行拉取数据:
<!-- 拉取线程数 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
</property>
<!-- 拉取超时时间 -->
<property>
<name>mapreduce.reduce.shuffle.read.timeout</name>
<value>180000</value>
</property>
内存管理
Reduce 端内存分配:
<!-- 拉取数据占用的内存比例 -->
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>
<!-- 内存中合并的阈值 -->
<property>
<name>mapreduce.reduce.shuffle.merge.percent</name>
<value>0.66</value>
</property>
<!-- Reduce 计算占用内存比例 -->
<property>
<name>mapreduce.reduce.input.buffer.percent</name>
<value>0.0</value>
</property>
自定义组件
自定义 InputFormat
当需要读取特殊格式的输入数据时,可以自定义 InputFormat:
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new WholeFileRecordReader();
}
}
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private Text key = new Text();
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
key.set(file.getName());
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public Text getCurrentKey() { return key; }
@Override
public BytesWritable getCurrentValue() { return value; }
@Override
public float getProgress() { return processed ? 1.0f : 0.0f; }
@Override
public void close() throws IOException { }
}
自定义 OutputFormat
当需要输出特殊格式时,可以自定义 OutputFormat:
public class MySQLOutputFormat extends OutputFormat<Text, Text> {
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) {
return new MySQLRecordWriter();
}
@Override
public void checkOutputSpecs(JobContext context) { }
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
return new FileOutputCommitter(null, context);
}
static class MySQLRecordWriter extends RecordWriter<Text, Text> {
private Connection connection;
private PreparedStatement statement;
public MySQLRecordWriter() {
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost/db", "user", "pass");
statement = connection.prepareStatement("INSERT INTO output VALUES (?, ?)");
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, Text value) throws IOException {
try {
statement.setString(1, key.toString());
statement.setString(2, value.toString());
statement.executeUpdate();
} catch (SQLException e) {
throw new IOException(e);
}
}
@Override
public void close(TaskAttemptContext context) throws IOException {
try {
statement.close();
connection.close();
} catch (SQLException e) {
throw new IOException(e);
}
}
}
}
自定义 Partitioner
实现自定义分区逻辑:
public class DomainPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
String domain = extractDomain(key.toString());
return (domain.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
private String extractDomain(String url) {
try {
URL u = new URL(url);
return u.getHost();
} catch (Exception e) {
return "unknown";
}
}
}
自定义 Combiner
Combiner 可以显著减少网络传输:
public class MaxTemperatureCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
Combiner 使用注意事项:
- Combiner 输入输出类型必须一致
- 操作必须满足结合律(如求和、求最大值)
- 求平均值等操作不能使用 Combiner
自定义 WritableComparator
自定义排序和分组逻辑:
public class TextComparator extends WritableComparator {
protected TextComparator() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Text t1 = (Text) a;
Text t2 = (Text) b;
return t2.compareTo(t1);
}
}
public class FirstGroupingComparator extends WritableComparator {
protected FirstGroupingComparator() {
super(TextPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextPair tp1 = (TextPair) a;
TextPair tp2 = (TextPair) b;
return tp1.getFirst().compareTo(tp2.getFirst());
}
}
计数器
内置计数器
MapReduce 提供了丰富的内置计数器:
| 计数器组 | 计数器 | 说明 |
|---|---|---|
| MAP_INPUT_RECORDS | 输入记录数 | Map 读取的记录数 |
| MAP_OUTPUT_RECORDS | 输出记录数 | Map 输出的记录数 |
| MAP_OUTPUT_BYTES | 输出字节数 | Map 输出的字节数 |
| REDUCE_INPUT_RECORDS | 输入记录数 | Reduce 读取的记录数 |
| REDUCE_OUTPUT_RECORDS | 输出记录数 | Reduce 输出的记录数 |
| SPILLED_RECORDS | 溢写记录数 | 溢写到磁盘的记录数 |
自定义计数器
public enum MyCounters {
TOTAL_RECORDS,
INVALID_RECORDS,
MISSING_FIELDS
}
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(MyCounters.TOTAL_RECORDS).increment(1);
String[] fields = value.toString().split(",");
if (fields.length < 3) {
context.getCounter(MyCounters.MISSING_FIELDS).increment(1);
return;
}
if (!isValid(fields)) {
context.getCounter(MyCounters.INVALID_RECORDS).increment(1);
return;
}
// 正常处理
}
}
动态计数器
context.getCounter("GroupName", "CounterName").increment(1);
查看计数器
# 通过命令行查看
hadoop job -history /output/_logs/history/job_xxx
# 通过 Web UI 查看
# 访问 http://resourcemanager:8088
性能调优
Map 任务调优
内存配置
<!-- Map 任务 JVM 堆内存 -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<!-- Map 任务 JVM 参数 -->
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1638m</value>
</property>
环形缓冲区
<!-- 增大缓冲区减少溢写 -->
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>256</value>
</property>
<!-- 调整溢写阈值 -->
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
</property>
小文件优化
<!-- 合并小文件 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>134217728</value>
</property>
<!-- 使用 CombineFileInputFormat -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>268435456</value>
</property>
Reduce 任务调优
内存配置
<!-- Reduce 任务 JVM 堆内存 -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
<!-- Reduce 任务 JVM 参数 -->
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx3276m</value>
</property>
Shuffle 优化
<!-- 增加拉取并行度 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>10</value>
</property>
<!-- 调整内存比例 -->
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>
<!-- 增加 Reduce 计算缓冲 -->
<property>
<name>mapreduce.reduce.input.buffer.percent</name>
<value>0.30</value>
</property>
任务数量优化
Map 任务数量
Map 任务数量由输入分片决定:
- 分片大小 = max(minSize, min(maxSize, blockSize))
- 默认分片大小 = HDFS 块大小
<!-- 增大分片,减少 Map 数量 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>268435456</value>
</property>
<!-- 减小分片,增加 Map 数量 -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value>67108864</value>
</property>
Reduce 任务数量
<!-- 设置 Reduce 数量 -->
<property>
<name>mapreduce.job.reduces</name>
<value>10</value>
</property>
Reduce 数量选择原则:
- 每个 Reduce 处理 1-5GB 数据为宜
- Reduce 数量不宜过多,会增加 Shuffle 开销
- Reduce 数量不宜过少,会降低并行度
数据压缩
启用压缩可以减少磁盘 IO 和网络传输:
<!-- 启用 Map 输出压缩 -->
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<!-- Map 输出压缩编码器 -->
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<!-- 启用 Reduce 输出压缩 -->
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<!-- Reduce 输出压缩编码器 -->
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>
压缩编码器选择:
| 编码器 | 压缩比 | 压缩速度 | 是否可分割 | 适用场景 |
|---|---|---|---|---|
| Gzip | 高 | 慢 | 否 | 最终输出 |
| Snappy | 低 | 快 | 否 | Map 输出 |
| LZO | 中 | 快 | 是 | 大文件 |
| Bzip2 | 最高 | 最慢 | 是 | 归档存储 |
推测执行
当任务执行缓慢时,可以启动备份任务:
<!-- 启用 Map 推测执行 -->
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property>
<!-- 启用 Reduce 推测执行 -->
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
</property>
注意:Reduce 推测执行可能导致数据重复,谨慎使用。
JVM 重用
在 YARN 中配置 JVM 重用:
<!-- 每个 JVM 执行的任务数 -->
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>-1</value>
</property>
常见问题排查
数据倾斜
现象:大部分 Reduce 很快完成,个别 Reduce 执行很慢。
解决方案:
- 增加 Reduce 数量:分散数据
- 自定义 Partitioner:均匀分布数据
- 局部聚合:先 Map 端聚合,再 Reduce 聚合
public class SkewPartitioner extends Partitioner<Text, IntWritable> {
private static final String[] PREFIXES = {"a-m", "n-z"};
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
char first = Character.toLowerCase(key.toString().charAt(0));
if (first >= 'a' && first <= 'm') {
return 0 % numReduceTasks;
} else {
return 1 % numReduceTasks;
}
}
}
内存溢出
现象:任务失败,日志显示 OOM 错误。
解决方案:
- 增加任务内存配置
- 优化代码,减少内存占用
- 调整缓冲区大小
小文件过多
现象:Map 任务数量过多,执行效率低。
解决方案:
- 使用 CombineFileInputFormat 合并小文件
- 预处理合并小文件
- 使用 SequenceFile 存储小文件
小结
本章介绍了 MapReduce 的高级特性:
-
Shuffle 优化:理解 Map 端和 Reduce 端 Shuffle 机制,合理配置参数。
-
自定义组件:自定义 InputFormat、OutputFormat、Partitioner 等组件。
-
计数器:使用计数器监控任务执行情况。
-
性能调优:内存配置、压缩、推测执行等优化技巧。
-
问题排查:数据倾斜、内存溢出、小文件等常见问题的解决方案。
掌握这些高级特性,可以更好地优化 MapReduce 作业性能,处理复杂的数据处理需求。