跳到主要内容

大数据技术速查表

本文档提供大数据核心技术的快速参考,包括常用命令、配置参数和代码片段。

HDFS 常用命令

文件操作

# 查看目录
hdfs dfs -ls /path
hdfs dfs -ls -R /path # 递归查看

# 创建目录
hdfs dfs -mkdir -p /path/to/dir

# 上传文件
hdfs dfs -put localfile /hdfs/path/
hdfs dfs -copyFromLocal localfile /hdfs/path/
hdfs dfs -moveFromLocal localfile /hdfs/path/ # 上传后删除本地文件

# 下载文件
hdfs dfs -get /hdfs/path/file localpath/
hdfs dfs -copyToLocal /hdfs/path/file localpath/

# 查看文件内容
hdfs dfs -cat /path/file
hdfs dfs -tail /path/file # 查看文件末尾
hdfs dfs -head /path/file # 查看文件开头

# 删除文件/目录
hdfs dfs -rm /path/file
hdfs dfs -rm -r /path/dir # 递归删除
hdfs dfs -rm -skipTrash /path/file # 不放入回收站

# 移动/重命名
hdfs dfs -mv /old/path /new/path

# 复制
hdfs dfs -cp /src/path /dst/path

# 查看文件大小
hdfs dfs -du -h /path # 人类可读格式
hdfs dfs -du -s -h /path # 汇总大小

# 统计目录下文件数量
hdfs dfs -count /path
hdfs dfs -count -q /path # 显示配额信息

# 查看文件块信息
hdfs fsck /path/file -files -blocks -locations

# 设置副本数
hdfs dfs -setrep 3 /path/file
hdfs dfs -setrep -R 3 /path/dir # 递归设置

