大数据技术速查表
本文档提供大数据核心技术的快速参考,包括常用命令、配置参数和代码片段。
HDFS 常用命令
文件操作
# 查看目录
hdfs dfs -ls /path
# 创建目录
hdfs dfs -mkdir -p /path/to/dir
# 上传文件
hdfs dfs -put localfile /hdfs/path/
hdfs dfs -copyFromLocal localfile /hdfs/path/
# 下载文件
hdfs dfs -get /hdfs/path/file localpath/
hdfs dfs -copyToLocal /hdfs/path/file localpath/
# 查看文件内容
hdfs dfs -cat /path/file
hdfs dfs -tail /path/file
hdfs dfs -head /path/file
# 删除文件/目录
hdfs dfs -rm /path/file
hdfs dfs -rm -r /path/dir
# 移动/重命名
hdfs dfs -mv /old/path /new/path
# 复制
hdfs dfs -cp /src/path /dst/path
# 查看文件大小
hdfs dfs -du -h /path
hdfs dfs -du -s -h /path
# 统计目录下文件数量
hdfs dfs -count /path
# 查看文件块信息
hdfs fsck /path/file -files -blocks -locations
权限管理
# 修改权限
hdfs dfs -chmod 755 /path
hdfs dfs -chmod -R 755 /path
# 修改所有者
hdfs dfs -chown user:group /path
hdfs dfs -chown -R user:group /path
# 修改副本数
hdfs dfs -setrep 3 /path/file
hdfs dfs -setrep -R 3 /path/dir
管理命令
# 查看集群状态
hdfs dfsadmin -report
# 安全模式
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode enter
hdfs dfsadmin -safemode leave
# 刷新节点
hdfs dfsadmin -refreshNodes
# 查看NameNode状态
hdfs haadmin -getServiceState nn1
# 格式化NameNode
hdfs namenode -format
HBase Shell 命令
表操作
# 创建表
create 'table', 'cf1', 'cf2'
create 'table', {NAME => 'cf', VERSIONS => 3, TTL => 86400}
# 查看表
list
describe 'table'
# 禁用/启用表
disable 'table'
enable 'table'
is_disabled 'table'
is_enabled 'table'
# 删除表
disable 'table'
drop 'table'
# 修改表
disable 'table'
alter 'table', {NAME => 'cf', VERSIONS => 5}
enable 'table'
# 清空表
truncate 'table'
数据操作
# 写入数据
put 'table', 'rowkey', 'cf:col', 'value'
put 'table', 'rowkey', 'cf:col', 'value', timestamp
# 读取数据
get 'table', 'rowkey'
get 'table', 'rowkey', 'cf'
get 'table', 'rowkey', 'cf:col'
get 'table', 'rowkey', {COLUMN => 'cf:col', VERSIONS => 3}
# 扫描
scan 'table'
scan 'table', {STARTROW => 'start', STOPROW => 'end'}
scan 'table', {COLUMNS => ['cf1', 'cf2']}
scan 'table', {LIMIT => 10}
scan 'table', {REVERSED => true}
# 删除数据
delete 'table', 'rowkey', 'cf:col'
deleteall 'table', 'rowkey'
# 统计
count 'table'
过滤器
# 值过滤
scan 'table', {FILTER => "ValueFilter(=, 'binary:value')"}
# 行键过滤
scan 'table', {FILTER => "RowFilter(=, 'substring:key')"}
# 列名过滤
scan 'table', {FILTER => "QualifierFilter(=, 'substring:col')"}
# 单列值过滤
scan 'table', {FILTER => "SingleColumnValueFilter('cf', 'col', =, 'binary:value')"}
# 组合过滤
scan 'table', {FILTER => "Filter1 AND Filter2 OR Filter3"}
Hive 常用语法
DDL
-- 创建数据库
CREATE DATABASE IF NOT EXISTS db_name
LOCATION '/path/to/db';
-- 创建内部表
CREATE TABLE table_name (
col1 INT,
col2 STRING,
col3 DECIMAL(10,2)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- 创建外部表
CREATE EXTERNAL TABLE table_name (...)
LOCATION '/path/to/data';
-- 创建分区表
CREATE TABLE table_name (...)
PARTITIONED BY (year INT, month INT);
-- 创建分桶表
CREATE TABLE table_name (...)
CLUSTERED BY (col) INTO 4 BUCKETS;
-- 修改表
ALTER TABLE table_name ADD COLUMNS (new_col STRING);
ALTER TABLE table_name DROP COLUMN col_name;
ALTER TABLE table_name RENAME TO new_name;
-- 添加分区
ALTER TABLE table_name ADD PARTITION (year=2024, month=1);
DML
-- 导入数据
LOAD DATA LOCAL INPATH '/path/file' INTO TABLE table_name;
LOAD DATA INPATH '/hdfs/path/file' INTO TABLE table_name;
-- 插入数据
INSERT INTO TABLE table_name VALUES (1, 'a', 100);
INSERT OVERWRITE TABLE table_name SELECT * FROM source;
-- 动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE target PARTITION(year, month)
SELECT *, year, month FROM source;
-- 导出数据
INSERT OVERWRITE LOCAL DIRECTORY '/path/output'
SELECT * FROM table_name;
查询
-- 基本查询
SELECT col1, col2 FROM table_name WHERE condition;
-- 聚合
SELECT col, COUNT(*) FROM table_name GROUP BY col;
-- JOIN
SELECT a.*, b.* FROM table_a a JOIN table_b b ON a.id = b.id;
-- 窗口函数
SELECT
col,
ROW_NUMBER() OVER (PARTITION BY group_col ORDER BY order_col),
SUM(col) OVER (PARTITION BY group_col)
FROM table_name;
-- 排序
SELECT * FROM table_name ORDER BY col; -- 全局排序
SELECT * FROM table_name SORT BY col; -- 局部排序
SELECT * FROM table_name DISTRIBUTE BY col1 SORT BY col2;
Spark 常用操作
RDD 操作
// 创建RDD
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd = sc.textFile("hdfs://path/file")
// 转换操作
rdd.map(x => x * 2)
rdd.filter(x => x > 0)
rdd.flatMap(x => x.split(" "))
rdd.distinct()
rdd.union(otherRdd)
rdd.intersection(otherRdd)
// Key-Value操作
pairRdd.reduceByKey(_ + _)
pairRdd.groupByKey()
pairRdd.mapValues(_ * 2)
pairRdd.sortByKey()
pairRdd.join(otherRdd)
// 行动操作
rdd.collect()
rdd.count()
rdd.reduce(_ + _)
rdd.first()
rdd.take(10)
rdd.foreach(println)
// 持久化
rdd.cache()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.unpersist()
DataFrame 操作
// 创建DataFrame
val df = spark.read.csv("path")
val df = spark.read.json("path")
val df = spark.read.parquet("path")
val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
// 基本操作
df.select("col1", "col2")
df.filter($"col" > 0)
df.groupBy("col").count()
df.orderBy($"col".desc)
// SQL查询
df.createOrReplaceTempView("table")
spark.sql("SELECT * FROM table")
// 写入
df.write.csv("path")
df.write.parquet("path")
df.write.mode("overwrite").saveAsTable("table")
Spark Streaming
// 创建StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))
// DStream操作
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val counts = words.map((_, 1)).reduceByKey(_ + _)
counts.print()
// 启动
ssc.start()
ssc.awaitTermination()
Flink 常用操作
DataStream API
// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 数据源
DataStream<String> stream = env.socketTextStream("localhost", 9999);
DataStream<Event> stream = env.addSource(new FlinkKafkaConsumer<>(...));
// 转换操作
stream.map(x -> x.toUpperCase())
stream.filter(x -> x.length() > 0)
stream.flatMap((x, out) -> { ... })
stream.keyBy(e -> e.getKey())
// 窗口操作
stream.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count")
// 输出
stream.print()
stream.addSink(new FlinkKafkaProducer<>(...))
// 执行
env.execute("JobName")
时间和窗口
// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 分配时间戳和水位线
stream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.getTimestamp())
);
// 窗口类型
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 滚动
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动
.window(EventTimeSessionWindows.withGap(Time.seconds(10))) // 会话
状态管理
// ValueState
private ValueState<Long> countState;
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
countState.update(value);
Long value = countState.value();
// ListState
private ListState<String> listState;
listState.add(element);
Iterable<String> elements = listState.get();
// MapState
private MapState<String, Integer> mapState;
mapState.put(key, value);
Integer value = mapState.get(key);
Kafka 常用命令
Topic 管理
# 创建Topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic topic_name \
--partitions 3 \
--replication-factor 2
# 查看Topic列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看Topic详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic topic_name
# 删除Topic
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic topic_name
# 修改Topic配置
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics --entity-name topic_name \
--add-config retention.ms=86400000
消息操作
# 生产消息
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name
# 消费消息
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--from-beginning
# 指定消费组
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--group group_name
# 查看消费组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 查看消费组详情
kafka-consumer-groups.sh --describe \
--bootstrap-server localhost:9092 \
--group group_name
# 重置消费位置
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server localhost:9092 \
--group group_name \
--topic topic_name \
--to-earliest \
--execute
常用配置参数
Hadoop 核心配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| fs.defaultFS | file:/// | 默认文件系统 |
| dfs.replication | 3 | 副本数 |
| dfs.blocksize | 128MB | 块大小 |
| dfs.namenode.name.dir | - | NameNode数据目录 |
| dfs.datanode.data.dir | - | DataNode数据目录 |
YARN 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| yarn.nodemanager.resource.memory-mb | 8192 | NodeManager可用内存 |
| yarn.nodemanager.resource.cpu-vcores | 8 | NodeManager可用CPU核数 |
| yarn.scheduler.minimum-allocation-mb | 1024 | 容器最小内存 |
| yarn.scheduler.maximum-allocation-mb | 8192 | 容器最大内存 |
Spark 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| spark.executor.memory | 1g | Executor内存 |
| spark.executor.cores | 1 | Executor核心数 |
| spark.executor.instances | 2 | Executor实例数 |
| spark.driver.memory | 1g | Driver内存 |
| spark.sql.shuffle.partitions | 200 | Shuffle分区数 |
Flink 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| parallelism.default | 1 | 默认并行度 |
| taskmanager.memory.process.size | 1728m | TaskManager内存 |
| taskmanager.numberOfTaskSlots | 1 | 每个TaskManager的槽位数 |
| state.backend | hashmap | 状态后端 |
| execution.checkpointing.interval | - | 检查点间隔 |
性能调优要点
HDFS 调优
- 增加DataNode数量,提高并行度
- 调整块大小,减少元数据占用
- 启用短路读取,减少网络开销
- 配置合适的副本数
Hive 调优
- 使用分区表和分桶表
- 选择合适的文件格式(ORC/Parquet)
- 启用压缩
- 使用向量化查询
- 调整Map和Reduce数量
Spark 调优
- 增加并行度
- 调整内存分配
- 使用广播变量
- 处理数据倾斜
- 优化Shuffle
Flink 调优
- 设置合理的并行度
- 配置状态后端
- 启用增量检查点
- 优化网络缓冲区
- 处理背压问题