跳到主要内容

大数据技术速查表

本文档提供大数据核心技术的快速参考,包括常用命令、配置参数和代码片段。

HDFS 常用命令

文件操作

# 查看目录
hdfs dfs -ls /path

# 创建目录
hdfs dfs -mkdir -p /path/to/dir

# 上传文件
hdfs dfs -put localfile /hdfs/path/
hdfs dfs -copyFromLocal localfile /hdfs/path/

# 下载文件
hdfs dfs -get /hdfs/path/file localpath/
hdfs dfs -copyToLocal /hdfs/path/file localpath/

# 查看文件内容
hdfs dfs -cat /path/file
hdfs dfs -tail /path/file
hdfs dfs -head /path/file

# 删除文件/目录
hdfs dfs -rm /path/file
hdfs dfs -rm -r /path/dir

# 移动/重命名
hdfs dfs -mv /old/path /new/path

# 复制
hdfs dfs -cp /src/path /dst/path

# 查看文件大小
hdfs dfs -du -h /path
hdfs dfs -du -s -h /path

# 统计目录下文件数量
hdfs dfs -count /path

# 查看文件块信息
hdfs fsck /path/file -files -blocks -locations

权限管理

# 修改权限
hdfs dfs -chmod 755 /path
hdfs dfs -chmod -R 755 /path

# 修改所有者
hdfs dfs -chown user:group /path
hdfs dfs -chown -R user:group /path

# 修改副本数
hdfs dfs -setrep 3 /path/file
hdfs dfs -setrep -R 3 /path/dir

管理命令

# 查看集群状态
hdfs dfsadmin -report

# 安全模式
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode enter
hdfs dfsadmin -safemode leave

# 刷新节点
hdfs dfsadmin -refreshNodes

# 查看NameNode状态
hdfs haadmin -getServiceState nn1

# 格式化NameNode
hdfs namenode -format

HBase Shell 命令

表操作

# 创建表
create 'table', 'cf1', 'cf2'
create 'table', {NAME => 'cf', VERSIONS => 3, TTL => 86400}

# 查看表
list
describe 'table'

# 禁用/启用表
disable 'table'
enable 'table'
is_disabled 'table'
is_enabled 'table'

# 删除表
disable 'table'
drop 'table'

# 修改表
disable 'table'
alter 'table', {NAME => 'cf', VERSIONS => 5}
enable 'table'

# 清空表
truncate 'table'

数据操作

# 写入数据
put 'table', 'rowkey', 'cf:col', 'value'
put 'table', 'rowkey', 'cf:col', 'value', timestamp

# 读取数据
get 'table', 'rowkey'
get 'table', 'rowkey', 'cf'
get 'table', 'rowkey', 'cf:col'
get 'table', 'rowkey', {COLUMN => 'cf:col', VERSIONS => 3}

# 扫描
scan 'table'
scan 'table', {STARTROW => 'start', STOPROW => 'end'}
scan 'table', {COLUMNS => ['cf1', 'cf2']}
scan 'table', {LIMIT => 10}
scan 'table', {REVERSED => true}

# 删除数据
delete 'table', 'rowkey', 'cf:col'
deleteall 'table', 'rowkey'

# 统计
count 'table'

过滤器

# 值过滤
scan 'table', {FILTER => "ValueFilter(=, 'binary:value')"}

# 行键过滤
scan 'table', {FILTER => "RowFilter(=, 'substring:key')"}

# 列名过滤
scan 'table', {FILTER => "QualifierFilter(=, 'substring:col')"}

# 单列值过滤
scan 'table', {FILTER => "SingleColumnValueFilter('cf', 'col', =, 'binary:value')"}

# 组合过滤
scan 'table', {FILTER => "Filter1 AND Filter2 OR Filter3"}

Hive 常用语法

DDL

-- 创建数据库
CREATE DATABASE IF NOT EXISTS db_name
LOCATION '/path/to/db';