# 合并下载
hdfs dfs -getmerge /path/dir/* localfile.txt

权限管理

# 修改权限
hdfs dfs -chmod 755 /path
hdfs dfs -chmod -R 755 /path # 递归修改

# 修改所有者
hdfs dfs -chown user:group /path
hdfs dfs -chown -R user:group /path

# 修改组
hdfs dfs -chgrp group /path

管理命令

# 查看集群状态
hdfs dfsadmin -report

# 安全模式
hdfs dfsadmin -safemode get # 查看状态
hdfs dfsadmin -safemode enter # 进入安全模式
hdfs dfsadmin -safemode leave # 退出安全模式
hdfs dfsadmin -safemode wait # 等待退出安全模式

# 刷新节点
hdfs dfsadmin -refreshNodes

# 查看NameNode状态(HA环境)
hdfs haadmin -getServiceState nn1
hdfs haadmin -getAllServiceState

# 保存命名空间
hdfs dfsadmin -saveNamespace

# 滚动编辑日志
hdfs dfsadmin -rollEdits

# 格式化NameNode(仅首次)
hdfs namenode -format

配额管理

# 设置目录配额(文件数)
hdfs dfsadmin -setQuota 10000 /path

# 设置空间配额(字节)
hdfs dfsadmin -setSpaceQuota 1t /path

# 清除配额
hdfs dfsadmin -clrQuota /path
hdfs dfsadmin -clrSpaceQuota /path

HBase Shell 命令

表操作

# 创建表
create 'table', 'cf1', 'cf2'
create 'table', {NAME => 'cf', VERSIONS => 3, TTL => 86400}
create 'table', {NAME => 'cf1'}, {NAME => 'cf2', COMPRESSION => 'SNAPPY'}

# 查看表
list
describe 'table'
exists 'table'

# 禁用/启用表
disable 'table'
enable 'table'
is_disabled 'table'
is_enabled 'table'

# 删除表
disable 'table'
drop 'table'

# 修改表
disable 'table'
alter 'table', {NAME => 'cf', VERSIONS => 5}
alter 'table', 'cf3' # 添加列族
alter 'table', {NAME => 'cf1', METHOD => 'delete'} # 删除列族
enable 'table'

# 清空表
truncate 'table'
truncate_preserve 'table' # 保留分区

数据操作

# 写入数据
put 'table', 'rowkey', 'cf:col', 'value'
put 'table', 'rowkey', 'cf:col', 'value', timestamp

# 读取数据
get 'table', 'rowkey'
get 'table', 'rowkey', 'cf'
get 'table', 'rowkey', 'cf:col'
get 'table', 'rowkey', {COLUMN => 'cf:col', VERSIONS => 3}
get 'table', 'rowkey', {TIMERANGE => [ts1, ts2]}

# 扫描
scan 'table'
scan 'table', {STARTROW => 'start', STOPROW => 'end'}
scan 'table', {COLUMNS => ['cf1', 'cf2']}
scan 'table', {LIMIT => 10}
scan 'table', {REVERSED => true} # 反向扫描

# 删除数据
delete 'table', 'rowkey', 'cf:col'
delete 'table', 'rowkey', 'cf:col', timestamp
deleteall 'table', 'rowkey'

# 统计
count 'table'
count 'table', {INTERVAL => 1000} # 每1000行显示一次

# 追加数据
append 'table', 'rowkey', 'cf:col', 'append_value'

# 原子递增
incr 'table', 'rowkey', 'cf:counter', 1

过滤器

# 值过滤
scan 'table', {FILTER => "ValueFilter(=, 'binary:value')"}
scan 'table', {FILTER => "ValueFilter(=, 'substring:hello')"}

# 行键过滤
scan 'table', {FILTER => "RowFilter(=, 'binary:rowkey')"}
scan 'table', {FILTER => "RowFilter(=, 'substring:key')"}

# 列名过滤
scan 'table', {FILTER => "QualifierFilter(=, 'substring:col')"}

# 单列值过滤
scan 'table', {FILTER => "SingleColumnValueFilter('cf', 'col', =, 'binary:value')"}

# 分页过滤器
scan 'table', {FILTER => "PageFilter(10)"}

# 组合过滤
scan 'table', {FILTER => "Filter1 AND Filter2 OR Filter3"}

快照操作

# 创建快照
snapshot 'table', 'snapshot_name'

# 列出快照
list_snapshots

# 从快照恢复
restore_snapshot 'snapshot_name'

# 克隆快照
clone_snapshot 'snapshot_name', 'new_table'

# 删除快照
delete_snapshot 'snapshot_name'

Hive 常用语法

DDL

-- 创建数据库
CREATE DATABASE IF NOT EXISTS db_name
COMMENT 'Database description'
LOCATION '/path/to/db'
WITH DBPROPERTIES ('creator' = 'admin');

-- 查看数据库
SHOW DATABASES;
SHOW DATABASES LIKE 'db_*';
DESCRIBE DATABASE db_name;
DESCRIBE DATABASE EXTENDED db_name;

-- 创建内部表
CREATE TABLE table_name (
col1 INT COMMENT 'Column 1',
col2 STRING COMMENT 'Column 2',
col3 DECIMAL(10,2) COMMENT 'Column 3'
)
COMMENT 'Table description'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count'='1');

-- 创建外部表
CREATE EXTERNAL TABLE table_name (...)
LOCATION '/path/to/data';

-- 创建分区表
CREATE TABLE table_name (...)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;

-- 创建分桶表
CREATE TABLE table_name (...)
CLUSTERED BY (col) SORTED BY (col) INTO 4 BUCKETS
STORED AS ORC;

-- 创建表并指定属性
CREATE TABLE table_name (...)
STORED AS ORC
TBLPROPERTIES (
'orc.compress'='SNAPPY',
'orc.create.index'='true'
);

-- 修改表
ALTER TABLE table_name RENAME TO new_name;
ALTER TABLE table_name ADD COLUMNS (new_col STRING);
ALTER TABLE table_name CHANGE old_col new_col INT;
ALTER TABLE table_name REPLACE COLUMNS (col1 INT, col2 STRING);

-- 分区操作
ALTER TABLE table_name ADD PARTITION (year=2024, month=1);
ALTER TABLE table_name DROP PARTITION (year=2024, month=1);
ALTER TABLE table_name PARTITION (year=2024) SET LOCATION '/new/path';
MSCK REPAIR TABLE table_name; -- 恢复分区
SHOW PARTITIONS table_name;

-- 删除表
DROP TABLE IF EXISTS table_name;
TRUNCATE TABLE table_name;

DML

-- 导入数据
LOAD DATA LOCAL INPATH '/local/path/file' INTO TABLE table_name;
LOAD DATA INPATH '/hdfs/path/file' INTO TABLE table_name;
LOAD DATA LOCAL INPATH '/path/file' OVERWRITE INTO TABLE table_name;

-- 插入数据
INSERT INTO TABLE table_name VALUES (1, 'a', 100);
INSERT OVERWRITE TABLE table_name SELECT * FROM source;

-- 动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE target PARTITION(year, month)
SELECT *, year_col, month_col FROM source;

-- 多表插入
FROM source_table
INSERT INTO TABLE target1 SELECT col1, col2
INSERT INTO TABLE target2 SELECT col3, col4;

-- 导出数据
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM table_name;

查询

-- 基本查询
SELECT col1, col2 FROM table_name WHERE condition;

-- 聚合
SELECT
col,
COUNT(*) as cnt,
SUM(amount) as total,
AVG(amount) as avg_amount,
MAX(amount) as max_val,
MIN(amount) as min_val
FROM table_name
GROUP BY col
HAVING COUNT(*) > 10;

-- JOIN
SELECT a.*, b.*
FROM table_a a
JOIN table_b b ON a.id = b.id;

SELECT a.*, b.*
FROM table_a a
LEFT OUTER JOIN table_b b ON a.id = b.id;

-- Map Join 提示
SELECT /*+ MAPJOIN(small_table) */
a.*, b.*
FROM big_table a
JOIN small_table b ON a.id = b.id;

