大数据技术速查表
本文档提供大数据核心技术的快速参考,包括常用命令、配置参数和代码片段。
HDFS 常用命令
文件操作
# 查看目录
hdfs dfs -ls /path
hdfs dfs -ls -R /path # 递归查看
# 创建目录
hdfs dfs -mkdir -p /path/to/dir
# 上传文件
hdfs dfs -put localfile /hdfs/path/
hdfs dfs -copyFromLocal localfile /hdfs/path/
hdfs dfs -moveFromLocal localfile /hdfs/path/ # 上传后删除本地文件
# 下载文件
hdfs dfs -get /hdfs/path/file localpath/
hdfs dfs -copyToLocal /hdfs/path/file localpath/
# 查看文件内容
hdfs dfs -cat /path/file
hdfs dfs -tail /path/file # 查看文件末尾
hdfs dfs -head /path/file # 查看文件开头
# 删除文件/目录
hdfs dfs -rm /path/file
hdfs dfs -rm -r /path/dir # 递归删除
hdfs dfs -rm -skipTrash /path/file # 不放入回收站
# 移动/重命名
hdfs dfs -mv /old/path /new/path
# 复制
hdfs dfs -cp /src/path /dst/path
# 查看文件大小
hdfs dfs -du -h /path # 人类可读格式
hdfs dfs -du -s -h /path # 汇总大小
# 统计目录下文件数量
hdfs dfs -count /path
hdfs dfs -count -q /path # 显示配额信息
# 查看文件块信息
hdfs fsck /path/file -files -blocks -locations
# 设置副本数
hdfs dfs -setrep 3 /path/file
hdfs dfs -setrep -R 3 /path/dir # 递归设置
# 合并下载
hdfs dfs -getmerge /path/dir/* localfile.txt
权限管理
# 修改权限
hdfs dfs -chmod 755 /path
hdfs dfs -chmod -R 755 /path # 递归修改
# 修改所有者
hdfs dfs -chown user:group /path
hdfs dfs -chown -R user:group /path
# 修改组
hdfs dfs -chgrp group /path
管理命令
# 查看集群状态
hdfs dfsadmin -report
# 安全模式
hdfs dfsadmin -safemode get # 查看状态
hdfs dfsadmin -safemode enter # 进入安全模式
hdfs dfsadmin -safemode leave # 退出安全模式
hdfs dfsadmin -safemode wait # 等待退出安全模式
# 刷新节点
hdfs dfsadmin -refreshNodes
# 查看NameNode状态(HA环境)
hdfs haadmin -getServiceState nn1
hdfs haadmin -getAllServiceState
# 保存命名空间
hdfs dfsadmin -saveNamespace
# 滚动编辑日志
hdfs dfsadmin -rollEdits
# 格式化NameNode(仅首次)
hdfs namenode -format
配额管理
# 设置目录配额(文件数)
hdfs dfsadmin -setQuota 10000 /path
# 设置空间配额(字节)
hdfs dfsadmin -setSpaceQuota 1t /path
# 清除配额
hdfs dfsadmin -clrQuota /path
hdfs dfsadmin -clrSpaceQuota /path
HBase Shell 命令
表操作
# 创建表
create 'table', 'cf1', 'cf2'
create 'table', {NAME => 'cf', VERSIONS => 3, TTL => 86400}
create 'table', {NAME => 'cf1'}, {NAME => 'cf2', COMPRESSION => 'SNAPPY'}
# 查看表
list
describe 'table'
exists 'table'
# 禁用/启用表
disable 'table'
enable 'table'
is_disabled 'table'
is_enabled 'table'
# 删除表
disable 'table'
drop 'table'
# 修改表
disable 'table'
alter 'table', {NAME => 'cf', VERSIONS => 5}
alter 'table', 'cf3' # 添加列族
alter 'table', {NAME => 'cf1', METHOD => 'delete'} # 删除列族
enable 'table'
# 清空表
truncate 'table'
truncate_preserve 'table' # 保留分区
数据操作
# 写入数据
put 'table', 'rowkey', 'cf:col', 'value'
put 'table', 'rowkey', 'cf:col', 'value', timestamp
# 读取数据
get 'table', 'rowkey'
get 'table', 'rowkey', 'cf'
get 'table', 'rowkey', 'cf:col'
get 'table', 'rowkey', {COLUMN => 'cf:col', VERSIONS => 3}
get 'table', 'rowkey', {TIMERANGE => [ts1, ts2]}
# 扫描
scan 'table'
scan 'table', {STARTROW => 'start', STOPROW => 'end'}
scan 'table', {COLUMNS => ['cf1', 'cf2']}
scan 'table', {LIMIT => 10}
scan 'table', {REVERSED => true} # 反向扫描
# 删除数据
delete 'table', 'rowkey', 'cf:col'
delete 'table', 'rowkey', 'cf:col', timestamp
deleteall 'table', 'rowkey'
# 统计
count 'table'
count 'table', {INTERVAL => 1000} # 每1000行显示一次
# 追加数据
append 'table', 'rowkey', 'cf:col', 'append_value'
# 原子递增
incr 'table', 'rowkey', 'cf:counter', 1
过滤器
# 值过滤
scan 'table', {FILTER => "ValueFilter(=, 'binary:value')"}
scan 'table', {FILTER => "ValueFilter(=, 'substring:hello')"}
# 行键过滤
scan 'table', {FILTER => "RowFilter(=, 'binary:rowkey')"}
scan 'table', {FILTER => "RowFilter(=, 'substring:key')"}
# 列名过滤
scan 'table', {FILTER => "QualifierFilter(=, 'substring:col')"}
# 单列值过滤
scan 'table', {FILTER => "SingleColumnValueFilter('cf', 'col', =, 'binary:value')"}
# 分页过滤器
scan 'table', {FILTER => "PageFilter(10)"}
# 组合过滤
scan 'table', {FILTER => "Filter1 AND Filter2 OR Filter3"}
快照操作
# 创建快照
snapshot 'table', 'snapshot_name'
# 列出快照
list_snapshots
# 从快照恢复
restore_snapshot 'snapshot_name'
# 克隆快照
clone_snapshot 'snapshot_name', 'new_table'
# 删除快照
delete_snapshot 'snapshot_name'
Hive 常用语法
DDL
-- 创建数据库
CREATE DATABASE IF NOT EXISTS db_name
COMMENT 'Database description'
LOCATION '/path/to/db'
WITH DBPROPERTIES ('creator' = 'admin');
-- 查看数据库
SHOW DATABASES;
SHOW DATABASES LIKE 'db_*';
DESCRIBE DATABASE db_name;
DESCRIBE DATABASE EXTENDED db_name;
-- 创建内部表
CREATE TABLE table_name (
col1 INT COMMENT 'Column 1',
col2 STRING COMMENT 'Column 2',
col3 DECIMAL(10,2) COMMENT 'Column 3'
)
COMMENT 'Table description'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count'='1');
-- 创建外部表
CREATE EXTERNAL TABLE table_name (...)
LOCATION '/path/to/data';
-- 创建分区表
CREATE TABLE table_name (...)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;
-- 创建分桶表
CREATE TABLE table_name (...)
CLUSTERED BY (col) SORTED BY (col) INTO 4 BUCKETS
STORED AS ORC;
-- 创建表并指定属性
CREATE TABLE table_name (...)
STORED AS ORC
TBLPROPERTIES (
'orc.compress'='SNAPPY',
'orc.create.index'='true'
);
-- 修改表
ALTER TABLE table_name RENAME TO new_name;
ALTER TABLE table_name ADD COLUMNS (new_col STRING);
ALTER TABLE table_name CHANGE old_col new_col INT;
ALTER TABLE table_name REPLACE COLUMNS (col1 INT, col2 STRING);
-- 分区操作
ALTER TABLE table_name ADD PARTITION (year=2024, month=1);
ALTER TABLE table_name DROP PARTITION (year=2024, month=1);
ALTER TABLE table_name PARTITION (year=2024) SET LOCATION '/new/path';
MSCK REPAIR TABLE table_name; -- 恢复分区
SHOW PARTITIONS table_name;
-- 删除表
DROP TABLE IF EXISTS table_name;
TRUNCATE TABLE table_name;
DML
-- 导入数据
LOAD DATA LOCAL INPATH '/local/path/file' INTO TABLE table_name;
LOAD DATA INPATH '/hdfs/path/file' INTO TABLE table_name;
LOAD DATA LOCAL INPATH '/path/file' OVERWRITE INTO TABLE table_name;
-- 插入数据
INSERT INTO TABLE table_name VALUES (1, 'a', 100);
INSERT OVERWRITE TABLE table_name SELECT * FROM source;
-- 动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE target PARTITION(year, month)
SELECT *, year_col, month_col FROM source;
-- 多表插入
FROM source_table
INSERT INTO TABLE target1 SELECT col1, col2
INSERT INTO TABLE target2 SELECT col3, col4;
-- 导出数据
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM table_name;
查询
-- 基本查询
SELECT col1, col2 FROM table_name WHERE condition;
-- 聚合
SELECT
col,
COUNT(*) as cnt,
SUM(amount) as total,
AVG(amount) as avg_amount,
MAX(amount) as max_val,
MIN(amount) as min_val
FROM table_name
GROUP BY col
HAVING COUNT(*) > 10;
-- JOIN
SELECT a.*, b.*
FROM table_a a
JOIN table_b b ON a.id = b.id;
SELECT a.*, b.*
FROM table_a a
LEFT OUTER JOIN table_b b ON a.id = b.id;
-- Map Join 提示
SELECT /*+ MAPJOIN(small_table) */
a.*, b.*
FROM big_table a
JOIN small_table b ON a.id = b.id;
-- 窗口函数
SELECT
col,
ROW_NUMBER() OVER (PARTITION BY group_col ORDER BY order_col) as rn,
RANK() OVER (PARTITION BY group_col ORDER BY order_col) as rk,
DENSE_RANK() OVER (PARTITION BY group_col ORDER BY order_col) as dr,
SUM(col) OVER (PARTITION BY group_col ORDER BY order_col
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_sum,
LAG(col, 1) OVER (ORDER BY order_col) as prev_val,
LEAD(col, 1) OVER (ORDER BY order_col) as next_val
FROM table_name;
-- 排序
SELECT * FROM table_name ORDER BY col; -- 全局排序
SELECT * FROM table_name SORT BY col; -- 局部排序
SELECT * FROM table_name DISTRIBUTE BY col1 SORT BY col2;
-- CTE(公共表表达式)
WITH cte1 AS (
SELECT * FROM table1 WHERE condition
),
cte2 AS (
SELECT * FROM table2 WHERE condition
)
SELECT a.*, b.* FROM cte1 a JOIN cte2 b ON a.id = b.id;
常用函数
-- 字符串函数
CONCAT(str1, str2, ...) -- 拼接
CONCAT_WS(separator, str1, str2, ...) -- 带分隔符拼接
SUBSTR(str, start, length) -- 截取
SPLIT(str, pattern) -- 分割
LENGTH(str) -- 长度
UPPER(str) / LOWER(str) -- 大小写
TRIM(str) / LTRIM(str) / RTRIM(str) -- 去空格
REGEXP_EXTRACT(str, pattern, idx) -- 正则提取
REGEXP_REPLACE(str, pattern, replacement)-- 正则替换
INSTR(str, substr) -- 查找子串位置
-- 日期函数
CURRENT_DATE() -- 当前日期
CURRENT_TIMESTAMP() -- 当前时间戳
DATE_ADD(date, days) -- 日期加
DATE_SUB(date, days) -- 日期减
DATEDIFF(date1, date2) -- 日期差
DATE_FORMAT(date, pattern) -- 格式化
YEAR(date) / MONTH(date) / DAY(date) -- 提取年月日
FROM_UNIXTIME(timestamp) -- Unix时间戳转日期
UNIX_TIMESTAMP(date) -- 日期转Unix时间戳
-- 数学函数
ROUND(num, d) -- 四舍五入
FLOOR(num) / CEIL(num) -- 向下/向上取整
ABS(num) -- 绝对值
POW(base, exp) -- 幂运算
SQRT(num) -- 平方根
RAND() -- 随机数
-- 条件函数
IF(condition, true_val, false_val) -- 条件
CASE WHEN cond1 THEN val1 WHEN cond2 THEN val2 ELSE val3 END
NVL(val, default) -- 空值替换
COALESCE(val1, val2, ...) -- 返回第一个非空值
-- 集合函数
SIZE(array|map) -- 元素个数
ARRAY_CONTAINS(array, val) -- 是否包含
MAP_KEYS(map) / MAP_VALUES(map) -- 获取键/值
-- 类型转换
CAST(expr AS type)
Spark 常用操作
RDD 操作
// 创建RDD
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 4) // 指定分区数
val rdd = sc.textFile("hdfs://path/file")
val rdd = sc.wholeTextFiles("hdfs://path/dir") // 读取目录下所有文件
// 基本转换
rdd.map(x => x * 2)
rdd.filter(x => x > 0)
rdd.flatMap(x => x.split(" "))
rdd.distinct()
rdd.union(otherRdd)
rdd.intersection(otherRdd)
rdd.subtract(otherRdd)
rdd.sample(withReplacement, fraction, seed)
// Key-Value操作
pairRdd.reduceByKey(_ + _)
pairRdd.groupByKey()
pairRdd.mapValues(_ * 2)
pairRdd.flatMapValues(_.split(","))
pairRdd.sortByKey(ascending)
pairRdd.join(otherRdd)
pairRdd.leftOuterJoin(otherRdd)
pairRdd.rightOuterJoin(otherRdd)
pairRdd.cogroup(otherRdd)
pairRdd.subtractByKey(otherRdd)
// 聚合操作
pairRdd.aggregateByKey(zeroValue)(seqOp, combOp)
pairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
pairRdd.foldByKey(zeroValue)(_ + _)
// 行动操作
rdd.collect()
rdd.collectAsMap() // PairRDD 转为 Map
rdd.count()
rdd.countByValue()
rdd.reduce(_ + _)
rdd.fold(0)(_ + _)
rdd.aggregate(zeroValue)(seqOp, combOp)
rdd.first()
rdd.take(n)
rdd.takeOrdered(n)
rdd.top(n)
rdd.countByKey()
rdd.foreach(println)
rdd.saveAsTextFile("path")
rdd.saveAsSequenceFile("path")
// 分区操作
rdd.getNumPartitions
rdd.repartition(n) // 增加或减少分区(触发Shuffle)
rdd.coalesce(n) // 减少分区(不触发Shuffle)
rdd.partitionBy(new HashPartitioner(n))
// 排序
rdd.sortBy(x => x)
rdd.sortBy(x => x, ascending = false)
// 持久化
rdd.cache() // 等同于 persist(MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.unpersist()
DataFrame 操作
// 创建DataFrame
val df = spark.read.csv("path")
val df = spark.read.json("path")
val df = spark.read.parquet("path")
val df = spark.read.format("jdbc").options(...).load()
val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
// 基本操作
df.select("col1", "col2")
df.selectExpr("col1", "col2 * 2 as col2_doubled")
df.filter($"col" > 0)
df.filter("col > 0")
df.where($"col" > 0)
df.distinct()
df.dropDuplicates("col1", "col2")
df.drop("col")
df.withColumn("new_col", $"col" * 2)
df.withColumnRenamed("old_name", "new_name")
// 排序和限制
df.orderBy($"col".desc)
df.sort($"col1".asc, $"col2".desc)
df.limit(10)
// 分组聚合
df.groupBy("col").count()
df.groupBy("col").agg(
count("*").as("cnt"),
sum("amount").as("total"),
avg("amount").as("avg_amount"),
max("amount").as("max_val"),
min("amount").as("min_val")
)
// 连接
df1.join(df2, "key")
df1.join(df2, Seq("key1", "key2"))
df1.join(df2, df1("id") === df2("id"), "left")
// 集合操作
df1.union(df2)
df1.unionByName(df2)
df1.intersect(df2)
df1.except(df2)
// SQL查询
df.createOrReplaceTempView("table")
spark.sql("SELECT * FROM table WHERE col > 0")
// 写入
df.write.csv("path")
df.write.parquet("path")
df.write.json("path")
df.write.mode("overwrite").parquet("path")
df.write.mode("append").saveAsTable("table")
df.write.format("jdbc").options(...).save()
Spark Streaming
// DStream
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val counts = words.map((_, 1)).reduceByKey(_ + _)
counts.print()
ssc.start()
ssc.awaitTermination()
// Structured Streaming
val spark = SparkSession.builder().getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val words = lines.as[String].flatMap(_.split(" "))
val counts = words.groupBy("value").count()
val query = counts.writeStream
.outputMode("complete")
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()
Spark 常用配置
// Executor 配置
spark.executor.memory // Executor 内存
spark.executor.cores // Executor CPU 核数
spark.executor.instances // Executor 实例数
spark.executor.memoryOverhead // 堆外内存
// Driver 配置
spark.driver.memory // Driver 内存
spark.driver.maxResultSize // 最大结果大小
// 内存配置
spark.memory.fraction // Spark 内存占比,默认 0.6
spark.memory.storageFraction // 存储内存占比,默认 0.5
// 并行度配置
spark.default.parallelism // 默认并行度
spark.sql.shuffle.partitions // Shuffle 分区数
// 序列化配置
spark.serializer // 序列化器,推荐 Kryo
spark.kryoserializer.buffer.max // Kryo 缓冲区大小
// Shuffle 配置
spark.shuffle.compress // Shuffle 输出压缩
spark.shuffle.spill.compress // Shuffle 溢写压缩
// 动态资源分配
spark.dynamicAllocation.enabled // 启用动态资源分配
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.initialExecutors
// AQE (Adaptive Query Execution, Spark 3.0+)
spark.sql.adaptive.enabled // 启用 AQE
spark.sql.adaptive.coalescePartitions.enabled // 合并小分区
spark.sql.adaptive.skewJoin.enabled // 处理数据倾斜
Flink 常用操作
DataStream API
// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 数据源
DataStream<String> stream = env.socketTextStream("localhost", 9999);
DataStream<Event> kafka = env.addSource(
new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));
DataStream<Event> file = env.readFile(...);
// 转换操作
stream.map(x -> x.toUpperCase())
stream.filter(x -> x.length() > 0)
stream.flatMap((x, out) -> { ... })
stream.keyBy(e -> e.getKey())
// 窗口操作
stream.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count")
stream.keyBy(e -> e.getKey())
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
stream.keyBy(e -> e.getKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(...)
// 输出
stream.print()
stream.addSink(new FlinkKafkaProducer<>(...))
stream.writeAsText("/path/output")
// 执行
env.execute("JobName")
时间和窗口
// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 分配时间戳和水位线
stream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.getTimestamp())
);
// 窗口类型
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 滚动
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动
.window(EventTimeSessionWindows.withGap(Time.seconds(10))) // 会话
.window(GlobalWindows.create()) // 全局
// 窗口函数
.window(...).reduce((a, b) -> { ... })
.window(...).aggregate(new MyAggregateFunction())
.window(...).process(new MyProcessWindowFunction())
状态管理
// Keyed State
private ValueState<Long> countState;
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
countState.update(value);
Long value = countState.value();
private ListState<String> listState;
listState.add(element);
private MapState<String, Integer> mapState;
mapState.put(key, value);
Integer value = mapState.get(key);
// 状态后端
env.setStateBackend(new HashMapStateBackend()); // 内存
env.setStateBackend(new EmbeddedRocksDBStateBackend()); // RocksDB
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
Flink SQL
-- 创建表
CREATE TABLE orders (
order_id INT,
user_id INT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);
-- 窗口查询
SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
TUMBLE(order_time, INTERVAL '1' HOUR);
Flink 配置
# 核心配置
parallelism.default: 4
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4
# 检查点配置
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: rocksdb
state.checkpoints.dir: hdfs:///checkpoints
# 高可用配置
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha
Kafka 常用命令
Topic 管理
# 创建Topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic topic_name \
--partitions 3 \
--replication-factor 2 \
--config retention.ms=86400000
# 查看Topic列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看Topic详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic topic_name
# 修改Topic配置
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics --entity-name topic_name \
--add-config retention.ms=86400000
# 增加分区(只能增加,不能减少)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic topic_name \
--partitions 6
# 删除Topic
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic topic_name
消息操作
# 生产消息
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--property parse.key=true \
--property key.separator=,
# 消费消息
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--from-beginning
# 指定消费组
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--group group_name
# 显示消息Key
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--property print.key=true \
--property key.separator=,
# 查看消费组列表
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
# 查看消费组详情
kafka-consumer-groups.sh --describe \
--bootstrap-server localhost:9092 \
--group group_name
# 重置消费位置
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server localhost:9092 \
--group group_name \
--topic topic_name \
--to-earliest \
--execute
# 删除消费组
kafka-consumer-groups.sh --delete \
--bootstrap-server localhost:9092 \
--group group_name
性能测试
# 生产者性能测试
kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000 \
--record-size 1000 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
# 消费者性能测试
kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000 \
--bootstrap-server localhost:9092
常用配置参数
Hadoop 核心配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| fs.defaultFS | file:/// | 默认文件系统 |
| dfs.replication | 3 | 副本数 |
| dfs.blocksize | 128MB | 块大小 |
| dfs.namenode.name.dir | - | NameNode 数据目录 |
| dfs.datanode.data.dir | - | DataNode 数据目录 |
| dfs.namenode.handler.count | 10 | NameNode 处理线程数 |
| dfs.datanode.handler.count | 10 | DataNode 处理线程数 |
| mapreduce.framework.name | local | 计算框架 |
YARN 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| yarn.nodemanager.resource.memory-mb | 8192 | NodeManager 可用内存 |
| yarn.nodemanager.resource.cpu-vcores | 8 | NodeManager 可用 CPU 核数 |
| yarn.scheduler.minimum-allocation-mb | 1024 | 容器最小内存 |
| yarn.scheduler.maximum-allocation-mb | 8192 | 容器最大内存 |
| yarn.nodemanager.vmem-check-enabled | true | 虚拟内存检查 |
| yarn.nodemanager.pmem-check-enabled | true | 物理内存检查 |
| yarn.resourcemanager.scheduler.class | CapacityScheduler | 调度器类 |
Spark 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| spark.executor.memory | 1g | Executor 内存 |
| spark.executor.cores | 1 | Executor 核心数 |
| spark.executor.instances | 2 | Executor 实例数 |
| spark.driver.memory | 1g | Driver 内存 |
| spark.sql.shuffle.partitions | 200 | Shuffle 分区数 |
| spark.default.parallelism | 总核数 | 默认并行度 |
| spark.serializer | JavaSerializer | 序列化器 |
| spark.memory.fraction | 0.6 | Spark 内存占比 |
| spark.memory.storageFraction | 0.5 | 存储内存占比 |
Flink 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| parallelism.default | 1 | 默认并行度 |
| taskmanager.memory.process.size | 1728m | TaskManager 内存 |
| taskmanager.numberOfTaskSlots | 1 | 每个 TaskManager 的槽位数 |
| state.backend | hashmap | 状态后端 |
| execution.checkpointing.interval | - | 检查点间隔 |
| execution.checkpointing.mode | exactly_once | 检查点模式 |
Kafka Broker 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| broker.id | - | Broker 唯一标识 |
| listeners | PLAINTEXT://:9092 | 监听地址 |
| log.dirs | /tmp/kafka-logs | 日志存储目录 |
| num.partitions | 1 | 默认分区数 |
| default.replication.factor | 1 | 默认副本数 |
| log.retention.hours | 168 | 消息保留时间(小时) |
| log.segment.bytes | 1GB | 日志段大小 |
| log.cleanup.policy | delete | 清理策略 |
Kafka 生产者配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| acks | all | 确认机制 |
| retries | 2147483647 | 重试次数 |
| batch.size | 16384 | 批量发送大小 |
| linger.ms | 5 | 批量等待时间 |
| buffer.memory | 33554432 | 缓冲区大小 |
| compression.type | none | 压缩类型 |
| enable.idempotence | false | 幂等性开关 |
| max.in.flight.requests.per.connection | 5 | 最大未确认请求数 |
Kafka 消费者配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| group.id | - | 消费者组 ID |
| auto.offset.reset | latest | 无 Offset 时的行为 |
| enable.auto.commit | true | 自动提交 Offset |
| auto.commit.interval.ms | 5000 | 自动提交间隔 |
| max.poll.records | 500 | 单次拉取最大记录数 |
| max.poll.interval.ms | 300000 | 拉取最大间隔 |
| session.timeout.ms | 45000 | 会话超时时间 |
| heartbeat.interval.ms | 3000 | 心跳间隔 |
HBase 配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| hbase.regionserver.handler.count | 30 | RegionServer RPC 处理线程数 |
| hbase.hregion.memstore.flush.size | 128MB | MemStore 刷写阈值 |
| hbase.regionserver.global.memstore.size | 0.4 | MemStore 占堆内存比例 |
| hfile.block.cache.size | 0.4 | 读缓存占堆内存比例 |
| hbase.hregion.max.filesize | 10GB | Region 分裂阈值 |
| hbase.client.scanner.caching | 100 | 客户端扫描缓存行数 |
性能调优要点
HDFS 调优
- 增加 DataNode 数量,提高并行度
- 调整块大小,减少元数据占用
- 启用短路读取,减少网络开销
- 配置合适的副本数
- 启用 HDFS 缓存
<!-- 短路读取配置 -->
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
Hive 调优
- 使用分区表和分桶表
- 选择合适的文件格式(ORC/Parquet)
- 启用压缩
- 使用向量化查询
- 调整 Map 和 Reduce 数量
- 使用 CBO(基于成本的优化器)
-- 启用向量化查询
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;
-- 使用 Tez 引擎
SET hive.execution.engine = tez;
-- 启用 CBO
SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;
Spark 调优
- 调整并行度(每个 CPU 核 2-3 个任务)
- 合理分配内存(Storage 和 Execution)
- 使用广播 Join 处理小表
- 处理数据倾斜
- 使用 Kryo 序列化
- 启用 AQE(自适应查询执行)
// 推荐配置
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
Flink 调优
- 设置合理的并行度
- 配置状态后端(RocksDB)
- 启用增量检查点
- 优化网络缓冲区
- 处理背压问题
# 推荐配置
state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000
taskmanager.network.memory.fraction: 0.15
Kafka 调优
- 生产者:增大 batch.size 和 linger.ms,启用压缩
- 消费者:增大 fetch.min.bytes 和 max.poll.records
- Broker:增加分区数,调整日志段大小
- 启用幂等性和事务保证消息可靠性
- 监控消费者延迟(Consumer Lag)
# 生产者优化
compression.type=lz4
batch.size=65536
linger.ms=10
# 消费者优化
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.poll.records=1000
HBase 调优
- 预分区避免热点
- 合理设计 RowKey
- 调整 MemStore 和 BlockCache 大小
- 使用压缩和布隆过滤器
- 调整 Region 大小
<!-- 推荐配置 -->
<property>
<name>hbase.regionserver.handler.count</name>
<value>100</value>
</property>
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>268435456</value> <!-- 256MB -->
</property>
常见问题排查
内存溢出(OOM)
| 场景 | 可能原因 | 解决方案 |
|---|---|---|
| Driver OOM | collect() 数据过大 | 使用 take() 或写入文件 |
| Executor OOM | 数据倾斜、分区过大 | 增加分区数、处理倾斜 |
| MapReduce OOM | 任务内存不足 | 增加 mapreduce.map/reduce.memory.mb |
数据倾斜
| 症状 | 可能原因 | 解决方案 |
|---|---|---|
| 少数任务执行慢 | Key 分布不均 | 加盐、广播 Join |
| 某节点负载高 | 热点数据 | 预分区、散列 Key |
性能慢
| 场景 | 可能原因 | 解决方案 |
|---|---|---|
| Shuffle 慢 | 分区数不合理 | 调整分区数 |
| 小文件多 | 输出文件过多 | 合并小文件 |
| GC 频繁 | 内存不足 | 增加内存、序列化存储 |