-- 创建内部表
CREATE TABLE table_name (
col1 INT,
col2 STRING,
col3 DECIMAL(10,2)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- 创建外部表
CREATE EXTERNAL TABLE table_name (...)
LOCATION '/path/to/data';

-- 创建分区表
CREATE TABLE table_name (...)
PARTITIONED BY (year INT, month INT);

-- 创建分桶表
CREATE TABLE table_name (...)
CLUSTERED BY (col) INTO 4 BUCKETS;

-- 修改表
ALTER TABLE table_name ADD COLUMNS (new_col STRING);
ALTER TABLE table_name DROP COLUMN col_name;
ALTER TABLE table_name RENAME TO new_name;

-- 添加分区
ALTER TABLE table_name ADD PARTITION (year=2024, month=1);

DML

-- 导入数据
LOAD DATA LOCAL INPATH '/path/file' INTO TABLE table_name;
LOAD DATA INPATH '/hdfs/path/file' INTO TABLE table_name;

-- 插入数据
INSERT INTO TABLE table_name VALUES (1, 'a', 100);
INSERT OVERWRITE TABLE table_name SELECT * FROM source;

-- 动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE target PARTITION(year, month)
SELECT *, year, month FROM source;

-- 导出数据
INSERT OVERWRITE LOCAL DIRECTORY '/path/output'
SELECT * FROM table_name;

查询

-- 基本查询
SELECT col1, col2 FROM table_name WHERE condition;

-- 聚合
SELECT col, COUNT(*) FROM table_name GROUP BY col;

-- JOIN
SELECT a.*, b.* FROM table_a a JOIN table_b b ON a.id = b.id;

-- 窗口函数
SELECT
col,
ROW_NUMBER() OVER (PARTITION BY group_col ORDER BY order_col),
SUM(col) OVER (PARTITION BY group_col)
FROM table_name;

-- 排序
SELECT * FROM table_name ORDER BY col; -- 全局排序
SELECT * FROM table_name SORT BY col; -- 局部排序
SELECT * FROM table_name DISTRIBUTE BY col1 SORT BY col2;

Spark 常用操作

RDD 操作

// 创建RDD
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd = sc.textFile("hdfs://path/file")

// 转换操作
rdd.map(x => x * 2)
rdd.filter(x => x > 0)
rdd.flatMap(x => x.split(" "))
rdd.distinct()
rdd.union(otherRdd)
rdd.intersection(otherRdd)

// Key-Value操作
pairRdd.reduceByKey(_ + _)
pairRdd.groupByKey()
pairRdd.mapValues(_ * 2)
pairRdd.sortByKey()
pairRdd.join(otherRdd)

// 行动操作
rdd.collect()
rdd.count()
rdd.reduce(_ + _)
rdd.first()
rdd.take(10)
rdd.foreach(println)

// 持久化
rdd.cache()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.unpersist()

DataFrame 操作

// 创建DataFrame
val df = spark.read.csv("path")
val df = spark.read.json("path")
val df = spark.read.parquet("path")
val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

// 基本操作
df.select("col1", "col2")
df.filter($"col" > 0)
df.groupBy("col").count()
df.orderBy($"col".desc)

// SQL查询
df.createOrReplaceTempView("table")
spark.sql("SELECT * FROM table")

// 写入
df.write.csv("path")
df.write.parquet("path")
df.write.mode("overwrite").saveAsTable("table")

Spark Streaming

// 创建StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))

// DStream操作
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val counts = words.map((_, 1)).reduceByKey(_ + _)
counts.print()

// 启动
ssc.start()
ssc.awaitTermination()

DataStream API

// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 数据源
DataStream<String> stream = env.socketTextStream("localhost", 9999);
DataStream<Event> stream = env.addSource(new FlinkKafkaConsumer<>(...));

// 转换操作
stream.map(x -> x.toUpperCase())
stream.filter(x -> x.length() > 0)
stream.flatMap((x, out) -> { ... })
stream.keyBy(e -> e.getKey())

// 窗口操作
stream.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count")

// 输出
stream.print()
stream.addSink(new FlinkKafkaProducer<>(...))

// 执行
env.execute("JobName")

时间和窗口

// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 分配时间戳和水位线
stream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.getTimestamp())
);

// 窗口类型
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 滚动
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动
.window(EventTimeSessionWindows.withGap(Time.seconds(10))) // 会话

状态管理

// ValueState
private ValueState<Long> countState;
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
countState.update(value);
Long value = countState.value();

// ListState
private ListState<String> listState;
listState.add(element);
Iterable<String> elements = listState.get();

// MapState
private MapState<String, Integer> mapState;
mapState.put(key, value);
Integer value = mapState.get(key);

Kafka 常用命令

Topic 管理

# 创建Topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic topic_name \
--partitions 3 \
--replication-factor 2

# 查看Topic列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看Topic详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic topic_name

# 删除Topic
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic topic_name

# 修改Topic配置
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics --entity-name topic_name \
--add-config retention.ms=86400000

消息操作

# 生产消息
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name

# 消费消息
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--from-beginning

# 指定消费组
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--group group_name

# 查看消费组
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 查看消费组详情
kafka-consumer-groups.sh --describe \
--bootstrap-server localhost:9092 \
--group group_name

# 重置消费位置
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server localhost:9092 \
--group group_name \
--topic topic_name \
--to-earliest \
--execute

常用配置参数

Hadoop 核心配置

参数默认值说明
fs.defaultFSfile:///默认文件系统
dfs.replication3副本数
dfs.blocksize128MB块大小
dfs.namenode.name.dir-NameNode数据目录
dfs.datanode.data.dir-DataNode数据目录

YARN 配置

参数默认值说明
yarn.nodemanager.resource.memory-mb8192NodeManager可用内存
yarn.nodemanager.resource.cpu-vcores8NodeManager可用CPU核数
yarn.scheduler.minimum-allocation-mb1024容器最小内存
yarn.scheduler.maximum-allocation-mb8192容器最大内存

Spark 配置

参数默认值说明
spark.executor.memory1gExecutor内存
spark.executor.cores1Executor核心数
spark.executor.instances2Executor实例数
spark.driver.memory1gDriver内存
spark.sql.shuffle.partitions200Shuffle分区数
参数默认值说明
parallelism.default1默认并行度
taskmanager.memory.process.size1728mTaskManager内存
taskmanager.numberOfTaskSlots1每个TaskManager的槽位数
state.backendhashmap状态后端
execution.checkpointing.interval-检查点间隔

性能调优要点

HDFS 调优

  • 增加DataNode数量,提高并行度
  • 调整块大小,减少元数据占用
  • 启用短路读取,减少网络开销
  • 配置合适的副本数

Hive 调优

  • 使用分区表和分桶表
  • 选择合适的文件格式(ORC/Parquet)
  • 启用压缩
  • 使用向量化查询
  • 调整Map和Reduce数量

Spark 调优

  • 增加并行度
  • 调整内存分配
  • 使用广播变量
  • 处理数据倾斜
  • 优化Shuffle
  • 设置合理的并行度
  • 配置状态后端
  • 启用增量检查点
  • 优化网络缓冲区
  • 处理背压问题