-- 窗口函数
SELECT
col,
ROW_NUMBER() OVER (PARTITION BY group_col ORDER BY order_col) as rn,
RANK() OVER (PARTITION BY group_col ORDER BY order_col) as rk,
DENSE_RANK() OVER (PARTITION BY group_col ORDER BY order_col) as dr,
SUM(col) OVER (PARTITION BY group_col ORDER BY order_col
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_sum,
LAG(col, 1) OVER (ORDER BY order_col) as prev_val,
LEAD(col, 1) OVER (ORDER BY order_col) as next_val
FROM table_name;

-- 排序
SELECT * FROM table_name ORDER BY col; -- 全局排序
SELECT * FROM table_name SORT BY col; -- 局部排序
SELECT * FROM table_name DISTRIBUTE BY col1 SORT BY col2;

-- CTE(公共表表达式)
WITH cte1 AS (
SELECT * FROM table1 WHERE condition
),
cte2 AS (
SELECT * FROM table2 WHERE condition
)
SELECT a.*, b.* FROM cte1 a JOIN cte2 b ON a.id = b.id;

常用函数

-- 字符串函数
CONCAT(str1, str2, ...) -- 拼接
CONCAT_WS(separator, str1, str2, ...) -- 带分隔符拼接
SUBSTR(str, start, length) -- 截取
SPLIT(str, pattern) -- 分割
LENGTH(str) -- 长度
UPPER(str) / LOWER(str) -- 大小写
TRIM(str) / LTRIM(str) / RTRIM(str) -- 去空格
REGEXP_EXTRACT(str, pattern, idx) -- 正则提取
REGEXP_REPLACE(str, pattern, replacement)-- 正则替换
INSTR(str, substr) -- 查找子串位置

-- 日期函数
CURRENT_DATE() -- 当前日期
CURRENT_TIMESTAMP() -- 当前时间戳
DATE_ADD(date, days) -- 日期加
DATE_SUB(date, days) -- 日期减
DATEDIFF(date1, date2) -- 日期差
DATE_FORMAT(date, pattern) -- 格式化
YEAR(date) / MONTH(date) / DAY(date) -- 提取年月日
FROM_UNIXTIME(timestamp) -- Unix时间戳转日期
UNIX_TIMESTAMP(date) -- 日期转Unix时间戳

-- 数学函数
ROUND(num, d) -- 四舍五入
FLOOR(num) / CEIL(num) -- 向下/向上取整
ABS(num) -- 绝对值
POW(base, exp) -- 幂运算
SQRT(num) -- 平方根
RAND() -- 随机数

-- 条件函数
IF(condition, true_val, false_val) -- 条件
CASE WHEN cond1 THEN val1 WHEN cond2 THEN val2 ELSE val3 END
NVL(val, default) -- 空值替换
COALESCE(val1, val2, ...) -- 返回第一个非空值

-- 集合函数
SIZE(array|map) -- 元素个数
ARRAY_CONTAINS(array, val) -- 是否包含
MAP_KEYS(map) / MAP_VALUES(map) -- 获取键/值

-- 类型转换
CAST(expr AS type)

Spark 常用操作

RDD 操作

// 创建RDD
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 4) // 指定分区数
val rdd = sc.textFile("hdfs://path/file")
val rdd = sc.wholeTextFiles("hdfs://path/dir") // 读取目录下所有文件

