Hadoop 基础
Hadoop是Apache基金会开发的开源分布式计算平台,是大数据技术的基石。它提供了可靠的分布式存储(HDFS)和分布式计算(MapReduce)能力,能够处理PB级别的海量数据。
Hadoop 概述
什么是 Hadoop?
Hadoop是一个由Apache基金会开发的分布式系统基础架构,主要解决海量数据的存储和分析计算问题。用户可以在不了解分布式底层细节的情况下,开发分布式程序,利用集群的威力进行高速运算和存储。
Hadoop的核心设计目标是:
- 高可靠性:通过数据多副本和自动故障转移机制保证数据安全
- 高扩展性:可以通过增加节点线性扩展集群容量
- 高效性:采用分布式并行计算,加快处理速度
- 高容错性:自动将失败的任务重新分配
Hadoop 三大核心组件
Hadoop由三个核心组件构成,它们各司其职又紧密协作:
| 组件 | 全称 | 功能 |
|---|---|---|
| HDFS | Hadoop Distributed File System | 分布式文件存储 |
| MapReduce | MapReduce | 分布式计算框架 |
| YARN | Yet Another Resource Negotiator | 资源调度管理 |
Hadoop 版本演进
| 版本 | 特点 |
|---|---|
| Hadoop 1.x | HDFS + MapReduce,计算和资源调度耦合 |
| Hadoop 2.x | 引入YARN,实现资源调度与计算分离 |
| Hadoop 3.x | 支持纠删码、多NameNode、容器化优化 |
HDFS 分布式文件系统
HDFS(Hadoop Distributed File System)是Hadoop的分布式文件存储系统,设计用于在通用硬件上运行,提供高吞吐量的数据访问。
HDFS 设计目标
HDFS 的设计基于以下几个核心假设和目标:
硬件故障是常态
在大型集群中,硬件故障是常态而非例外。一个 HDFS 实例可能由数百甚至数千台服务器组成,每台服务器都存储着部分数据。由于组件数量庞大,每个组件都有一定的故障概率,这意味着系统中总有某些组件处于故障状态。因此,快速检测故障并自动恢复是 HDFS 架构的核心目标。
流式数据访问
运行在 HDFS 上的应用需要流式访问数据集。这些应用不是通用程序,而是专门针对批处理设计的。HDFS 的设计更侧重于高吞吐量的数据访问,而非低延迟的数据访问。为了提高数据吞吐率,HDFS 放宽了一些 POSIX 语义要求。
大数据集
HDFS 应用处理的是大数据集。典型文件大小为 GB 到 TB 级别。因此,HDFS 针对大文件进行了优化,提供高聚合数据带宽,支持单集群数百个节点、数千万个文件。
简单的一致性模型
HDFS 应用采用"一次写入,多次读取"的文件访问模型。文件创建、写入和关闭后,除了追加和截断外,不需要再修改。这个假设简化了数据一致性问题,并实现了高吞吐量的数据访问。MapReduce 应用或网络爬虫应用非常符合这个模型。
移动计算比移动数据更划算
如果计算操作在数据附近执行,效率会更高。这在数据集规模巨大时尤其明显。这种方式可以最小化网络拥塞,提高系统整体吞吐量。HDFS 提供接口,让应用可以将计算移动到数据所在位置。
核心设计目标总结
- 适合大文件存储:适合存储GB到TB级别的大文件
- 流式数据访问:一次写入,多次读取的数据访问模式
- 商用硬件:运行在普通服务器上,通过冗余保证可靠性
- 高容错:自动处理节点故障,数据自动复制
- 高吞吐量:优先保证数据访问的高吞吐,而非低延迟
HDFS 架构
HDFS采用主从架构(Master-Slave),主要由以下组件构成:
NameNode(名称节点)
NameNode是HDFS的主节点,负责管理文件系统的元数据:
- 文件目录树:维护整个文件系统的目录结构
- 文件块映射:记录每个文件由哪些数据块组成
- 数据块位置:记录每个数据块存储在哪些DataNode上
- 访问控制:管理文件的权限信息
NameNode将元数据保存在内存中以提高访问速度,同时持久化到磁盘的FsImage和EditLog文件中。
FsImage 和 EditLog:
NameNode 使用两个核心文件来持久化元数据:
- FsImage(文件系统镜像):存储整个文件系统命名空间的快照,包括文件到数据块的映射、文件属性等
- EditLog(编辑日志):记录对文件系统的所有修改操作,如创建文件、删除文件、修改副本数等
当 NameNode 启动时,它会读取 FsImage,然后重放 EditLog 中的所有操作,最终得到最新的文件系统状态。为了避免 EditLog 过大导致启动时间过长,SecondaryNameNode(或 Standby NameNode)会定期执行检查点操作,将 EditLog 合并到 FsImage 中。
元数据内存结构:
NameNode 在内存中维护以下数据结构:
内存中的元数据:
├── 命名空间(Namespace)
│ ├── 目录树结构
│ └── 文件属性(权限、时间戳等)
├── 块映射(BlockMap)
│ ├── 文件 → 数据块列表
│ └── 数据块 → DataNode 列表
└── DataNode 信息
├── 心跳状态
└── 块报告
DataNode(数据节点)
DataNode是HDFS的工作节点,负责实际的数据存储:
- 数据块存储:将文件切分成固定大小的块(默认128MB)存储
- 数据块读写:响应客户端的读写请求
- 心跳汇报:定期向NameNode发送心跳和块报告
- 数据块复制:根据NameNode指令复制数据块
DataNode 将每个数据块存储在本地文件系统的单独文件中。DataNode 不知道文件的内容,它只是把每个数据块当作一个独立的文件来管理。DataNode 不会把所有文件都放在同一个目录下,而是使用启发式方法来确定每个目录的最优文件数量,并相应地创建子目录。
Secondary NameNode(辅助节点)
Secondary NameNode不是NameNode的热备,它的主要作用是:
- 合并FsImage和EditLog:定期合并元数据,减少EditLog大小
- 检查点创建:创建元数据检查点,加快NameNode启动
- 元数据备份:提供元数据的冷备份
检查点工作流程:
1. Secondary NameNode 通知 NameNode 滚动 EditLog
2. NameNode 创建新的 EditLog 文件继续写入
3. Secondary NameNode 下载 FsImage 和旧的 EditLog
4. Secondary NameNode 将 EditLog 应用到 FsImage
5. Secondary NameNode 将新的 FsImage 上传回 NameNode
6. NameNode 使用新的 FsImage 替换旧的
数据块与副本机制
数据块(Block)
HDFS将文件切分成固定大小的块进行存储:
- 默认块大小:128MB(Hadoop 2.x/3.x)
- 设计原因:减少元数据占用、减少寻址时间、支持大文件
为什么块大小是128MB?
HDFS 的块大小设置需要平衡多个因素:
- 太小:会导致大量小文件,增加 NameNode 内存压力;同时增加寻址时间比例
- 太大:会导致并行度降低,MapReduce 任务数减少;同时小文件浪费空间
128MB 是一个折中的选择,适合大多数场景。对于大规模数据集,可以设置为 256MB 或更大。
块大小的配置:
<property>
<name>dfs.blocksize</name>
<value>134217728</value> <!-- 128MB -->
</property>
副本机制
HDFS通过多副本机制保证数据可靠性:
- 默认副本数:3
- 副本放置策略:
副本放置策略对 HDFS 的可靠性和性能至关重要。这是 HDFS 与其他分布式文件系统的主要区别之一。机架感知的副本放置策略旨在提高数据可靠性、可用性和网络带宽利用率。
三副本放置策略:
当副本因子为 3 时,HDFS 的放置策略是:
- 第一个副本:如果写入者在 DataNode 上,则放在本地机器;否则放在与写入者同机架的随机 DataNode 上
- 第二个副本:放在不同(远程)机架的节点上
- 第三个副本:放在第二个副本所在机架的不同节点上
这种策略减少了机架间的写入流量,通常能提高写入性能。机架故障的概率远低于节点故障概率,因此该策略不会影响数据可靠性和可用性保证。
客户端 → DataNode1(机架A)→ DataNode2(机架B)→ DataNode3(机架B)
第一个副本 第二个副本 第三个副本
副本选择策略:
为了最小化全局带宽消耗和读取延迟,HDFS 会尝试从距离读取者最近的副本读取数据。如果存在与读取者同机架的副本,则优先选择该副本。如果 HDFS 集群跨越多个数据中心,则优先选择本地数据中心的副本。
副本流水线写入
当客户端向 HDFS 文件写入数据(副本因子为 3)时,写入流程如下:
- NameNode 返回一个 DataNode 列表(包含 3 个 DataNode)
- 客户端连接第一个 DataNode 开始写入
- 第一个 DataNode 接收数据,写入本地,同时转发给第二个 DataNode
- 第二个 DataNode 接收数据,写入本地,同时转发给第三个 DataNode
- 第三个 DataNode 接收数据,写入本地
- 所有 DataNode 确认写入完成后,客户端确认写入成功
客户端 ──写入──→ DN1 ──转发──→ DN2 ──转发──→ DN3
│ │ │
↓ ↓ ↓
本地存储 本地存储 本地存储
这种流水线写入方式使得数据可以并行传输,提高了写入效率。
机架感知
机架感知是 HDFS 优化数据可靠性和网络带宽利用率的关键机制。
机架感知的作用
- 提高可靠性:将副本分布在不同机架,防止整个机架故障导致数据丢失
- 优化带宽:减少跨机架的数据传输
- 负载均衡:合理分布数据,避免热点
机架感知配置
Hadoop 需要知道每个 DataNode 所属的机架。可以通过两种方式配置:
方式一:脚本配置
<!-- core-site.xml -->
<property>
<name>net.topology.script.file.name</name>
<value>/path/to/rack-topology.sh</value>
</property>
脚本示例:
#!/bin/bash
# rack-topology.sh
# 输入:IP地址或主机名
# 输出:机架名称
case $1 in
192.168.1.101|192.168.1.102)
echo "/rack1"
;;
192.168.1.103|192.168.1.104)
echo "/rack2"
;;
*)
echo "/default-rack"
;;
esac
方式二:Java 类配置
<property>
<name>net.topology.node.switch.mapping.impl</name>
<value>org.apache.hadoop.net.TableMapping</value>
</property>
<property>
<name>net.topology.table.file.name</name>
<value>/path/to/topology.data</value>
</property>
topology.data 文件格式:
192.168.1.101 /rack1
192.168.1.102 /rack1
192.168.1.103 /rack2
192.168.1.104 /rack2
HDFS 高可用
生产环境中,HDFS 集群需要高可用(HA)配置,避免单点故障。
高可用架构
HDFS 高可用通过 Active/Standby NameNode 实现:
- 主备切换:Active NameNode 故障时,Standby 自动接管
- 共享存储:使用 Quorum Journal Manager 或 NFS 共享 EditLog
- ZooKeeper 协调:自动故障检测和主备选举
HDFS HA 架构:
┌─────────────────┐
│ ZooKeeper │
│ 集群(选举) │
└────────┬────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ NameNode Active │ │NameNode Standby │ │NameNode Standby │
│ (当前主节点) │ │ (备节点1) │ │ (备节点2) │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌────────┴────────┐
│ JournalNode 集群 │
│ (EditLog存储) │
└────────┬────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ DataNode 1 │ │ DataNode 2 │ │ DataNode 3 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
JournalNode
JournalNode 是一组独立的守护进程,用于存储 EditLog:
- 数量:至少 3 个(允许 1 个故障)
- 作用:Active NameNode 写入 EditLog,Standby NameNode 读取并同步
- 一致性:使用 Paxos 算法保证数据一致性
自动故障转移
使用 ZooKeeper 和 ZKFC(ZooKeeper Failover Controller)实现自动故障转移:
<!-- hdfs-site.xml -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>zk1:2181,zk2:2181,zk3:2181</value>
</property>
故障转移流程:
- Active NameNode 的 ZKFC 在 ZooKeeper 上维护一个临时节点
- Active NameNode 故障时,临时节点被删除
- Standby NameNode 的 ZKFC 检测到节点变化
- Standby NameNode 通过 ZooKeeper 选举成为新的 Active
- 新的 Active NameNode 接管服务
多 NameNode 支持(Hadoop 3.x)
Hadoop 3.x 支持配置多个 Standby NameNode,提供更高的容错能力:
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2,nn3</value>
</property>
例如,配置 3 个 NameNode 和 5 个 JournalNode,可以容忍 2 个节点故障。
安全模式
NameNode 启动时会进入一个特殊状态,称为安全模式(Safemode)。在安全模式下,数据块的复制不会发生。NameNode 接收来自 DataNode 的心跳和块报告(Blockreport)。块报告包含 DataNode 上托管的数据块列表。每个数据块都有指定的最小副本数。当最小副本数的数据块向 NameNode 报告后,该数据块被认为已安全复制。
当可配置百分比的数据块已安全复制(加上额外的 30 秒等待时间),NameNode 会退出安全模式。然后它确定仍然少于指定副本数的数据块列表,并开始复制这些数据块到其他 DataNode。
安全模式相关命令:
# 查看安全模式状态
hdfs dfsadmin -safemode get
# 手动进入安全模式
hdfs dfsadmin -safemode enter
# 手动离开安全模式
hdfs dfsadmin -safemode leave
# 等待安全模式结束
hdfs dfsadmin -safemode wait
安全模式配置:
<property>
<name>dfs.namenode.safemode.threshold-pct</name>
<value>0.999f</value>
<!-- 安全复制的数据块百分比阈值 -->
</property>
<property>
<name>dfs.namenode.safemode.extension</name>
<value>30000</value>
<!-- 达到阈值后额外等待时间(毫秒) -->
</property>
数据完整性
从 DataNode 获取的数据块可能会损坏。这种损坏可能由存储设备故障、网络故障或软件错误引起。
HDFS 客户端软件对 HDFS 文件内容实现了校验和检查:
- 创建 HDFS 文件时,客户端计算每个数据块的校验和,并存储在同一个 HDFS 命名空间的隐藏文件中
- 检索文件内容时,验证从 DataNode 接收的数据是否与校验和匹配
- 如果不匹配,客户端可以选择从其他拥有副本的 DataNode 获取该数据块
数据完整性验证示例:
# 检查文件系统的健康状况
hdfs fsck /path/to/file
# 检查并显示损坏的块
hdfs fsck / -list-corruptfileblocks
# 检查块详情
hdfs fsck /path/to/file -files -blocks -locations
HDFS 文件读写流程
文件读取流程
客户端读取HDFS文件的步骤如下:
读取流程:
1. Client 调用 FileSystem.open() 打开文件
2. NameNode 返回文件块位置信息(按距离排序)
3. Client 连接最近的 DataNode 读取数据
4. 读取完一个块后,继续读取下一个块
5. 读取完成后关闭连接
详细步骤:
- 打开文件:客户端调用
open()方法,向 NameNode 请求打开文件 - 获取块位置:NameNode 返回文件的所有数据块位置信息,包括每个块的副本所在的 DataNode 列表
- 选择 DataNode:客户端根据网络拓扑选择最近的 DataNode
- 读取数据:客户端建立到 DataNode 的连接,读取数据块
- 验证校验和:客户端验证数据的校验和
- 继续读取:如果数据损坏或连接失败,客户端尝试从其他副本读取
// HDFS文件读取示例
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path("/user/data/example.txt");
FSDataInputStream inputStream = fs.open(filePath);
BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
reader.close();
fs.close();
文件写入流程
客户端写入HDFS文件的步骤如下:
写入流程:
1. Client 调用 FileSystem.create() 创建文件
2. NameNode 检查权限并创建文件元数据
3. NameNode 返回 DataNode 列表用于写入
4. Client 建立到 DataNode 的流水线
5. 数据以包(packet)为单位发送
6. DataNode 确认后,Client 继续发送下一个包
7. 写入完成后,Client 调用 close() 关闭文件
8. NameNode 记录文件创建完成
详细步骤:
- 创建文件:客户端调用
create()方法,向 NameNode 请求创建文件 - 权限验证:NameNode 检查客户端是否有权限创建文件
- 选择 DataNode:NameNode 根据副本放置策略选择一组 DataNode
- 建立流水线:客户端按顺序连接 DataNode,形成写入流水线
- 写入数据:客户端将数据分成 64KB 的包(packet),依次发送到流水线
- 确认应答:每个 DataNode 写入成功后返回确认
- 完成写入:客户端发送完成信号,NameNode 记录文件元数据
// HDFS文件写入示例
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path("/user/data/output.txt");
FSDataOutputStream outputStream = fs.create(filePath);
String content = "Hello, HDFS!";
outputStream.write(content.getBytes());
outputStream.close();
fs.close();
HDFS 常用命令
HDFS提供了丰富的命令行工具,用于文件操作:
# 查看帮助
hdfs dfs -help
# 创建目录
hdfs dfs -mkdir -p /user/hadoop/data
# 上传文件
hdfs dfs -put localfile.txt /user/hadoop/data/
hdfs dfs -copyFromLocal localfile.txt /user/hadoop/data/
hdfs dfs -moveFromLocal localfile.txt /user/hadoop/data/ # 上传后删除本地文件
# 下载文件
hdfs dfs -get /user/hadoop/data/remotefile.txt ./
hdfs dfs -copyToLocal /user/hadoop/data/remotefile.txt ./
# 列出目录内容
hdfs dfs -ls /user/hadoop/data/
hdfs dfs -ls -R /user/hadoop/data/ # 递归列出
# 查看文件内容
hdfs dfs -cat /user/hadoop/data/file.txt
hdfs dfs -tail /user/hadoop/data/file.txt # 查看文件末尾
hdfs dfs -head /user/hadoop/data/file.txt # 查看文件开头
# 删除文件或目录
hdfs dfs -rm /user/hadoop/data/file.txt
hdfs dfs -rm -r /user/hadoop/data/olddir
hdfs dfs -rm -skipTrash /user/hadoop/data/file.txt # 不放入回收站
# 查看文件大小
hdfs dfs -du -h /user/hadoop/data/
hdfs dfs -du -s -h /user/hadoop/data/ # 汇总大小
# 设置副本数
hdfs dfs -setrep 3 /user/hadoop/data/file.txt
hdfs dfs -setrep -R 3 /user/hadoop/data/ # 递归设置
# 查看文件块信息
hdfs fsck /user/hadoop/data/file.txt -files -blocks -locations
# 合并下载
hdfs dfs -getmerge /user/hadoop/data/* localfile.txt
纠删码(Erasure Coding,Hadoop 3.x)
Hadoop 3.x 引入纠删码作为副本机制的替代方案,可以显著节省存储空间。
纠删码原理
纠删码是一种数据编码技术,将数据分成数据块和校验块:
- 副本机制:3 副本意味着 3 倍存储开销
- 纠删码:如 Reed-Solomon (10,4) 编码,只有 1.4 倍存储开销
工作原理:
原始数据:[D1, D2, D3, D4, D5, D6, D7, D8, D9, D10]
↓ 纠删码编码
存储数据:[D1, D2, D3, D4, D5, D6, D7, D8, D9, D10, P1, P2, P3, P4]
├────────── 数据块 (10个) ──────────┤ ├── 校验块 (4个) ──┤
可以容忍任意 4 个块丢失,通过剩余块恢复数据
纠删码配置
<!-- 启用纠删码策略 -->
<property>
<name>dfs.namenode.ec.policies.enabled</name>
<value>XOR-2-1-1024k, RS-6-3-1024k, RS-10-4-1024k</value>
</property>
纠删码策略
| 策略 | 数据块:校验块 | 存储开销 | 容忍故障数 |
|---|---|---|---|
| RS-6-3 | 6:3 | 1.5x | 3 个块 |
| RS-10-4 | 10:4 | 1.4x | 4 个块 |
| XOR-2-1 | 2:1 | 1.5x | 1 个块 |
纠删码使用
# 查看可用的纠删码策略
hdfs ec -listPolicies
# 为目录设置纠删码策略
hdfs ec -setPolicy -path /data/archive -policy RS-6-3-1024k
# 取消纠删码策略
hdfs ec -unsetPolicy -path /data/archive
# 查看目录的纠删码策略
hdfs ec -getPolicy -path /data/archive
适用场景:
- 冷数据存储:不频繁访问的归档数据
- 大文件:纠删码对小文件开销较大
- 网络带宽充足:数据恢复需要较多网络传输
HDFS 存储类型和存储策略(Hadoop 3.x)
Hadoop 3.x 支持异构存储,可以根据数据热度选择不同的存储介质。
存储类型
| 存储类型 | 说明 | 适用场景 |
|---|---|---|
| RAM_DISK | 内存磁盘 | 最热数据 |
| SSD | 固态硬盘 | 热数据 |
| DISK | 普通硬盘 | 温数据 |
| ARCHIVE | 归档存储 | 冷数据 |
存储策略
| 策略 | 说明 | 数据存放 |
|---|---|---|
| Hot | 热数据策略 | DISK |
| Warm | 温数据策略 | DISK + ARCHIVE |
| Cold | 冷数据策略 | ARCHIVE |
| All_SSD | 全SSD策略 | SSD |
| One_SSD | 单SSD策略 | SSD + DISK |
| Lazy_Persist | 内存持久化策略 | RAM_DISK + DISK |
存储策略使用
# 查看存储策略列表
hdfs storagepolicies -listPolicies
# 设置存储策略
hdfs storagepolicies -setStoragePolicy -path /data/hot -policy Hot
# 获取存储策略
hdfs storagepolicies -getStoragePolicy -path /data/hot
# 数据迁移到符合策略的存储
hdfs mover /data/hot
MapReduce 分布式计算框架
MapReduce是一种分布式计算编程模型,将复杂的分布式计算分解为Map(映射)和Reduce(归约)两个阶段。
MapReduce 编程模型
MapReduce的核心思想是将计算任务分解为两个阶段:
- Map阶段:将输入数据映射为键值对,进行并行处理
- Reduce阶段:对Map输出的中间结果进行汇总
输入数据 -> Map -> (K1, V1) -> Shuffle -> (K2, List<V2>) -> Reduce -> 输出结果
MapReduce 工作流程
完整的MapReduce执行流程包括以下步骤:
- 输入分片:将输入文件切分成多个分片(Split)
- Map任务:每个分片启动一个Map任务处理
- Shuffle过程:对Map输出进行分区、排序、合并
- Reduce任务:对Shuffle后的数据进行汇总
- 输出结果:将Reduce结果写入HDFS
输入分片(InputSplit)
InputSplit 是 MapReduce 处理数据的基本单位:
- 分片大小:默认等于 HDFS 块大小(128MB)
- 分片数量:决定 Map 任务的数量
- 不可跨文件:一个分片不能包含多个文件的数据(除非使用 CombineFileInputFormat)
分片大小计算:
splitSize = max(minSize, min(maxSize, blockSize))
其中:
- minSize = mapreduce.input.fileinputformat.split.minsize
- maxSize = mapreduce.input.fileinputformat.split.maxsize
- blockSize = HDFS 块大小
小文件问题:
当输入文件很小时,会产生大量小分片,导致:
- Map 任务数量过多,任务调度开销大
- NameNode 元数据压力大
解决方案:
- 使用 CombineFileInputFormat 合并小文件
- 预处理合并小文件
WordCount 示例详解
WordCount是MapReduce的经典入门示例,统计文本中单词出现的次数:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
// Mapper类:输入偏移量和行内容,输出单词和计数1
// Mapper<输入Key类型, 输入Value类型, 输出Key类型, 输出Value类型>
public class WordCountMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* map方法:每个分片的每行数据调用一次
* @param key 输入key,通常是行在文件中的字节偏移量
* @param value 输入value,行内容
* @param context 上下文对象,用于输出键值对
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 将每行文本按空格分割成单词
String[] words = value.toString().split("\\s+");
for (String w : words) {
// 过滤空字符串
if (w.length() > 0) {
word.set(w);
// 输出 (单词, 1)
context.write(word, one);
}
}
}
}
// Reducer类:汇总每个单词的出现次数
// Reducer<输入Key类型, 输入Value类型, 输出Key类型, 输出Value类型>
public class WordCountReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
/**
* reduce方法:每个Key调用一次
* @param key 单词
* @param values 该单词所有的计数值(迭代器)
* @param context 上下文对象,用于输出结果
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
// 累加所有值
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
// 输出 (单词, 总次数)
context.write(key, result);
}
}
// Driver类:配置和提交作业
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// 创建配置对象
Configuration conf = new Configuration();
// 创建Job对象,Job是MapReduce作业的封装
Job job = Job.getInstance(conf, "word count");
// 设置作业类(用于查找JAR文件)
job.setJarByClass(WordCountDriver.class);
// 设置Mapper和Reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置Combiner类(可选,用于本地聚合优化)
job.setCombinerClass(WordCountReducer.class);
// 设置输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交作业并等待完成
// waitForCompletion参数:true表示打印进度信息
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Shuffle 机制详解
Shuffle是MapReduce中最核心的过程,它连接Map和Reduce阶段:
Map端Shuffle
-
环形缓冲区:Map输出先写入内存缓冲区(默认100MB)
- 缓冲区大小由
mapreduce.task.io.sort.mb配置 - 数据在缓冲区中按分区和Key排序
- 缓冲区大小由
-
溢写(Spill):缓冲区达到阈值(80%)时,开始溢写到磁盘
- 溢写阈值由
mapreduce.map.sort.spill.percent配置 - 溢写过程中,Map任务继续向缓冲区写入数据
- 溢写阈值由
-
分区排序:溢写过程中按分区和键排序
- 分区数等于Reduce任务数
- 同一分区内按Key排序
-
合并(Merge):多个溢写文件合并成一个大文件
- 合并过程中进行排序
- 如果配置了Combiner,合并时执行局部聚合
Map端Shuffle流程:
Map输出
↓
环形缓冲区(100MB)
↓ 达到80%阈值
分区 + 排序
↓
溢写到磁盘(Spill File)
↓ 多次溢写
合并(Merge)
↓ 可选Combiner
Map输出文件(已分区、已排序)
Reduce端Shuffle
-
拉取数据(Copy):Reduce任务从Map节点拉取属于自己的分区数据
- 多个线程并行拉取
- 拉取的数据先存入内存缓冲区
-
合并(Merge):将拉取的数据合并排序
- 内存达到阈值时溢写到磁盘
- 最终合并成一个有序的文件
-
分组(Group):相同Key的数据分到一组
- 调用Reducer的reduce方法处理
Reduce端Shuffle流程:
Map输出文件
↓ Copy阶段
内存缓冲区
↓ 达到阈值
磁盘文件(Spill)
↓ Merge阶段
排序后的文件
↓ Group阶段
Reducer处理
Shuffle 配置优化
<!-- Map端配置 -->
<!-- 环形缓冲区大小,增大可减少溢写次数 -->
<property>
<name>mapreduce.task.io.sort.mb</name>
<value>100</value>
</property>
<!-- 溢写阈值,增大可减少溢写次数但增加内存压力 -->
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
</property>
<!-- 合并因子,增大可减少合并次数但增加内存使用 -->
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
</property>
<!-- Reduce端配置 -->
<!-- Reduce任务同时拉取的Map输出数量 -->
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
</property>
<!-- Reduce任务内存缓冲区占堆内存的比例 -->
<property>
<name>mapreduce.reduce.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>
<!-- 内存缓冲区数据达到此比例时开始合并溢写 -->
<property>
<name>mapreduce.reduce.shuffle.merge.percent</name>
<value>0.66</value>
</property>
Combiner 优化
Combiner 是 Map 端的局部聚合,可以减少 Shuffle 数据量:
适用场景:
- 聚合操作:如求和、计数
- Combiner 输入输出类型相同
- 操作满足结合律:
combine(combine(a, b), c) = combine(a, combine(b, c))
不适用场景:
- 求平均值:
(a+b)/2 ≠ (a/2 + b/2) - 求最大/最小值的一部分(需要完整数据)
// 设置Combiner(使用Reducer作为Combiner)
job.setCombinerClass(WordCountReducer.class);
// 自定义Combiner(如果逻辑与Reducer不同)
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
// 局部聚合逻辑
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
Partitioner 分区器
Partitioner 决定 Map 输出的键值对发送到哪个 Reduce 任务:
默认分区器:HashPartitioner
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
自定义分区器:
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
// 按首字母分区
char firstChar = key.toString().charAt(0);
if (firstChar >= 'a' && firstChar <= 'z') {
return (firstChar - 'a') % numReduceTasks;
}
return 0;
}
}
// 使用自定义分区器
job.setPartitionerClass(CustomPartitioner.class);
InputFormat 和 OutputFormat
InputFormat
InputFormat 决定如何读取输入数据:
| InputFormat | 说明 | 输入类型 |
|---|---|---|
| TextInputFormat | 默认,每行一条记录 | Key=字节偏移量,Value=行内容 |
| KeyValueTextInputFormat | 按分隔符分割Key和Value | Key=第一个分隔符前,Value=其后 |
| NLineInputFormat | 每个Map处理固定行数 | 类似TextInputFormat |
| CombineFileInputFormat | 合并小文件 | 用于处理大量小文件 |
| SequenceFileInputFormat | 读取SequenceFile | 二进制格式,高效 |
// 设置InputFormat
job.setInputFormatClass(TextInputFormat.class);
// 设置每行最大长度
TextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024);
// 使用NLineInputFormat,每个Map处理N行
NLineInputFormat.setNumLinesPerSplit(job, 1000);
job.setInputFormatClass(NLineInputFormat.class);
OutputFormat
OutputFormat 决定如何写入输出数据:
| OutputFormat | 说明 |
|---|---|
| TextOutputFormat | 默认,输出文本文件 |
| SequenceFileOutputFormat | 输出二进制SequenceFile |
| MultipleOutputs | 输出到多个文件 |
| LazyOutputFormat | 延迟创建输出文件 |
// 设置OutputFormat
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出分隔符
TextOutputFormat.setOutputPath(job, new Path("/output"));
MapReduce 数据类型
MapReduce使用Writable接口实现序列化:
| 数据类型 | Java类型 | 说明 |
|---|---|---|
| IntWritable | int | 整型 |
| LongWritable | long | 长整型 |
| FloatWritable | float | 浮点型 |
| DoubleWritable | double | 双精度 |
| Text | String | 文本 |
| BooleanWritable | boolean | 布尔型 |
| NullWritable | null | 空值 |
| BytesWritable | byte[] | 字节数组 |
自定义 Writable 类型:
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PersonWritable implements Writable {
private Text name;
private IntWritable age;
public PersonWritable() {
this.name = new Text();
this.age = new IntWritable();
}
public PersonWritable(String name, int age) {
this.name = new Text(name);
this.age = new IntWritable(age);
}
@Override
public void write(DataOutput out) throws IOException {
name.write(out);
age.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
name.readFields(in);
age.readFields(in);
}
@Override
public String toString() {
return name.toString() + "\t" + age.get();
}
}
MapReduce 性能优化
任务数量优化
<!-- Map任务数量:由输入分片决定 -->
<!-- 减少Map数量:增大最小分片大小 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value>268435456</value> <!-- 256MB -->
</property>
<!-- Reduce任务数量 -->
<property>
<name>mapreduce.job.reduces</name>
<value>10</value>
</property>
Reduce 数量建议:
- 通常设置为集群 Slot 数的 0.95 或 1.75 倍
- 过少:Reduce 任务负载过重
- 过多:小文件过多,增加调度开销
压缩优化
MapReduce 支持在多个阶段启用压缩:
<!-- Map输出压缩 -->
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<!-- Reduce输出压缩 -->
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>
压缩编解码器对比:
| 编解码器 | 压缩比 | 压缩速度 | 解压速度 | 是否可分割 |
|---|---|---|---|---|
| Gzip | 高 | 慢 | 快 | 否 |
| Snappy | 中 | 快 | 快 | 否 |
| LZO | 中 | 快 | 快 | 是(需建索引) |
| Bzip2 | 最高 | 最慢 | 慢 | 是 |
内存优化
<!-- Map任务内存配置 -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value> <!-- Map任务总内存 -->
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1638m</value> <!-- JVM堆内存 -->
</property>
<!-- Reduce任务内存配置 -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value> <!-- Reduce任务总内存 -->
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx3276m</value> <!-- JVM堆内存 -->
</property>
推测执行
当任务执行缓慢时,MapReduce 可以启动备份任务:
<!-- 启用推测执行 -->
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
</property>
注意:对于有副作用的任务(如写入数据库),应禁用推测执行。
YARN 资源调度
YARN(Yet Another Resource Negotiator)是Hadoop 2.x引入的资源管理系统,负责集群资源的统一管理和调度。
YARN 架构
YARN采用主从架构,主要组件包括:
ResourceManager
ResourceManager是YARN的主节点,负责整个集群的资源管理:
- 资源调度:根据调度策略分配资源
- 应用管理:接收和管理应用程序提交
- 状态监控:监控集群资源使用情况
核心组件:
- Scheduler(调度器):负责资源分配,根据策略将资源分配给应用
- ApplicationsManager(应用管理器):负责接收作业提交,启动 ApplicationMaster
NodeManager
NodeManager是每个节点上的代理,负责单个节点的资源管理:
- 资源汇报:向ResourceManager汇报节点资源状态
- 容器管理:启动和监控容器
- 资源隔离:保证容器间的资源隔离
核心功能:
- 心跳机制:定期向 ResourceManager 发送心跳
- 容器管理:启动、监控、杀死容器
- 资源监控:监控 CPU、内存使用情况
- 日志聚合:收集和存储容器日志
ApplicationMaster
ApplicationMaster是每个应用程序的管理者:
- 任务切分:将应用切分成多个任务
- 资源申请:向ResourceManager申请资源
- 任务调度:在分配的容器上调度任务
- 容错处理:处理任务失败和重试
生命周期:
1. 客户端提交应用 → ResourceManager
2. ResourceManager 分配容器 → 启动 ApplicationMaster
3. ApplicationMaster 向 ResourceManager 注册
4. ApplicationMaster 申请资源 → ResourceManager
5. ResourceManager 返回可用容器列表
6. ApplicationMaster 与 NodeManager 通信 → 启动任务
7. 任务完成 → ApplicationMaster 注销
Container
Container是YARN中资源抽象,封装了:
- CPU:虚拟CPU核心数
- 内存:可用内存大小
- 其他资源:磁盘、网络等(可扩展)
资源隔离:
- 内存隔离:通过 Linux cgroups 或进程监控实现
- CPU 隔离:通过 cgroups 实现
YARN 工作流程
应用程序在YARN上的执行流程:
1. 客户端向ResourceManager提交应用
2. ResourceManager分配第一个Container
3. 在Container中启动ApplicationMaster
4. ApplicationMaster向ResourceManager注册并申请资源
5. ResourceManager返回可用的Container列表
6. ApplicationMaster与NodeManager通信启动任务
7. 任务执行完成,ApplicationMaster注销
详细流程图:
┌──────────┐ 1.提交应用 ┌────────────────┐
│ Client │ ──────────────────→ │ ResourceManager │
└──────────┘ │ (Scheduler + │
│ ASM) │
└───────┬────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ 2.分配容器 │ │
↓ │ │
┌──────────────────┐ │ │
│ ApplicationMaster│ │ │
│ (Container 0) │ │ │
└────────┬─────────┘ │ │
│ │ │
│ 3.注册 │ 4.申请资源 │
└────────────────────────────→ │ ←───────────────────────────┘
│ │
│ 5.返回可用容器列表 │
│ ←─────────────────────────────┘
│
│ 6.启动任务
↓
┌──────────────────┐ ┌──────────────────┐
│ NodeManager │ │ NodeManager │
│ (Container 1) │ │ (Container 2) │
└──────────────────┘ └──────────────────┘
YARN 调度器
YARN支持多种调度策略:
FIFO调度器
- 先进先出,简单但效率低
- 不适合多用户共享集群
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler</value>
</property>
容量调度器(Capacity Scheduler)
容量调度器是默认调度器,特点:
- 将集群资源划分为多个队列
- 每个队列有保证的资源配额
- 支持多租户,适合企业环境
- 支持层级队列结构
配置示例:
<!-- yarn-site.xml -->
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
<!-- capacity-scheduler.xml -->
<configuration>
<!-- 根队列配置 -->
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,production,development</value>
</property>
<!-- default 队列配置 -->
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>40</value> <!-- 占总资源40% -->
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>60</value> <!-- 最大可扩展到60% -->
</property>
<property>
<name>yarn.scheduler.capacity.root.default.state</name>
<value>RUNNING</value>
</property>
<!-- production 队列配置 -->
<property>
<name>yarn.scheduler.capacity.root.production.capacity</name>
<value>40</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.production.maximum-capacity</name>
<value>80</value>
</property>
<!-- development 队列配置 -->
<property>
<name>yarn.scheduler.capacity.root.development.capacity</name>
<value>20</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.development.maximum-capacity</name>
<value>40</value>
</property>
</configuration>
关键配置说明:
| 配置项 | 说明 |
|---|---|
| capacity | 队列保证的资源百分比 |
| maximum-capacity | 队列最大可用的资源百分比 |
| state | 队列状态:RUNNING 或 STOPPED |
| user-limit-factor | 单用户最多可使用的资源倍数 |
| minimum-user-limit-percent | 单用户最少可使用的资源百分比 |
公平调度器(Fair Scheduler)
公平调度器的特点:
- 所有应用公平获得资源
- 支持资源抢占
- 适合多用户交互式应用
配置示例:
<!-- yarn-site.xml -->
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
<property>
<name>yarn.scheduler.fair.allocation.file</name>
<value>/etc/hadoop/conf/fair-scheduler.xml</value>
</property>
<!-- fair-scheduler.xml -->
<?xml version="1.0"?>
<allocations>
<!-- 默认队列配置 -->
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<!-- 队列定义 -->
<queue name="production">
<minResources>8192 mb,4 vcores</minResources>
<maxResources>32768 mb,16 vcores</maxResources>
<maxRunningApps>10</maxRunningApps>
<weight>2.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<queue name="development">
<minResources>4096 mb,2 vcores</minResources>
<maxResources>16384 mb,8 vcores</maxResources>
<maxRunningApps>20</maxRunningApps>
<weight>1.0</weight>
</queue>
<!-- 用户限制 -->
<user name="admin">
<maxRunningApps>50</maxRunningApps>
</user>
<userMaxAppsDefault>10</userMaxAppsDefault>
</allocations>
调度器对比
| 特性 | FIFO | Capacity | Fair |
|---|---|---|---|
| 资源保证 | 无 | 队列级别 | 队列级别 |
| 多租户 | 不支持 | 支持 | 支持 |
| 资源抢占 | 不支持 | 支持 | 支持 |
| 适用场景 | 单用户 | 企业环境 | 交互式应用 |
YARN 资源模型
基本资源类型
YARN 默认支持两种资源类型:
| 资源类型 | 单位 | 说明 |
|---|---|---|
| memory | MB | 内存资源 |
| vcores | 个 | 虚拟CPU核心 |
资源配置
<!-- NodeManager 可用资源 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value> <!-- 节点总可用内存 -->
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>8</value> <!-- 节点总可用CPU核心数 -->
</property>
<!-- 容器资源限制 -->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value> <!-- 容器最小内存 -->
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>8192</value> <!-- 容器最大内存 -->
</property>
<property>
<name>yarn.scheduler.minimum-allocation-vcores</name>
<value>1</value> <!-- 容器最小CPU -->
</property>
<property>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>4</value> <!-- 容器最大CPU -->
</property>
扩展资源类型(Hadoop 3.x)
Hadoop 3.x 支持自定义资源类型,如 GPU、FPGA 等:
<!-- 定义扩展资源 -->
<property>
<name>yarn.resource-types</name>
<value>resource1,resource2</value>
</property>
<!-- 节点可用扩展资源 -->
<property>
<name>yarn.nodemanager.resource-type.resource1</name>
<value>4</value>
</property>
YARN 高可用
YARN高可用通过ResourceManager HA实现:
<!-- 启用RM HA -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- 集群ID -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster1</value>
</property>
<!-- RM列表 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- RM1地址 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>rm1.example.com</value>
</property>
<property>
<name>yarn.resourcemanager.address.rm1</name>
<value>rm1.example.com:8032</value>
</property>
<!-- RM2地址 -->
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>rm2.example.com</value>
</property>
<property>
<name>yarn.resourcemanager.address.rm2</name>
<value>rm2.example.com:8032</value>
</property>
<!-- ZooKeeper配置 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>zk1:2181,zk2:2181,zk3:2181</value>
</property>
<!-- 自动故障转移 -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
机会容器(Opportunistic Containers,Hadoop 3.x)
Hadoop 3.x 引入机会容器,提高集群资源利用率:
两种容器类型:
| 类型 | 说明 | 调度优先级 |
|---|---|---|
| GUARANTEED | 保证容器,资源有保障 | 高 |
| OPPORTUNISTIC | 机会容器,资源无保障 | 低,可被抢占 |
工作原理:
- 集群资源不足时,机会容器可以在 NodeManager 端排队等待
- 当 GUARANTEED 容器需要资源时,机会容器会被抢占
- 机会容器失败后可以重新调度
配置启用:
<property>
<name>yarn.nodemanager.opportunistic-containers-enabled</name>
<value>true</value>
</property>
Hadoop 集群部署
部署模式
Hadoop支持三种部署模式:
| 模式 | 说明 | 适用场景 |
|---|---|---|
| 单机模式 | 单进程运行,无分布式特性 | 学习、调试 |
| 伪分布式 | 单机模拟分布式环境 | 开发、测试 |
| 完全分布式 | 多机集群部署 | 生产环境 |
核心配置文件
Hadoop的主要配置文件位于$HADOOP_HOME/etc/hadoop/目录:
core-site.xml
<configuration>
<!-- 指定HDFS的NameNode地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://namenode:9000</value>
</property>
<!-- 指定Hadoop临时目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/data/hadoop/tmp</value>
</property>
<!-- 回收站配置 -->
<property>
<name>fs.trash.interval</name>
<value>1440</value> <!-- 回收站保留时间(分钟) -->
</property>
</configuration>
hdfs-site.xml
<configuration>
<!-- 副本数量 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- 块大小 -->
<property>
<name>dfs.blocksize</name>
<value>134217728</value> <!-- 128MB -->
</property>
<!-- NameNode数据目录 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/hadoop/namenode</value>
</property>
<!-- DataNode数据目录 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/hadoop/datanode</value>
</property>
<!-- NameNode HTTP UI 端口 -->
<property>
<name>dfs.namenode.http-address</name>
<value>namenode:9870</value>
</property>
<!-- Secondary NameNode 地址 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>secondarynamenode:9868</value>
</property>
<!-- 数据传输保护 -->
<property>
<name>dfs.datanode.use.dn.hostname</name>
<value>true</value>
</property>
</configuration>
yarn-site.xml
<configuration>
<!-- ResourceManager地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>resourcemanager</value>
</property>
<!-- NodeManager运行的辅助服务 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- NodeManager可用内存 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value>
</property>
<!-- NodeManager可用CPU -->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>8</value>
</property>
<!-- 日志聚合 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value> <!-- 7天 -->
</property>
<!-- 虚拟内存检查(生产环境建议关闭) -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
</configuration>
mapred-site.xml
<configuration>
<!-- 指定MapReduce运行在YARN上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- Map任务内存 -->
<property>
<name>mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<!-- Reduce任务内存 -->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>4096</value>
</property>
<!-- 历史服务器地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>jobhistoryserver:10020</value>
</property>
</configuration>
集群启动流程
# 1. 格式化NameNode(仅首次)
hdfs namenode -format
# 2. 启动HDFS
start-dfs.sh
# 3. 启动YARN
start-yarn.sh
# 4. 查看进程
jps
# 5. 访问Web界面
# HDFS: http://namenode:9870
# YARN: http://resourcemanager:8088
常用运维命令
# HDFS 管理
hdfs dfsadmin -report # 查看集群状态
hdfs dfsadmin -safemode get # 查看安全模式
hdfs dfsadmin -safemode leave # 退出安全模式
hdfs dfsadmin -refreshNodes # 刷新节点配置
# YARN 管理
yarn rmadmin -refreshQueues # 刷新队列配置
yarn rmadmin -getServiceState rm1 # 查看RM状态(HA环境)
yarn application -list # 列出运行中的应用
yarn application -kill app_id # 杀死应用
yarn logs -applicationId app_id # 查看应用日志
# 平衡数据
hdfs balancer -threshold 5 # 启动数据平衡
# 磁盘平衡(Hadoop 3.x)
hdfs diskbalancer -plan dn1 # 生成计划
hdfs diskbalancer -execute dn1.plan.json # 执行计划
Hadoop 3.x 新特性总结
1. 纠删码
- 存储开销从 3x 降低到 1.4x-1.5x
- 适合冷数据存储
- 需要更多 CPU 和网络资源
2. 多 NameNode 支持
- 支持多个 Standby NameNode
- 提高容错能力
- 配合 5 个 JournalNode 可容忍 2 个节点故障
3. 机会容器
- 提高集群资源利用率
- 支持分布式调度
- 机会容器可被抢占
4. 端口变更
Hadoop 3.x 修改了多个服务的默认端口:
| 服务 | Hadoop 2.x 端口 | Hadoop 3.x 端口 |
|---|---|---|
| NameNode HTTP | 50070 | 9870 |
| NameNode HTTPS | 50470 | 9871 |
| Secondary NN HTTP | 50090 | 9868 |
| Secondary NN HTTPS | 50091 | 9869 |
| DataNode HTTP | 50075 | 9864 |
| DataNode HTTPS | 50475 | 9865 |
5. 其他改进
- Java 8+ 要求:最低 Java 版本提升到 8
- Shell 脚本重写:修复了大量历史 bug
- HDFS 路由联邦:支持大规模集群
- YARN 资源类型扩展:支持 GPU 等自定义资源
- 默认支持 S3A 和 Azure 存储
小结
本章介绍了Hadoop的核心组件和基本使用:
-
HDFS:分布式文件系统,提供高可靠的文件存储
- 主从架构:NameNode + DataNode
- 数据块和副本机制
- 机架感知和副本放置策略
- 高可用配置
- 纠删码(Hadoop 3.x)
-
MapReduce:分布式计算框架,适合离线批处理
- Map/Reduce 两阶段模型
- Shuffle 机制详解
- Combiner 优化
- 性能调优
-
YARN:资源调度系统,管理集群资源分配
- 三种调度器:FIFO、Capacity、Fair
- 资源模型和配置
- 高可用配置
- 机会容器(Hadoop 3.x)
Hadoop是大数据技术的基础,理解其原理对于学习后续的Spark、Flink等框架非常重要。在实际应用中,Hadoop通常与Hive、HBase等组件配合使用,构建完整的大数据处理平台。