// 基本转换
rdd.map(x => x * 2)
rdd.filter(x => x > 0)
rdd.flatMap(x => x.split(" "))
rdd.distinct()
rdd.union(otherRdd)
rdd.intersection(otherRdd)
rdd.subtract(otherRdd)
rdd.sample(withReplacement, fraction, seed)

// Key-Value操作
pairRdd.reduceByKey(_ + _)
pairRdd.groupByKey()
pairRdd.mapValues(_ * 2)
pairRdd.flatMapValues(_.split(","))
pairRdd.sortByKey(ascending)
pairRdd.join(otherRdd)
pairRdd.leftOuterJoin(otherRdd)
pairRdd.rightOuterJoin(otherRdd)
pairRdd.cogroup(otherRdd)
pairRdd.subtractByKey(otherRdd)

// 聚合操作
pairRdd.aggregateByKey(zeroValue)(seqOp, combOp)
pairRdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
pairRdd.foldByKey(zeroValue)(_ + _)

// 行动操作
rdd.collect()
rdd.collectAsMap() // PairRDD 转为 Map
rdd.count()
rdd.countByValue()
rdd.reduce(_ + _)
rdd.fold(0)(_ + _)
rdd.aggregate(zeroValue)(seqOp, combOp)
rdd.first()
rdd.take(n)
rdd.takeOrdered(n)
rdd.top(n)
rdd.countByKey()
rdd.foreach(println)
rdd.saveAsTextFile("path")
rdd.saveAsSequenceFile("path")

// 分区操作
rdd.getNumPartitions
rdd.repartition(n) // 增加或减少分区(触发Shuffle)
rdd.coalesce(n) // 减少分区(不触发Shuffle)
rdd.partitionBy(new HashPartitioner(n))

// 排序
rdd.sortBy(x => x)
rdd.sortBy(x => x, ascending = false)

// 持久化
rdd.cache() // 等同于 persist(MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
rdd.unpersist()

DataFrame 操作

// 创建DataFrame
val df = spark.read.csv("path")
val df = spark.read.json("path")
val df = spark.read.parquet("path")
val df = spark.read.format("jdbc").options(...).load()
val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

// 基本操作
df.select("col1", "col2")
df.selectExpr("col1", "col2 * 2 as col2_doubled")
df.filter($"col" > 0)
df.filter("col > 0")
df.where($"col" > 0)
df.distinct()
df.dropDuplicates("col1", "col2")
df.drop("col")
df.withColumn("new_col", $"col" * 2)
df.withColumnRenamed("old_name", "new_name")

// 排序和限制
df.orderBy($"col".desc)
df.sort($"col1".asc, $"col2".desc)
df.limit(10)

// 分组聚合
df.groupBy("col").count()
df.groupBy("col").agg(
count("*").as("cnt"),
sum("amount").as("total"),
avg("amount").as("avg_amount"),
max("amount").as("max_val"),
min("amount").as("min_val")
)

// 连接
df1.join(df2, "key")
df1.join(df2, Seq("key1", "key2"))
df1.join(df2, df1("id") === df2("id"), "left")

// 集合操作
df1.union(df2)
df1.unionByName(df2)
df1.intersect(df2)
df1.except(df2)

// SQL查询
df.createOrReplaceTempView("table")
spark.sql("SELECT * FROM table WHERE col > 0")

// 写入
df.write.csv("path")
df.write.parquet("path")
df.write.json("path")
df.write.mode("overwrite").parquet("path")
df.write.mode("append").saveAsTable("table")
df.write.format("jdbc").options(...).save()

Spark Streaming

// DStream
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val counts = words.map((_, 1)).reduceByKey(_ + _)
counts.print()
ssc.start()
ssc.awaitTermination()

// Structured Streaming
val spark = SparkSession.builder().getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()

val words = lines.as[String].flatMap(_.split(" "))
val counts = words.groupBy("value").count()

val query = counts.writeStream
.outputMode("complete")
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()

query.awaitTermination()

Spark 常用配置

// Executor 配置
spark.executor.memory // Executor 内存
spark.executor.cores // Executor CPU 核数
spark.executor.instances // Executor 实例数
spark.executor.memoryOverhead // 堆外内存

// Driver 配置
spark.driver.memory // Driver 内存
spark.driver.maxResultSize // 最大结果大小

// 内存配置
spark.memory.fraction // Spark 内存占比,默认 0.6
spark.memory.storageFraction // 存储内存占比,默认 0.5

// 并行度配置
spark.default.parallelism // 默认并行度
spark.sql.shuffle.partitions // Shuffle 分区数

// 序列化配置
spark.serializer // 序列化器,推荐 Kryo
spark.kryoserializer.buffer.max // Kryo 缓冲区大小

// Shuffle 配置
spark.shuffle.compress // Shuffle 输出压缩
spark.shuffle.spill.compress // Shuffle 溢写压缩

// 动态资源分配
spark.dynamicAllocation.enabled // 启用动态资源分配
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.initialExecutors

// AQE (Adaptive Query Execution, Spark 3.0+)
spark.sql.adaptive.enabled // 启用 AQE
spark.sql.adaptive.coalescePartitions.enabled // 合并小分区
spark.sql.adaptive.skewJoin.enabled // 处理数据倾斜

DataStream API

// 创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 数据源
DataStream<String> stream = env.socketTextStream("localhost", 9999);
DataStream<Event> kafka = env.addSource(
new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));
DataStream<Event> file = env.readFile(...);

// 转换操作
stream.map(x -> x.toUpperCase())
stream.filter(x -> x.length() > 0)
stream.flatMap((x, out) -> { ... })
stream.keyBy(e -> e.getKey())

// 窗口操作
stream.keyBy(e -> e.getKey())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("count")

stream.keyBy(e -> e.getKey())
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

stream.keyBy(e -> e.getKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(...)

// 输出
stream.print()
stream.addSink(new FlinkKafkaProducer<>(...))
stream.writeAsText("/path/output")

// 执行
env.execute("JobName")

时间和窗口

// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 分配时间戳和水位线
stream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.getTimestamp())
);

// 窗口类型
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 滚动
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // 滑动
.window(EventTimeSessionWindows.withGap(Time.seconds(10))) // 会话
.window(GlobalWindows.create()) // 全局

// 窗口函数
.window(...).reduce((a, b) -> { ... })
.window(...).aggregate(new MyAggregateFunction())
.window(...).process(new MyProcessWindowFunction())

状态管理

// Keyed State
private ValueState<Long> countState;
countState = getRuntimeContext().getState(
new ValueStateDescriptor<>("count", Long.class));
countState.update(value);
Long value = countState.value();

private ListState<String> listState;
listState.add(element);

private MapState<String, Integer> mapState;
mapState.put(key, value);
Integer value = mapState.get(key);

// 状态后端
env.setStateBackend(new HashMapStateBackend()); // 内存
env.setStateBackend(new EmbeddedRocksDBStateBackend()); // RocksDB
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");
-- 创建表
CREATE TABLE orders (
order_id INT,
user_id INT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
);

-- 窗口查询
SELECT
user_id,
TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
SUM(amount) as total_amount
FROM orders
GROUP BY
user_id,
TUMBLE(order_time, INTERVAL '1' HOUR);
# 核心配置
parallelism.default: 4
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 4

# 检查点配置
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: rocksdb
state.checkpoints.dir: hdfs:///checkpoints

# 高可用配置
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha

Kafka 常用命令

Topic 管理

# 创建Topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic topic_name \
--partitions 3 \
--replication-factor 2 \
--config retention.ms=86400000

# 查看Topic列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看Topic详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic topic_name

# 修改Topic配置
kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics --entity-name topic_name \
--add-config retention.ms=86400000

# 增加分区(只能增加,不能减少)
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic topic_name \
--partitions 6

# 删除Topic
kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic topic_name

消息操作

# 生产消息
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--property parse.key=true \
--property key.separator=,

# 消费消息
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--from-beginning

# 指定消费组
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--group group_name

# 显示消息Key
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic_name \
--property print.key=true \
--property key.separator=,

# 查看消费组列表
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

# 查看消费组详情
kafka-consumer-groups.sh --describe \
--bootstrap-server localhost:9092 \
--group group_name

# 重置消费位置
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server localhost:9092 \
--group group_name \
--topic topic_name \
--to-earliest \
--execute

# 删除消费组
kafka-consumer-groups.sh --delete \
--bootstrap-server localhost:9092 \
--group group_name

性能测试

# 生产者性能测试
kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000 \
--record-size 1000 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092

# 消费者性能测试
kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000 \
--bootstrap-server localhost:9092

常用配置参数

Hadoop 核心配置

参数默认值说明
fs.defaultFSfile:///默认文件系统
dfs.replication3副本数
dfs.blocksize128MB块大小
dfs.namenode.name.dir-NameNode 数据目录
dfs.datanode.data.dir-DataNode 数据目录
dfs.namenode.handler.count10NameNode 处理线程数
dfs.datanode.handler.count10DataNode 处理线程数
mapreduce.framework.namelocal计算框架

YARN 配置

参数默认值说明
yarn.nodemanager.resource.memory-mb8192NodeManager 可用内存
yarn.nodemanager.resource.cpu-vcores8NodeManager 可用 CPU 核数
yarn.scheduler.minimum-allocation-mb1024容器最小内存
yarn.scheduler.maximum-allocation-mb8192容器最大内存
yarn.nodemanager.vmem-check-enabledtrue虚拟内存检查
yarn.nodemanager.pmem-check-enabledtrue物理内存检查
yarn.resourcemanager.scheduler.classCapacityScheduler调度器类

Spark 配置

参数默认值说明
spark.executor.memory1gExecutor 内存
spark.executor.cores1Executor 核心数
spark.executor.instances2Executor 实例数
spark.driver.memory1gDriver 内存
spark.sql.shuffle.partitions200Shuffle 分区数
spark.default.parallelism总核数默认并行度
spark.serializerJavaSerializer序列化器
spark.memory.fraction0.6Spark 内存占比
spark.memory.storageFraction0.5存储内存占比
参数默认值说明
parallelism.default1默认并行度
taskmanager.memory.process.size1728mTaskManager 内存
taskmanager.numberOfTaskSlots1每个 TaskManager 的槽位数
state.backendhashmap状态后端
execution.checkpointing.interval-检查点间隔
execution.checkpointing.modeexactly_once检查点模式

Kafka Broker 配置

参数默认值说明
broker.id-Broker 唯一标识
listenersPLAINTEXT://:9092监听地址
log.dirs/tmp/kafka-logs日志存储目录
num.partitions1默认分区数
default.replication.factor1默认副本数
log.retention.hours168消息保留时间(小时)
log.segment.bytes1GB日志段大小
log.cleanup.policydelete清理策略

Kafka 生产者配置

参数默认值说明
acksall确认机制
retries2147483647重试次数
batch.size16384批量发送大小
linger.ms5批量等待时间
buffer.memory33554432缓冲区大小
compression.typenone压缩类型
enable.idempotencefalse幂等性开关
max.in.flight.requests.per.connection5最大未确认请求数

Kafka 消费者配置

参数默认值说明
group.id-消费者组 ID
auto.offset.resetlatest无 Offset 时的行为
enable.auto.committrue自动提交 Offset
auto.commit.interval.ms5000自动提交间隔
max.poll.records500单次拉取最大记录数
max.poll.interval.ms300000拉取最大间隔
session.timeout.ms45000会话超时时间
heartbeat.interval.ms3000心跳间隔

HBase 配置

参数默认值说明
hbase.regionserver.handler.count30RegionServer RPC 处理线程数
hbase.hregion.memstore.flush.size128MBMemStore 刷写阈值
hbase.regionserver.global.memstore.size0.4MemStore 占堆内存比例
hfile.block.cache.size0.4读缓存占堆内存比例
hbase.hregion.max.filesize10GBRegion 分裂阈值
hbase.client.scanner.caching100客户端扫描缓存行数

性能调优要点

HDFS 调优

  • 增加 DataNode 数量,提高并行度
  • 调整块大小,减少元数据占用
  • 启用短路读取,减少网络开销
  • 配置合适的副本数
  • 启用 HDFS 缓存
<!-- 短路读取配置 -->
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>

Hive 调优

  • 使用分区表和分桶表
  • 选择合适的文件格式(ORC/Parquet)
  • 启用压缩
  • 使用向量化查询
  • 调整 Map 和 Reduce 数量
  • 使用 CBO(基于成本的优化器)
-- 启用向量化查询
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;

-- 使用 Tez 引擎
SET hive.execution.engine = tez;

-- 启用 CBO
SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;

Spark 调优

  • 调整并行度(每个 CPU 核 2-3 个任务)
  • 合理分配内存(Storage 和 Execution)
  • 使用广播 Join 处理小表
  • 处理数据倾斜
  • 使用 Kryo 序列化
  • 启用 AQE(自适应查询执行)
// 推荐配置
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
  • 设置合理的并行度
  • 配置状态后端(RocksDB)
  • 启用增量检查点
  • 优化网络缓冲区
  • 处理背压问题
# 推荐配置
state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 600000
taskmanager.network.memory.fraction: 0.15

Kafka 调优

  • 生产者:增大 batch.size 和 linger.ms,启用压缩
  • 消费者:增大 fetch.min.bytes 和 max.poll.records
  • Broker:增加分区数,调整日志段大小
  • 启用幂等性和事务保证消息可靠性
  • 监控消费者延迟(Consumer Lag)
# 生产者优化
compression.type=lz4
batch.size=65536
linger.ms=10

# 消费者优化
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.poll.records=1000

HBase 调优

  • 预分区避免热点
  • 合理设计 RowKey
  • 调整 MemStore 和 BlockCache 大小
  • 使用压缩和布隆过滤器
  • 调整 Region 大小
<!-- 推荐配置 -->
<property>
<name>hbase.regionserver.handler.count</name>
<value>100</value>
</property>
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>268435456</value> <!-- 256MB -->
</property>

常见问题排查

内存溢出(OOM)

场景可能原因解决方案
Driver OOMcollect() 数据过大使用 take() 或写入文件
Executor OOM数据倾斜、分区过大增加分区数、处理倾斜
MapReduce OOM任务内存不足增加 mapreduce.map/reduce.memory.mb

数据倾斜

症状可能原因解决方案
少数任务执行慢Key 分布不均加盐、广播 Join
某节点负载高热点数据预分区、散列 Key

性能慢

场景可能原因解决方案
Shuffle 慢分区数不合理调整分区数
小文件多输出文件过多合并小文件
GC 频繁内存不足增加内存、序列化存储