
Hive 数据仓库

Apache Hive 是构建在 Hadoop 之上的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类 SQL 查询功能。Hive 让不熟悉 MapReduce 的开发人员也能进行大数据分析。

Hive 概述

什么是 Hive?

Hive 最初由 Facebook 开发,用于处理海量的社交网络数据。它提供了一种类似 SQL 的查询语言——HiveQL(HQL),将 SQL 语句转换为 MapReduce 或 Tez、Spark 任务执行。

Hive 的设计目标是让分析师能够用熟悉的 SQL 语法来处理存储在 Hadoop 中的海量数据,而无需编写复杂的 MapReduce 程序。Hive 并不是一个真正的数据库,它更像是一个"SQL-on-Hadoop"的查询引擎,底层依赖 HDFS 进行存储,依赖 MapReduce、Tez 或 Spark 进行计算。

Hive 核心特点

| 特点 | 说明 |
| --- | --- |
| 类 SQL 语法 | 降低大数据分析门槛,分析师可快速上手 |
| 可扩展 | 支持自定义函数(UDF、UDAF、UDTF) |
| 多引擎支持 | 可运行在 MapReduce、Tez、Spark 上 |
| 元数据管理 | 统一的元数据存储,支持多工具共享 |
| 大数据处理 | 支持处理 PB 级别数据 |

Hive 与传统数据库对比

理解 Hive 与传统关系型数据库的区别,有助于正确选择技术方案:

| 维度 | Hive | 传统数据库 |
| --- | --- | --- |
| 数据规模 | TB 到 PB 级 | GB 到 TB 级 |
| 查询延迟 | 分钟到小时 | 毫秒到秒 |
| 数据更新 | 不支持频繁更新,适合追加写入 | 支持事务,频繁增删改 |
| 索引 | 有限支持 | 完善的索引机制 |
| 适用场景 | 离线分析、数据仓库、ETL | OLTP 事务处理 |
| 扩展性 | 水平扩展,可添加节点 | 垂直扩展为主 |
| 数据存储 | HDFS,分布式存储 | 本地文件系统或共享存储 |

Hive 不适合需要低延迟响应的在线业务,也不适合频繁更新的场景。它的强项是批量处理海量数据,进行复杂的分析和聚合计算。

Hive 版本演进

| 版本 | 发布时间 | 重要特性 |
| --- | --- | --- |
| Hive 0.x | 2008-2012 | 基础 SQL 支持、MapReduce 执行引擎 |
| Hive 0.13 | 2014 | ACID 事务支持 |
| Hive 1.x | 2015 | Hive on Spark、Cost-Based Optimizer |
| Hive 2.x | 2016 | LLAP(低延迟分析处理)、Hive Streaming |
| Hive 3.x | 2018 | 物化视图增强、LLAP 改进 |
| Hive 4.0 | 2024 | Iceberg 深度集成、原生地理空间支持、性能优化、超过 5000 项改进 |

Hive 4.0 重要更新

Hive 4.0 是自 2018 年 Hive 3.0 发布以来的首个主要版本,包含超过 5000 项提交,是一次重大更新:

1. Iceberg 深度集成

这是 Hive 4.0 最重要的特性。Apache Iceberg 是一种开放的数据表格式,专为大规模数据分析设计。Hive 4.0 与 Iceberg 的集成包括:

  • 原生支持创建和管理 Iceberg 表
  • 支持 OPTIMIZE TABLE 语法进行小文件合并(Compaction)
  • 支持快照管理(过期快照、删除孤立文件)
  • 支持 Copy-on-Write 和 Merge-on-Read 两种写入模式
  • 支持分区操作(插入、截断、删除分区)
-- 创建 Iceberg 表
CREATE TABLE iceberg_table (
id INT,
name STRING,
event_time TIMESTAMP
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
'format-version' = '2',
'write.format.default' = 'parquet'
);

-- 优化 Iceberg 表(合并小文件)
OPTIMIZE TABLE iceberg_table;

-- 过期旧快照
ALTER TABLE iceberg_table EXECUTE EXPIRE_SNAPSHOTS('2024-01-01 00:00:00');

2. 原生地理空间支持

Hive 4.0 引入了原生地理空间函数,无需额外安装 ESRI UDF:

-- 创建包含地理数据的表
CREATE TABLE locations (
id INT,
name STRING,
point GEOMETRY,
polygon GEOMETRY
);

-- 地理空间查询
SELECT
name,
ST_Distance(point, ST_Point(116.4074, 39.9042)) as distance
FROM locations
WHERE ST_Within(point, ST_Polygon(...));

-- 支持的地理空间函数
ST_Point(x, y) -- 创建点
ST_Polygon(...) -- 创建多边形
ST_Distance(g1, g2) -- 计算距离
ST_Within(g1, g2) -- 判断是否包含
ST_Intersection(g1, g2) -- 计算交集
ST_Buffer(g, distance) -- 创建缓冲区

3. 性能优化

  • 改进的查询优化器
  • 更高效的元数据操作
  • 减少文件系统调用
  • 改进的向量化执行

4. 其他重要改进

  • 新增 typeof UDF,返回表达式的数据类型(用法见下例)
  • 改进的日期时间解析
  • 更好的 Parquet 和 ORC 支持
  • 升级依赖库版本(Hadoop 3.3.6、Tez 0.10.3、ORC 1.8.5)
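
其中 typeof 可以直接在查询中查看任意表达式的类型(以下返回值仅为示意):

-- typeof 返回表达式的数据类型(Hive 4.0 新增)
SELECT typeof(1), typeof('abc'), typeof(array(1, 2));
-- 预期返回类似:int、string、array<int>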

Hive 架构

Hive 采用经典的客户端-服务器架构,主要组件包括:

用户接口(Client Interface)

Hive 提供多种用户交互方式:

  • CLI(命令行接口):传统的 Hive 命令行工具,直接与 Hive 服务交互
  • Beeline:基于 JDBC 的命令行工具,通过 Thrift 协议连接 HiveServer2(连接示例见本节末尾)
  • JDBC/ODBC:标准数据库连接接口,支持各种 BI 工具和应用集成
  • Web GUI:通过 Hue 等工具提供 Web 界面
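
例如,通过 Beeline 连接 HiveServer2(主机名、端口与用户名均为示例值):

-- 在 shell 中启动 Beeline 并连接 HiveServer2
-- beeline -u "jdbc:hive2://hs2-host:10000/default" -n hive_user
-- 连接成功后即可执行 HiveQL,例如:
SHOW DATABASES;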

元数据存储(Metastore)

Metastore 是 Hive 的核心组件,存储所有表的元数据信息:

  • 表定义:表名、列名、数据类型、注释
  • 存储信息:数据位置、文件格式、SerDe 配置
  • 分区信息:分区列、分区值、分区位置
  • 统计信息:行数、文件数、数据大小

Metastore 通常使用 MySQL、PostgreSQL 或 Derby 作为后端存储。元数据独立存储的好处是可以被多个工具共享,比如 Spark、Presto 等都可以访问 Hive 的元数据。
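
排查元数据问题时,也可以直接查询 Metastore 的后端数据库。以下以 MySQL 为例(TBLS、DBS 是 Hive Metastore 的默认表结构,示例仅供参考):

-- 在 Metastore 的 MySQL 库中查看所有 Hive 表及其所属数据库
SELECT t.TBL_NAME, d.NAME AS db_name, t.TBL_TYPE
FROM TBLS t
JOIN DBS d ON t.DB_ID = d.DB_ID;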

驱动器(Driver)

Driver 是 Hive 的核心执行引擎,负责整个查询的生命周期管理(编译结果可通过列表后的 EXPLAIN 示例观察):

  1. 解析器(Parser):将 SQL 字符串解析为抽象语法树(AST)
  2. 语义分析器(Semantic Analyzer):将 AST 转换为查询块,检查表和列是否存在
  3. 逻辑计划生成器:生成逻辑执行计划
  4. 优化器(Optimizer):优化执行计划,包括谓词下推、列裁剪、Join 优化等
  5. 物理计划生成器:将逻辑计划转换为物理执行计划
  6. 执行器(Executor):将任务提交给执行引擎运行
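
整个编译过程的产物可以用 EXPLAIN 直接观察,输出中能看到阶段划分与算子树:

-- 查看查询被编译成的执行计划
EXPLAIN
SELECT department, COUNT(*) FROM employees GROUP BY department;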

执行引擎

Hive 支持多种执行引擎:

| 引擎 | 特点 | 适用场景 |
| --- | --- | --- |
| MapReduce | 稳定可靠,但性能较低 | 兼容性要求高的场景 |
| Tez | DAG 执行模型,性能优于 MR | 大多数生产环境 |
| Spark | 内存计算,性能最佳 | 需要快速响应的场景 |

LLAP 低延迟分析处理

LLAP(Low Latency Analytical Processing)是 Hive 2.0 引入的关键特性,通过常驻内存的守护进程大幅降低查询延迟,使 Hive 能够支持交互式分析场景。

LLAP 架构

传统 Hive 查询需要启动 Tez 应用,每次查询都有启动开销。LLAP 通过引入常驻后台服务解决了这个问题:

传统模式 vs LLAP 模式

| 特性 | 传统模式 | LLAP 模式 |
| --- | --- | --- |
| 启动开销 | 每次查询启动应用 | 守护进程常驻 |
| 查询延迟 | 秒到分钟级 | 亚秒到秒级 |
| 资源利用 | 独占资源 | 共享资源池 |
| 缓存 | 无内置缓存 | 内存缓存热数据 |
| 适用场景 | 大批量离线处理 | 交互式分析、BI 报表 |

LLAP 核心组件

  1. LLAP Daemon:运行在每个节点上的常驻进程

    • 执行查询片段
    • 管理内存缓存
    • 处理数据预取
  2. LLAP Cache:内存缓存层

    • 缓存热点数据
    • 支持 ORC/Parquet 列级缓存
    • LRU 淘汰策略
  3. Query Fragment:查询片段

    • 将查询拆分为小片段
    • 分发给 LLAP Daemon 执行

LLAP 配置

<!-- 启用 LLAP -->
<property>
<name>hive.llap.execution.mode</name>
<value>all</value> <!-- all, auto, none -->
</property>

<!-- LLAP Daemon 内存 -->
<property>
<name>hive.llap.daemon.memory.per.instance.mb</name>
<value>16384</value> <!-- 16GB -->
</property>

<!-- LLAP 缓存大小 -->
<property>
<name>hive.llap.io.memory.size</name>
<value>8589934592</value> <!-- 8GB -->
</property>

<!-- LLAP Daemon 数量 -->
<property>
<name>hive.llap.daemon.num.executors</name>
<value>4</value>
</property>

<!-- 启用 IO 电梯(异步 IO) -->
<property>
<name>hive.llap.io.enabled</name>
<value>true</value>
</property>

<!-- 缓存命中率监控 -->
<property>
<name>hive.llap.io.cache.stats.track.cache.hit.ratio</name>
<value>true</value>
</property>

LLAP 使用场景

适合 LLAP 的场景

  • 交互式 BI 报表查询
  • 低延迟仪表板
  • 探索性数据分析
  • 需要快速响应的查询

不太适合 LLAP 的场景

  • 超大规模批量 ETL
  • 需要大量 Shuffle 的复杂查询
  • 资源受限的小集群

LLAP 监控

-- 查看 LLAP 状态
SET hive.llap.execution.mode;

-- 查看缓存统计
-- 通过 YARN UI 或 Grafana 监控
-- 关键指标:缓存命中率、查询延迟、并发请求数

监控要点

| 指标 | 说明 | 建议值 |
| --- | --- | --- |
| Cache Hit Ratio | 缓存命中率 | > 80% |
| Average Query Time | 平均查询时间 | < 10s |
| Executor Utilization | 执行器利用率 | 60-80% |
| Memory Usage | 内存使用率 | < 85% |

数据模型

数据库

数据库是表的命名空间,用于组织和管理相关表:

-- 创建数据库
CREATE DATABASE IF NOT EXISTS analytics
COMMENT 'Analytics data warehouse'
LOCATION '/user/hive/warehouse/analytics.db'
WITH DBPROPERTIES (
'creator' = 'data_team',
'created_at' = '2024-01-01'
);

-- 查看数据库列表
SHOW DATABASES;
SHOW DATABASES LIKE 'ana*';

-- 查看数据库详情
DESCRIBE DATABASE analytics;
DESCRIBE DATABASE EXTENDED analytics;

-- 切换数据库
USE analytics;

-- 删除数据库
DROP DATABASE IF EXISTS analytics CASCADE; -- CASCADE 会级联删除所有表

表类型

Hive 支持多种表类型,理解它们的区别对于数据管理至关重要。

内部表(Managed Table)

内部表由 Hive 完全管理数据和元数据。删除表时,数据和元数据都会被删除:

-- 创建内部表
CREATE TABLE employees (
id INT COMMENT 'Employee ID',
name STRING COMMENT 'Employee name',
department STRING COMMENT 'Department name',
salary DECIMAL(10, 2) COMMENT 'Monthly salary',
hire_date DATE COMMENT 'Hire date'
)
COMMENT 'Employee information table'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY ':'
NULL DEFINED AS ''
STORED AS TEXTFILE;

-- 查看表结构
DESCRIBE employees;
DESCRIBE FORMATTED employees; -- 更详细的信息
SHOW CREATE TABLE employees; -- 显示建表语句

内部表适合存放中间结果、临时数据或 Hive 独占的数据。

外部表(External Table)

外部表只管理元数据,数据由用户管理。删除表时只删除元数据,数据保留:

-- 创建外部表
CREATE EXTERNAL TABLE access_logs (
log_time TIMESTAMP,
ip_address STRING,
request_url STRING,
status_code INT,
response_size BIGINT
)
COMMENT 'Web server access logs'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/logs/access' -- 指定数据位置
TBLPROPERTIES (
'external.table.purge' = 'false' -- 删除表时保留数据
);

-- 外部表也可以配置为删除表时同时删除数据
-- ALTER TABLE access_logs SET TBLPROPERTIES ('external.table.purge' = 'true');

外部表适合管理已经存在的数据,或者需要与其他工具共享的数据。

内部表 vs 外部表

| 操作 | 内部表 | 外部表 |
| --- | --- | --- |
| 删除表(DROP TABLE) | 删除元数据和数据 | 只删除元数据,数据保留 |
| 数据管理 | Hive 管理 | 用户管理 |
| 适用场景 | 中间结果、临时数据、独占数据 | 共享数据、原始数据 |

选择原则:如果数据只由 Hive 使用,选择内部表;如果数据需要与其他工具共享,或者需要保留原始数据,选择外部表。
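
两种表可以互相转换,便于在数据归属关系变化时调整管理方式:

-- 将内部表转换为外部表(改回内部表则设为 'FALSE')
ALTER TABLE employees SET TBLPROPERTIES ('EXTERNAL' = 'TRUE');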

分区表

分区表将数据按分区列的值存储在不同目录中,是 Hive 最重要的性能优化手段之一。

创建分区表

-- 创建分区表
CREATE TABLE sales (
order_id BIGINT,
product_id INT,
customer_id INT,
quantity INT,
amount DECIMAL(12, 2)
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;

-- 添加分区
ALTER TABLE sales ADD PARTITION (year=2024, month=1, day=1);
ALTER TABLE sales ADD PARTITION (year=2024, month=1, day=2);

-- 批量添加分区
ALTER TABLE sales ADD IF NOT EXISTS
PARTITION (year=2024, month=1, day=1)
PARTITION (year=2024, month=1, day=2)
PARTITION (year=2024, month=1, day=3);

-- 查看分区
SHOW PARTITIONS sales;
SHOW PARTITIONS sales PARTITION(year=2024);

-- 删除分区
ALTER TABLE sales DROP IF EXISTS PARTITION (year=2024, month=1, day=1);

分区查询

-- 分区裁剪:只扫描需要的分区
SELECT * FROM sales
WHERE year=2024 AND month=1;

-- 查看分区信息
DESCRIBE FORMATTED sales PARTITION (year=2024, month=1, day=1);

分区设计原则

分区设计直接影响查询性能:

  1. 分区列选择:选择经常用于过滤条件的列作为分区列,如日期、地区
  2. 分区粒度:不要过度分区,每个分区应该有足够的数据量(建议至少 1GB)
  3. 避免数据倾斜:分区之间的数据量应该相对均衡
分区目录结构示例:

/user/hive/warehouse/sales/
├── year=2024/
│   ├── month=1/
│   │   ├── day=1/
│   │   │   └── data.parquet
│   │   ├── day=2/
│   │   │   └── data.parquet
│   │   └── ...
│   └── month=2/
│       └── ...
└── ...

动态分区插入

当需要根据数据自动创建分区时,使用动态分区:

-- 启用动态分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict; -- nonstrict 允许所有分区列都为动态分区(strict 要求至少一个静态分区)

-- 动态分区插入
INSERT INTO TABLE sales PARTITION(year, month, day)
SELECT
order_id,
product_id,
customer_id,
quantity,
amount,
year(order_date) as year,
month(order_date) as month,
day(order_date) as day
FROM staging_sales;

动态分区的注意事项:

  • 动态分区列必须放在 SELECT 语句的最后,顺序与 PARTITION 子句中的分区列一致
  • 动态分区可能一次创建大量分区,必要时需调大 hive.exec.max.dynamic.partitions 等参数(示例见下)
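
以下是控制动态分区规模的常用参数(示例值,需按集群情况调整):

-- 动态分区数量限制
SET hive.exec.max.dynamic.partitions=10000; -- 单次查询可创建的最大分区总数
SET hive.exec.max.dynamic.partitions.pernode=1000; -- 单个节点可创建的最大分区数
SET hive.exec.max.created.files=100000; -- 单次查询可创建的最大文件数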

分桶表

分桶表将数据按列的哈希值分散到固定数量的文件中,用于优化 Join 和采样查询。

-- 创建分桶表
CREATE TABLE users_bucketed (
user_id BIGINT,
username STRING,
email STRING,
registration_date DATE,
country STRING
)
CLUSTERED BY (user_id) -- 按 user_id 哈希分桶
SORTED BY (user_id) -- 桶内按 user_id 排序
INTO 16 BUCKETS
STORED AS ORC;

-- 插入数据到分桶表
-- 注:Hive 2.0 起默认强制按桶写入,hive.enforce.bucketing 参数已移除;旧版本需手动开启
SET hive.enforce.bucketing=true;

INSERT INTO TABLE users_bucketed
SELECT * FROM users_staging;

分桶表的优势:

  1. 优化 Join:两个表按相同列分桶,可以进行桶间 Join,避免全表 Shuffle
  2. 高效采样:可以只读取部分桶进行采样分析(示例见下)
  3. 数据均匀分布:避免数据倾斜
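
下面给出桶采样和桶 Map Join 的用法示例(桶 Map Join 要求两表按相同列分桶,且桶数相同或成倍数):

-- 桶采样:只读取 16 个桶中的第 1 个桶,约 1/16 的数据
SELECT * FROM users_bucketed TABLESAMPLE(BUCKET 1 OUT OF 16 ON user_id) u;

-- 启用桶 Map Join
SET hive.optimize.bucketmapjoin=true;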

存储格式

选择合适的存储格式对性能影响巨大:

文本格式(TEXTFILE)

-- 文本格式是默认格式,适合导入导出
CREATE TABLE text_table (
id INT,
name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

优点:可读性好,便于调试;缺点:占用空间大,查询性能差。

列式存储(ORC/Parquet)

-- ORC 格式(Hive 原生优化格式)
CREATE TABLE orc_table (
id INT,
name STRING,
amount DECIMAL(10, 2)
)
STORED AS ORC
TBLPROPERTIES (
'orc.compress' = 'SNAPPY', -- 压缩算法
'orc.create.index' = 'true', -- 创建索引
'orc.stripe.size' = '67108864' -- 条带大小
);

-- Parquet 格式(跨平台兼容性好)
CREATE TABLE parquet_table (
id INT,
name STRING,
amount DECIMAL(10, 2)
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY',
'parquet.block.size' = '134217728'
);

ORC 与 Parquet 对比:

| 特性 | ORC | Parquet |
| --- | --- | --- |
| 压缩率 | 更高 | 高 |
| 查询性能 | 更优(Hive) | 优(多平台) |
| 生态支持 | Hive 原生 | Spark、Impala 等 |
| 索引支持 | 支持 | 有限 |
| 推荐场景 | Hive 主导的分析 | 多引擎混用场景 |

数据类型

基本数据类型

| 类型 | 说明 | 示例 |
| --- | --- | --- |
| TINYINT | 1 字节整数 | 10Y |
| SMALLINT | 2 字节整数 | 1000S |
| INT | 4 字节整数 | 100000 |
| BIGINT | 8 字节整数 | 1000000000L |
| FLOAT | 单精度浮点 | 3.14F |
| DOUBLE | 双精度浮点 | 3.14159 |
| DECIMAL(p,s) | 精确数值 | DECIMAL(10,2) |
| STRING | 变长字符串 | 'hello' |
| VARCHAR(n) | 有长度限制的字符串 | VARCHAR(100) |
| CHAR(n) | 定长字符串 | CHAR(10) |
| BOOLEAN | 布尔值 | TRUE/FALSE |
| DATE | 日期 | '2024-01-01' |
| TIMESTAMP | 时间戳 | '2024-01-01 12:00:00' |
| BINARY | 二进制数据 | cast('abc' as binary) |
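
基本类型可以通过字面量后缀或 CAST 显式指定(以下查询的返回值仅为示意):

-- 字面量后缀与显式转换
SELECT 10Y AS tiny_val, 1000S AS small_val, 1000000000L AS big_val,
100BD AS decimal_val, CAST(3.14 AS FLOAT) AS float_val,
CAST('2024-01-01' AS DATE) AS date_val;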

复杂数据类型

-- ARRAY:数组类型
CREATE TABLE array_example (
id INT,
tags ARRAY<STRING>,
scores ARRAY<INT>
);

-- 插入数据(Hive 的 INSERT ... VALUES 不支持复杂类型,需改用 INSERT ... SELECT)
INSERT INTO array_example
SELECT 1, array('tag1', 'tag2', 'tag3'), array(90, 85, 92);

-- 查询数组元素
SELECT id, tags[0] as first_tag, size(tags) as tag_count
FROM array_example;

-- 展开数组(LATERAL VIEW)
SELECT id, tag
FROM array_example
LATERAL VIEW explode(tags) t AS tag;


-- MAP:映射类型
CREATE TABLE map_example (
id INT,
properties MAP<STRING, STRING>,
metrics MAP<STRING, INT>
);

-- 插入数据(同样需用 INSERT ... SELECT 构造复杂类型)
INSERT INTO map_example
SELECT 1, map('color', 'red', 'size', 'large'), map('views', 1000, 'clicks', 50);

-- 查询 Map 元素
SELECT id, properties['color'] as color, metrics['views'] as views
FROM map_example;

-- 获取所有 Key 和 Value
SELECT id, map_keys(properties) as keys, map_values(properties) as values
FROM map_example;

-- 展开Map
SELECT id, key, value
FROM map_example
LATERAL VIEW explode(properties) p AS key, value;


-- STRUCT:结构体类型
CREATE TABLE struct_example (
id INT,
address STRUCT<street:STRING, city:STRING, zip:STRING>,
location STRUCT<lat:DOUBLE, lon:DOUBLE>
);

-- 插入数据(同样需用 INSERT ... SELECT 构造复杂类型)
INSERT INTO struct_example
SELECT 1,
named_struct('street', 'Main St', 'city', 'Beijing', 'zip', '100000'),
named_struct('lat', 39.9042, 'lon', 116.4074);

-- 查询结构体字段
SELECT id, address.city, address.zip, location.lat, location.lon
FROM struct_example;


-- 嵌套复杂类型
CREATE TABLE nested_example (
id INT,
user_profile STRUCT<
name:STRING,
contacts:ARRAY<STRUCT<type:STRING, value:STRING>>,
preferences:MAP<STRING, STRING>
>
);

HiveQL 查询

基本查询

-- 选择列(列裁剪)
SELECT name, department, salary FROM employees;

-- WHERE 条件过滤
SELECT * FROM employees
WHERE salary > 50000
AND department IN ('Engineering', 'Sales');

-- DISTINCT 去重
SELECT DISTINCT department FROM employees;

-- LIMIT 限制结果
SELECT * FROM employees LIMIT 10;

-- 列别名
SELECT
name AS employee_name,
salary * 12 AS annual_salary
FROM employees;

-- CASE WHEN 条件表达式
SELECT
name,
salary,
CASE
WHEN salary >= 100000 THEN 'Senior'
WHEN salary >= 50000 THEN 'Mid-level'
ELSE 'Junior'
END AS level,
CASE department
WHEN 'Engineering' THEN 'Tech'
WHEN 'Sales' THEN 'Business'
ELSE 'Other'
END AS dept_category
FROM employees;

-- NULL 处理
SELECT
name,
COALESCE(bonus, 0) AS bonus, -- 返回第一个非空值
NVL(commission, 0) AS commission, -- NULL 替换
NULLIF(salary, 0) AS salary_check -- 如果相等返回 NULL
FROM employees;

聚合函数

-- 基本聚合
SELECT
COUNT(*) AS total_count,
COUNT(DISTINCT department) AS dept_count,
SUM(salary) AS total_salary,
AVG(salary) AS avg_salary,
MAX(salary) AS max_salary,
MIN(salary) AS min_salary,
STDDEV(salary) AS salary_stddev,
VARIANCE(salary) AS salary_var
FROM employees;

-- GROUP BY 分组
SELECT
department,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary,
MAX(salary) AS max_salary,
MIN(salary) AS min_salary
FROM employees
GROUP BY department;

-- HAVING 过滤分组结果
SELECT
department,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department
HAVING COUNT(*) > 10 AND AVG(salary) > 50000;

-- GROUPING SETS:多维度分组
SELECT
department,
job_level,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department, job_level
GROUPING SETS (
(department, job_level), -- 按部门和级别分组
(department), -- 只按部门分组
(job_level), -- 只按级别分组
() -- 总计
);

-- CUBE:所有可能的组合
SELECT
department,
job_level,
COUNT(*) AS emp_count
FROM employees
GROUP BY department, job_level WITH CUBE;

-- ROLLUP:层级聚合
SELECT
department,
job_level,
COUNT(*) AS emp_count
FROM employees
GROUP BY department, job_level WITH ROLLUP;
-- 等价于 GROUPING SETS ((department, job_level), (department), ())

JOIN 操作

Hive 支持多种 Join 类型,理解它们的使用场景很重要:

-- 内连接(INNER JOIN)
SELECT e.name, e.department, d.location
FROM employees e
INNER JOIN departments d ON e.department = d.dept_name;

-- 左外连接(LEFT OUTER JOIN)
SELECT e.name, d.dept_name
FROM employees e
LEFT JOIN departments d ON e.department = d.dept_name;

-- 右外连接(RIGHT OUTER JOIN)
SELECT e.name, d.dept_name
FROM employees e
RIGHT JOIN departments d ON e.department = d.dept_name;

-- 全外连接(FULL OUTER JOIN)
SELECT e.name, d.dept_name
FROM employees e
FULL OUTER JOIN departments d ON e.department = d.dept_name;

-- 左半连接(LEFT SEMI JOIN):相当于 IN 子查询
SELECT e.name, e.department
FROM employees e
LEFT SEMI JOIN departments d ON e.department = d.dept_name;
-- 等价于:WHERE e.department IN (SELECT dept_name FROM departments)

-- 交叉连接(CROSS JOIN):笛卡尔积
SELECT e.name, d.dept_name
FROM employees e
CROSS JOIN departments d;

Map Join 优化

当 Join 的一方是小表时,可以使用 Map Join 避免 Reduce 阶段:

-- 手动指定 Map Join
SELECT /*+ MAPJOIN(d) */
e.name,
e.department,
d.location
FROM employees e
JOIN departments d ON e.department = d.dept_name;

-- 自动 Map Join(推荐)
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=10000000; -- 10MB

Join 优化技巧

-- 大表 Join 大表时的优化
-- 1. 过滤条件尽早应用
SELECT e.name, d.dept_name
FROM (SELECT * FROM employees WHERE active = 1) e
JOIN (SELECT * FROM departments WHERE status = 'active') d
ON e.department = d.dept_name;

-- 2. 使用 STREAMTABLE 提示
-- 默认 Hive 会尝试缓存右表,如果左表更大则需要指定
SELECT /*+ STREAMTABLE(e) */ e.name, d.dept_name
FROM employees e
JOIN departments d ON e.department = d.dept_name;

排序

-- ORDER BY:全局排序,只有一个 Reducer
SELECT * FROM employees ORDER BY salary DESC;

-- SORT BY:每个 Reducer 内部排序,非全局有序
SELECT * FROM employees SORT BY salary DESC;

-- DISTRIBUTE BY + SORT BY:控制分区和排序
-- 按 department 分发到不同 Reducer,每个 Reducer 内按 salary 排序
SELECT * FROM employees
DISTRIBUTE BY department
SORT BY salary DESC;

-- CLUSTER BY:等于 DISTRIBUTE BY + SORT BY(同一列)
SELECT * FROM employees CLUSTER BY department;
-- 等价于 DISTRIBUTE BY department SORT BY department

ORDER BY 与 SORT BY 的区别:

| 特性 | ORDER BY | SORT BY |
| --- | --- | --- |
| 排序范围 | 全局排序 | Reducer 内排序 |
| Reducer 数量 | 只有 1 个 | 可以多个 |
| 性能 | 数据量大时慢 | 较快 |
| 结果有序性 | 完全有序 | 部分有序 |

子查询

-- WHERE 子查询
SELECT * FROM employees
WHERE department IN (
SELECT dept_name FROM departments WHERE location = 'Beijing'
);

-- FROM 子查询
SELECT dept, avg_salary
FROM (
SELECT
department AS dept,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department
) t
WHERE avg_salary > 50000;

-- WITH 子句(CTE,Common Table Expression)
WITH
dept_stats AS (
SELECT department, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
),
high_salary_depts AS (
SELECT department FROM dept_stats WHERE avg_salary > 80000
)
SELECT e.name, e.department, e.salary
FROM employees e
JOIN high_salary_depts h ON e.department = h.department;

-- EXISTS 子查询
SELECT name FROM employees e
WHERE EXISTS (
SELECT 1 FROM departments d
WHERE d.dept_name = e.department
AND d.location = 'Beijing'
);

窗口函数

窗口函数是 Hive 中最强大的分析功能之一,它可以在不减少行数的情况下进行聚合计算。窗口函数也叫 OLAP 函数,非常适合数据分析场景。

基本语法

function_name([expression]) OVER (
[PARTITION BY partition_expression]
[ORDER BY sort_expression [ASC|DESC]]
[frame_clause]
)
  • PARTITION BY:将数据分组,窗口函数在每个分组内独立计算
  • ORDER BY:在分组内排序,影响某些窗口函数的计算结果
  • frame_clause:定义窗口范围,如 ROWS BETWEEN ... AND ...

聚合窗口函数

-- 累计求和
SELECT
name,
department,
salary,
SUM(salary) OVER (PARTITION BY department ORDER BY hire_date) AS running_total
FROM employees;

-- 移动平均
SELECT
order_date,
amount,
AVG(amount) OVER (
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_avg_7days
FROM daily_sales;

-- 分组内聚合
SELECT
name,
department,
salary,
MAX(salary) OVER (PARTITION BY department) AS dept_max_salary,
AVG(salary) OVER (PARTITION BY department) AS dept_avg_salary,
salary - AVG(salary) OVER (PARTITION BY department) AS diff_from_avg
FROM employees;

-- 去重计数
-- 注意:Hive 不支持在窗口函数中直接使用 COUNT(DISTINCT ...),
-- 可用 collect_set 结合 size 实现等价效果
SELECT
department,
SIZE(COLLECT_SET(employee_id) OVER (PARTITION BY department)) AS unique_employees
FROM employees;

排名函数

-- ROW_NUMBER:行号,连续递增,不重复
SELECT
name,
department,
salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) AS overall_rank,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank
FROM employees;

-- RANK:排名,相同值排名相同,跳过后续排名
SELECT
name,
salary,
RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;
-- 结果示例:1, 2, 2, 4, 5(两个第2名后跳到第4名)

-- DENSE_RANK:排名,相同值排名相同,不跳过
SELECT
name,
salary,
DENSE_RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;
-- 结果示例:1, 2, 2, 3, 4(两个第2名后是第3名)

-- NTILE:将数据分成 N 组
SELECT
name,
salary,
NTILE(4) OVER (ORDER BY salary DESC) AS quartile -- 分成4组
FROM employees;

-- PERCENT_RANK:百分比排名
SELECT
name,
salary,
PERCENT_RANK() OVER (ORDER BY salary) AS pct_rank -- 0 到 1 之间
FROM employees;

-- CUME_DIST:累积分布
SELECT
name,
salary,
CUME_DIST() OVER (ORDER BY salary) AS cumulative_dist -- 小于等于当前值的比例
FROM employees;

排名函数对比:

| 函数 | 说明 | 示例(相同值) |
| --- | --- | --- |
| ROW_NUMBER | 连续递增编号 | 1, 2, 3, 4, 5 |
| RANK | 相同值相同排名,跳过 | 1, 2, 2, 4, 5 |
| DENSE_RANK | 相同值相同排名,不跳过 | 1, 2, 2, 3, 4 |

值访问函数

-- LEAD:访问后续行
SELECT
order_date,
amount,
LEAD(amount, 1) OVER (ORDER BY order_date) AS next_day_amount,
LEAD(amount, 3, 0) OVER (ORDER BY order_date) AS amount_3_days_later -- 默认值为 0
FROM daily_sales;

-- LAG:访问前行
SELECT
order_date,
amount,
LAG(amount, 1) OVER (ORDER BY order_date) AS prev_day_amount,
amount - LAG(amount, 1) OVER (ORDER BY order_date) AS daily_change
FROM daily_sales;

-- FIRST_VALUE:分组内第一个值
SELECT
name,
department,
salary,
FIRST_VALUE(salary) OVER (PARTITION BY department ORDER BY hire_date) AS first_hire_salary
FROM employees;

-- LAST_VALUE:分组内最后一个值
SELECT
name,
department,
salary,
LAST_VALUE(salary) OVER (
PARTITION BY department
ORDER BY hire_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS last_hire_salary
FROM employees;

窗口范围定义

-- ROWS:基于行数
SELECT
order_date,
amount,
AVG(amount) OVER (
ORDER BY order_date
ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
) AS moving_avg
FROM daily_sales;

-- RANGE:基于值范围
-- 注意:多数 Hive 版本的 RANGE 窗口只支持数值偏移,不支持 INTERVAL 语法;
-- 可先将日期转为整数(如 datediff)再使用
SELECT
order_date,
amount,
SUM(amount) OVER (
ORDER BY datediff(order_date, '1970-01-01')
RANGE BETWEEN 6 PRECEDING AND CURRENT ROW
) AS rolling_7day_sum
FROM daily_sales;

-- 窗口范围选项
-- UNBOUNDED PRECEDING:从分区的第一行开始
-- n PRECEDING:从当前行向前 n 行
-- CURRENT ROW:当前行
-- n FOLLOWING:从当前行向后 n 行
-- UNBOUNDED FOLLOWING:到分区的最后一行结束

WINDOW 子句

当多个窗口函数使用相同的窗口定义时,可以使用 WINDOW 子句简化:

SELECT 
name,
department,
salary,
SUM(salary) OVER w AS total,
AVG(salary) OVER w AS average,
ROW_NUMBER() OVER w AS row_num
FROM employees
WINDOW w AS (PARTITION BY department ORDER BY hire_date);

数据导入导出

数据导入

-- 从本地文件导入
LOAD DATA LOCAL INPATH '/local/path/data.csv'
INTO TABLE employees;

-- 从 HDFS 导入
LOAD DATA INPATH '/hdfs/path/data.csv'
INTO TABLE employees;

-- 覆盖导入
LOAD DATA LOCAL INPATH '/local/path/data.csv'
OVERWRITE INTO TABLE employees;

-- 导入到分区表
LOAD DATA LOCAL INPATH '/local/path/data.csv'
INTO TABLE sales PARTITION(year=2024, month=1);

-- INSERT 导入
INSERT INTO TABLE employees VALUES
(1, 'Alice', 'Engineering', 80000, '2024-01-15'),
(2, 'Bob', 'Sales', 70000, '2024-01-20');

-- INSERT OVERWRITE 覆盖
INSERT OVERWRITE TABLE employees
SELECT * FROM temp_employees;

-- 多表插入
FROM source_table
INSERT INTO TABLE target1 SELECT col1, col2
INSERT INTO TABLE target2 SELECT col3, col4
INSERT OVERWRITE TABLE target3 SELECT col5, col6;

-- 动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE sales PARTITION(year, month)
SELECT
order_id,
product_id,
amount,
year(order_date) as year,
month(order_date) as month
FROM staging_sales;

数据导出

-- 导出到本地
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
SELECT * FROM employees;

-- 导出到 HDFS
INSERT OVERWRITE DIRECTORY '/hdfs/path/output'
SELECT * FROM employees;

-- 指定格式导出
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY ':'
NULL DEFINED AS ''
SELECT * FROM employees;

-- 使用 hive 命令导出
-- hive -e "SELECT * FROM employees" > output.txt
-- hive -f query.hql > output.txt

导出工具

# 使用 sqoop 导出到关系数据库
sqoop export \
--connect jdbc:mysql://localhost:3306/target_db \
--username root \
--password password \
--table target_table \
--export-dir /user/hive/warehouse/source_table \
--input-fields-terminated-by '\001'

视图与物化视图

普通视图

视图是虚拟表,不存储数据,每次查询时动态计算:

-- 创建视图
CREATE VIEW IF NOT EXISTS high_salary_employees
COMMENT 'Employees with salary above 80000'
AS
SELECT id, name, department, salary
FROM employees
WHERE salary > 80000;

-- 创建带列别名的视图
CREATE VIEW dept_salary_stats (dept, emp_count, avg_salary, max_salary)
AS
SELECT
department,
COUNT(*),
AVG(salary),
MAX(salary)
FROM employees
GROUP BY department;

-- 查询视图
SELECT * FROM high_salary_employees WHERE department = 'Engineering';

-- 查看视图定义
SHOW CREATE TABLE high_salary_employees;
DESCRIBE EXTENDED high_salary_employees;

-- 删除视图
DROP VIEW IF EXISTS high_salary_employees;

视图的使用场景:

  • 简化复杂查询
  • 提供数据安全层(隐藏敏感列)
  • 统一数据访问接口

物化视图

物化视图存储查询结果,可以增量更新。与普通视图不同,物化视图实际存储数据,查询时直接读取预计算结果,大幅提升复杂聚合查询的性能。

创建物化视图

-- 创建物化视图
CREATE MATERIALIZED VIEW order_summary
DISABLE REWRITE -- 初始禁用查询重写
AS
SELECT
product_id,
COUNT(*) as order_count,
SUM(amount) as total_amount,
AVG(amount) as avg_amount
FROM orders
GROUP BY product_id;

-- 查看物化视图
SHOW MATERIALIZED VIEWS;
DESCRIBE FORMATTED order_summary;

查询重写

物化视图最大的价值在于自动查询重写。当用户查询的聚合粒度比物化视图更粗时,优化器可以自动使用物化视图:

-- 启用查询重写
ALTER MATERIALIZED VIEW order_summary ENABLE REWRITE;

-- 此时,以下针对源表 orders 的查询会被自动重写:
-- 优化器基于物化视图中的 order_count/total_amount 上卷出结果,
-- 无需扫描 orders 全表
SELECT
COUNT(*) as total_orders,
SUM(amount) as grand_total
FROM orders;

-- 通过执行计划确认是否命中物化视图
EXPLAIN SELECT COUNT(*), SUM(amount) FROM orders;

增量刷新

当源表数据变化时,物化视图需要刷新。Hive 支持多种刷新策略:

-- 手动完全刷新(重建整个物化视图)
ALTER MATERIALIZED VIEW order_summary REBUILD;

-- 增量刷新:当源表是 ACID 事务表且视图定义满足条件时,
-- REBUILD 会自动采用增量方式,仅处理自上次刷新以来的变更
ALTER MATERIALIZED VIEW order_summary REBUILD;

-- 查看刷新状态
DESCRIBE FORMATTED order_summary;

增量刷新的限制

  • 源表必须是 ACID 事务表
  • 物化视图的查询不能包含某些复杂操作
  • 删除操作可能触发全量刷新

物化视图最佳实践

-- 1. 选择高频查询创建物化视图
-- 分析查询日志,找出重复执行的聚合查询

-- 2. 合理选择聚合粒度
-- 粒度太细:存储开销大,刷新慢
-- 粒度太粗:查询重写机会少
CREATE MATERIALIZED VIEW sales_daily_summary
AS
SELECT
TO_DATE(order_time) as order_date,
product_category,
store_region,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY TO_DATE(order_time), product_category, store_region;

-- 3. 定时刷新
-- 可以通过脚本或调度工具定期刷新
-- crontab: 0 */4 * * * hive -e "ALTER MATERIALIZED VIEW order_summary REBUILD;"

-- 4. 监控刷新时间
-- 如果刷新时间过长,考虑重新设计物化视图

使用场景

| 场景 | 说明 |
| --- | --- |
| BI 报表 | 预计算常见报表指标 |
| 实时仪表板 | 定期刷新的汇总数据 |
| 数据分析 | 加速探索性分析 |
| 数据服务 | 提供快速查询响应 |

Iceberg 表集成

Apache Iceberg 是一种开放的数据表格式,专为大规模数据分析设计。Hive 4.0 提供了原生 Iceberg 支持。

为什么选择 Iceberg?

与传统 Hive 表相比,Iceberg 提供了更多高级特性:

| 特性 | 传统 Hive 表 | Iceberg 表 |
| --- | --- | --- |
| 模式演进 | 需要重写数据 | 支持添加/删除/重命名列 |
| 分区演进 | 需要重写数据 | 动态调整分区策略 |
| 时间旅行 | 不支持 | 支持快照查询 |
| 回滚 | 不支持 | 支持快照回滚 |
| 小文件 | 需要手动合并 | 可通过 OPTIMIZE 等命令合并 |
| 并发写入 | 有限支持 | 完整支持 |

创建 Iceberg 表

-- 创建 Iceberg 表
CREATE TABLE iceberg_orders (
order_id BIGINT,
customer_id INT,
product_id INT,
order_time TIMESTAMP,
amount DECIMAL(12, 2)
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
'format-version' = '2', -- Iceberg 格式版本
'write.format.default' = 'parquet', -- 数据文件格式
'write.target-file-size-bytes' = '134217728' -- 目标文件大小 128MB
);

-- 创建分区表
CREATE TABLE iceberg_events (
event_id BIGINT,
event_type STRING,
event_time TIMESTAMP,
user_id BIGINT,
event_data STRING
)
PARTITIONED BY SPEC ( -- Iceberg 分区转换(隐藏分区),Hive 4 使用 PARTITIONED BY SPEC 语法
days(event_time), -- 按天分区
bucket(16, user_id) -- 按 user_id 哈希分桶
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
'format-version' = '2',
'write.format.default' = 'parquet'
);

时间旅行查询

Iceberg 支持查询历史快照,这对于数据审计和问题排查非常有用:

-- 查看表快照历史
SELECT * FROM iceberg_orders.snapshots;

-- 查询特定快照
SELECT * FROM iceberg_orders
FOR SYSTEM_TIME AS OF '2024-01-15 10:00:00';

-- 查询特定快照 ID
SELECT * FROM iceberg_orders
FOR SYSTEM_VERSION AS OF 1234567890123456789;

-- 比较两个快照的差异
SELECT * FROM iceberg_orders
FOR SYSTEM_TIME AS OF '2024-01-15 10:00:00'
EXCEPT
SELECT * FROM iceberg_orders
FOR SYSTEM_TIME AS OF '2024-01-14 10:00:00';

快照管理

Iceberg 表会累积历史快照,需要定期清理:

-- 过期旧快照(保留最近 7 天)
ALTER TABLE iceberg_orders EXECUTE EXPIRE_SNAPSHOTS('2024-01-08 00:00:00');

-- 删除孤立文件(没有被任何快照引用的文件)
ALTER TABLE iceberg_orders EXECUTE DELETE_ORPHAN_FILES;

-- 合并小文件
ALTER TABLE iceberg_orders EXECUTE OPTIMIZE;

-- 查看表属性
DESCRIBE EXTENDED iceberg_orders;

从 Hive 表迁移到 Iceberg

-- 方式1:创建新表并插入数据
CREATE TABLE iceberg_orders_new
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES ('format-version' = '2')
AS SELECT * FROM hive_orders;

-- 方式2:使用 LOAD DATA
CREATE TABLE iceberg_orders (
-- 列定义
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES ('format-version' = '2');

LOAD DATA INPATH '/user/hive/warehouse/hive_orders'
INTO TABLE iceberg_orders;

函数

内置函数

字符串函数

-- 字符串长度
SELECT length(name) FROM employees;

-- 字符串拼接
SELECT concat(name, '-', department) FROM employees;
SELECT concat_ws(',', name, department) FROM employees; -- 带分隔符

-- 字符串截取
SELECT substr(name, 1, 3) FROM employees; -- 从第1个字符开始,取3个
SELECT substring(name, -3) FROM employees; -- 取最后3个字符

-- 大小写转换
SELECT upper(name), lower(name), initcap(name) FROM employees; -- 首字母大写

-- 去除空格
SELECT trim(name), ltrim(name), rtrim(name) FROM employees;

-- 字符串分割
SELECT split(name, ' ') FROM employees; -- 返回数组

-- 字符串替换
SELECT replace(name, 'old', 'new') FROM employees;

-- 正则匹配
SELECT regexp_extract(name, '([A-Za-z]+)', 1) FROM employees; -- 提取匹配组
SELECT regexp_replace(name, '[0-9]+', '#') FROM employees; -- 正则替换

-- 查找位置
SELECT instr(name, 'abc') FROM employees; -- 返回位置,未找到返回 0
SELECT locate('abc', name, 1) FROM employees; -- 从位置 1 开始查找

-- 格式化
SELECT format_number(salary, 2) FROM employees; -- 数字格式化
SELECT printf('Name: %s, Salary: %.2f', name, salary) FROM employees;

日期函数

-- 当前日期和时间
SELECT current_date(); -- 当前日期
SELECT current_timestamp(); -- 当前时间戳
SELECT unix_timestamp(); -- 当前 Unix 时间戳

-- 日期加减
SELECT date_add('2024-01-01', 7); -- 加 7 天
SELECT date_sub('2024-01-01', 7); -- 减 7 天
SELECT add_months('2024-01-01', 3); -- 加 3 个月

-- 日期差
SELECT datediff('2024-01-10', '2024-01-01'); -- 相差天数
SELECT months_between('2024-03-01', '2024-01-01'); -- 相差月数

-- 日期格式化
SELECT date_format('2024-01-01', 'yyyy-MM'); -- '2024-01'
SELECT from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss');

-- 提取日期部分
SELECT year('2024-01-15'); -- 2024
SELECT month('2024-01-15'); -- 1
SELECT day('2024-01-15'); -- 15
SELECT dayofweek('2024-01-15'); -- 星期几(1=周日)
SELECT weekofyear('2024-01-15'); -- 一年中第几周
SELECT quarter('2024-01-15'); -- 季度

-- 日期转换
SELECT to_date('2024-01-01 12:00:00'); -- 转为日期
SELECT cast('2024-01-01 12:00:00' as timestamp); -- 转为时间戳(Hive 没有 to_timestamp 函数)

-- 字符串转日期
SELECT cast('2024-01-01' as date);
SELECT from_unixtime(unix_timestamp('20240101', 'yyyyMMdd'), 'yyyy-MM-dd');

数学函数

-- 取整
SELECT round(3.14159, 2); -- 3.14
SELECT floor(3.9); -- 3(向下取整)
SELECT ceil(3.1); -- 4(向上取整)

-- 绝对值和符号
SELECT abs(-10); -- 10
SELECT sign(-10); -- -1

-- 幂运算和开方
SELECT pow(2, 10); -- 1024
SELECT sqrt(16); -- 4
SELECT cbrt(27); -- 3(立方根)

-- 对数
SELECT log(10, 100); -- 2(以 10 为底 100 的对数)
SELECT ln(2.71828); -- 自然对数
SELECT log2(8); -- 3
SELECT log10(100); -- 2

-- 三角函数
SELECT sin(0), cos(0), tan(0);
SELECT asin(0), acos(1), atan(0);

-- 随机数
SELECT rand(); -- 0 到 1 之间
SELECT rand(42); -- 固定种子的随机数

-- 其他
SELECT greatest(1, 5, 3, 2); -- 5(最大值)
SELECT least(1, 5, 3, 2); -- 1(最小值)
SELECT pmod(10, 3); -- 1(正取模)

条件函数

-- IF 函数
SELECT if(salary > 50000, 'High', 'Low') FROM employees;

-- CASE WHEN
SELECT
name,
CASE
WHEN salary >= 100000 THEN 'Senior'
WHEN salary >= 50000 THEN 'Mid'
ELSE 'Junior'
END as level
FROM employees;

-- NVL:空值替换
SELECT nvl(bonus, 0) FROM employees;

-- COALESCE:返回第一个非空值
SELECT coalesce(bonus, commission, 0) FROM employees;

-- NULLIF:如果相等返回 NULL
SELECT nullif(salary, 0) FROM employees;

集合函数

-- 数组函数
SELECT size(array(1, 2, 3)); -- 3
SELECT array_contains(array(1, 2, 3), 2); -- true
SELECT sort_array(array(3, 1, 2)); -- [1, 2, 3]

-- Map 函数
SELECT size(map('a', 1, 'b', 2)); -- 2
SELECT map_keys(map('a', 1, 'b', 2)); -- ['a', 'b']
SELECT map_values(map('a', 1, 'b', 2)); -- [1, 2]

-- 数组展开
SELECT explode(array(1, 2, 3)); -- 展开为多行
SELECT posexplode(array(1, 2, 3)); -- 展开并返回位置

-- Map 展开
SELECT explode(map('a', 1, 'b', 2)); -- 展开为多行(key, value)

自定义函数

当内置函数无法满足需求时,可以开发自定义函数。

UDF(用户定义函数)

UDF 是一进一出的函数,类似于内置的 UPPER、LENGTH 等。下面的示例基于旧式 UDF API(org.apache.hadoop.hive.ql.exec.UDF),实现简单,但该 API 在新版本中已标记为过时,复杂场景建议使用后文的 GenericUDF:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

/**
* 自定义 UDF:去除字符串首尾引号
* 使用方式:SELECT remove_quotes('"hello"');
*/
public class RemoveQuotes extends UDF {

public Text evaluate(Text input) {
if (input == null) {
return null;
}

String str = input.toString();
// 去除首尾引号
String result = str.replaceAll("^\"|\"$", "");
return new Text(result);
}

// 支持重载
public Text evaluate(Text input, Text quoteChar) {
if (input == null) {
return null;
}

String str = input.toString();
String quote = quoteChar != null ? quoteChar.toString() : "\"";
String result = str.replaceAll("^" + quote + "|" + quote + "$", "");
return new Text(result);
}
}

编译和部署:

# 编译
javac -cp "$(hadoop classpath):$HIVE_HOME/lib/*" com/example/hive/udf/RemoveQuotes.java

# 打包
jar cvf my-udf.jar com/example/hive/udf/RemoveQuotes.class

# 在 Hive 中使用
ADD JAR /path/to/my-udf.jar;
CREATE TEMPORARY FUNCTION remove_quotes AS 'com.example.hive.udf.RemoveQuotes';

SELECT remove_quotes('"hello"'); -- 返回 hello

GenericUDF(通用 UDF)

GenericUDF 支持复杂类型和更灵活的参数处理:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

/**
* 自定义 GenericUDF:字符串拼接(支持任意数量参数)
*/
public class ConcatAll extends GenericUDF {

private StringObjectInspector[] inspectors;

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {

if (arguments.length < 2) {
throw new UDFArgumentLengthException(
"ConcatAll requires at least 2 arguments");
}

inspectors = new StringObjectInspector[arguments.length];
for (int i = 0; i < arguments.length; i++) {
if (!(arguments[i] instanceof StringObjectInspector)) {
throw new UDFArgumentException(
"All arguments must be strings");
}
inspectors[i] = (StringObjectInspector) arguments[i];
}

return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < arguments.length; i++) {
Object value = arguments[i].get();
if (value != null) {
sb.append(inspectors[i].getPrimitiveJavaObject(value));
}
}
return sb.toString();
}

@Override
public String getDisplayString(String[] children) {
return "concat_all(" + String.join(", ", children) + ")";
}
}

UDAF(用户定义聚合函数)

UDAF 是多进一出的聚合函数,类似于 SUM、AVG 等:

package com.example.hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.DoubleWritable;

/**
* 自定义 UDAF:计算几何平均数
*/
public class GeometricMean extends AbstractGenericUDAFResolver {

@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {

if (parameters.length != 1) {
throw new UDFArgumentTypeException(
parameters.length - 1, "Exactly one argument is required");
}

return new GeometricMeanEvaluator();
}

public static class GeometricMeanEvaluator extends GenericUDAFEvaluator {

// 输入数据类型检查器
private PrimitiveObjectInspector inputInspector;

// 中间结果:存储 log_sum 和 count
static class LogSumCount implements AggregationBuffer {
double logSum;
long count;
}

@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
super.init(m, parameters);

if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
inputInspector = (PrimitiveObjectInspector) parameters[0];
}

// 输出类型
return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}

@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
LogSumCount buffer = new LogSumCount();
reset(buffer);
return buffer;
}

@Override
public void reset(AggregationBuffer agg) throws HiveException {
LogSumCount buffer = (LogSumCount) agg;
buffer.logSum = 0.0;
buffer.count = 0;
}

@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
if (parameters[0] != null) {
LogSumCount buffer = (LogSumCount) agg;
double value = inputInspector.getDouble(parameters[0]);
if (value > 0) {
buffer.logSum += Math.log(value);
buffer.count++;
}
}
}
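
// 注意:以下 terminatePartial/merge 为演示做了简化,直接以 Java 数组传递中间结果;
// 严格的实现应在 init 的 PARTIAL1/PARTIAL2 分支返回描述中间结果的
// Struct/List ObjectInspector,并在 merge 中通过该 inspector 解析 partial 对象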

@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
LogSumCount buffer = (LogSumCount) agg;
double[] result = new double[2];
result[0] = buffer.logSum;
result[1] = buffer.count;
return result;
}

@Override
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
LogSumCount buffer = (LogSumCount) agg;
double[] partialResult = (double[]) partial;
buffer.logSum += partialResult[0];
buffer.count += (long) partialResult[1];
}
}

@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
LogSumCount buffer = (LogSumCount) agg;
if (buffer.count == 0) {
return null;
}
// 几何平均数 = exp(sum(log(x)) / n)
return new DoubleWritable(Math.exp(buffer.logSum / buffer.count));
}
}
}

UDTF(用户定义表生成函数)

UDTF 是一进多出的函数,类似于 explode:

package com.example.hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

import java.util.ArrayList;
import java.util.List;

/**
* 自定义 UDTF:将字符串按分隔符分割为多行
* 使用方式:SELECT split_to_rows('a,b,c', ',');
* 结果:三行,分别是 a, b, c
*/
public class SplitToRows extends GenericUDTF {

private StringObjectInspector stringInspector;
private StringObjectInspector delimiterInspector;

@Override
public StructObjectInspector initialize(ObjectInspector[] args)
throws UDFArgumentException {

if (args.length != 2) {
throw new UDFArgumentLengthException("SplitToRows takes exactly 2 arguments");
}

stringInspector = (StringObjectInspector) args[0];
delimiterInspector = (StringObjectInspector) args[1];

// 定义输出列
List<String> fieldNames = new ArrayList<>();
fieldNames.add("value");

List<ObjectInspector> fieldOIs = new ArrayList<>();
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

return ObjectInspectorFactory.getStandardStructObjectInspector(
fieldNames, fieldOIs);
}

@Override
public void process(Object[] args) throws HiveException {
String input = stringInspector.getPrimitiveJavaObject(args[0]);
String delimiter = delimiterInspector.getPrimitiveJavaObject(args[1]);

if (input == null || delimiter == null) {
return;
}

String[] parts = input.split(delimiter); // 注意:split 按正则解析,特殊分隔符需转义
for (String part : parts) {
String[] result = new String[1];
result[0] = part;
forward(result);
}
}

@Override
public void close() throws HiveException {
// 清理资源
}
}
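
注册后既可以单独调用,也可以结合 LATERAL VIEW 使用(JAR 路径与表名均为示例,注册语法详见下一节):

ADD JAR /path/to/my-udtf.jar;
CREATE TEMPORARY FUNCTION split_to_rows AS 'com.example.hive.udtf.SplitToRows';

-- 单独调用:一行输入产生多行输出
SELECT split_to_rows('a,b,c', ',');

-- 结合 LATERAL VIEW,将拆分结果与原表其他列关联
SELECT t.id, s.value
FROM some_table t
LATERAL VIEW split_to_rows(t.csv_col, ',') s AS value;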

注册和永久使用 UDF

-- 临时函数(当前会话有效)
ADD JAR /path/to/my-udf.jar;
CREATE TEMPORARY FUNCTION my_func AS 'com.example.hive.udf.MyUDF';

-- 永久函数(所有会话可用)
CREATE FUNCTION my_db.my_func AS 'com.example.hive.udf.MyUDF'
USING JAR 'hdfs:///user/hive/lib/my-udf.jar';

-- 查看函数
SHOW FUNCTIONS LIKE 'my_*';
DESCRIBE FUNCTION my_func;
DESCRIBE FUNCTION EXTENDED my_func;

-- 删除函数
DROP TEMPORARY FUNCTION IF EXISTS my_func;
DROP FUNCTION IF EXISTS my_db.my_func;

性能调优

执行引擎优化

-- 使用 Tez 引擎(推荐)
SET hive.execution.engine=tez;

-- 使用 Spark 引擎
SET hive.execution.engine=spark;

-- Tez 优化配置
SET tez.grouping.min-size=134217728; -- 最小分组大小 128MB
SET tez.grouping.max-size=268435456; -- 最大分组大小 256MB
SET tez.runtime.unordered.output.buffer.size-mb=100;

并行执行

-- 启用并行执行
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=16; -- 并行线程数

-- 向量化执行(批量处理)
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;

Cost-Based Optimizer(CBO)

-- 启用 CBO
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;

-- 收集统计信息
ANALYZE TABLE employees COMPUTE STATISTICS;
ANALYZE TABLE employees COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE sales PARTITION(year=2024, month=1) COMPUTE STATISTICS;

存储优化

-- 使用列式存储
CREATE TABLE optimized_table (...)
STORED AS ORC
TBLPROPERTIES (
'orc.compress' = 'SNAPPY',
'orc.create.index' = 'true'
);

-- 启用压缩
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- 合并小文件
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000; -- 每个任务合并的文件大小
SET hive.merge.smallfiles.avgsize=16000000; -- 触发合并的阈值

Join 优化

-- 自动 Map Join
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=20000000; -- 20MB

-- Skew Join(处理数据倾斜)
SET hive.optimize.skewjoin=true;
SET hive.skewjoin.key=100000; -- 倾斜阈值

-- 大表 Join 大表优化
-- 1. 使用分桶表
-- 2. 使用分区裁剪
-- 3. 使用 STREAMTABLE 提示
SELECT /*+ STREAMTABLE(large_table) */ *
FROM large_table l
JOIN small_table s ON l.id = s.id;

查询优化

-- 列裁剪:只选择需要的列
-- 好
SELECT id, name FROM employees WHERE department = 'Engineering';
-- 差
SELECT * FROM employees WHERE department = 'Engineering';

-- 分区裁剪:使用分区过滤
SELECT * FROM sales WHERE year=2024 AND month=1;

-- 谓词下推:尽早过滤
-- 好
SELECT * FROM (
SELECT * FROM orders WHERE status = 'completed'
) t JOIN customers c ON t.customer_id = c.id;
-- 差
SELECT * FROM orders t JOIN customers c ON t.customer_id = c.id
WHERE t.status = 'completed';

-- 使用 SORT BY 替代 ORDER BY(当不需要全局排序时)
SELECT * FROM employees SORT BY salary DESC;

-- 限制返回行数
SELECT * FROM large_table LIMIT 1000;

-- 避免 SELECT DISTINCT(GROUP BY 通常更快)
SELECT department FROM employees GROUP BY department;
-- 比 SELECT DISTINCT department FROM employees; 更快

内存和资源配置

-- Map 任务配置
SET mapreduce.map.memory.mb=4096;
SET mapreduce.map.java.opts=-Xmx3072m;

-- Reduce 任务配置
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.reduce.java.opts=-Xmx6144m;

-- Reduce 任务数量
SET mapreduce.job.reduces=10;
-- 或根据数据量自动设置
SET hive.exec.reducers.bytes.per.reducer=256000000; -- 每个 reducer 处理的数据量

常见问题排查

-- 1. 查看执行计划
EXPLAIN SELECT * FROM employees WHERE department = 'Engineering';
EXPLAIN EXTENDED SELECT * FROM employees WHERE department = 'Engineering';

-- 2. 查看 Stage 依赖
EXPLAIN DEPENDENCY SELECT * FROM employees e JOIN departments d ON e.department = d.dept_name;

-- 3. 锁问题排查
SHOW LOCKS;
SHOW LOCKS employees;
-- 禁用锁(开发环境)
SET hive.support.concurrency=false;

-- 4. 小文件问题
-- 查看文件数量
DESCRIBE FORMATTED employees;
-- 合并小文件
INSERT OVERWRITE TABLE employees SELECT * FROM employees;

数据仓库建模

维度建模基础

维度建模是一种专门用于数据仓库的设计方法,由 Ralph Kimball 提出。核心概念包括:

事实表(Fact Table)

事实表存储业务过程的度量数据,包含:

  • 度量列:可聚合的数值数据,如销售额、订单数量
  • 维度外键:关联到维度表的外键
-- 销售事实表
CREATE TABLE fact_sales (
date_key INT, -- 日期维度外键
product_key INT, -- 产品维度外键
store_key INT, -- 门店维度外键
customer_key INT, -- 客户维度外键
sales_amount DECIMAL(12,2), -- 销售金额(可加度量)
quantity INT, -- 销售数量(可加度量)
unit_price DECIMAL(10,2), -- 单价(不可加度量)
transaction_count INT -- 交易次数(可加度量)
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC;

维度表(Dimension Table)

维度表存储业务实体的描述性信息:

-- 产品维度表
CREATE TABLE dim_product (
product_key INT, -- 代理键
product_id STRING, -- 业务键
product_name STRING,
category_name STRING,
brand_name STRING,
supplier_name STRING,
is_active BOOLEAN,
effective_date DATE,
expiry_date DATE,
current_flag BOOLEAN -- 当前有效标识
)
STORED AS ORC;

-- 日期维度表
CREATE TABLE dim_date (
date_key INT,
full_date DATE,
day_of_week INT,
day_name STRING,
week_of_year INT,
month INT,
month_name STRING,
quarter INT,
year INT,
is_weekend BOOLEAN,
is_holiday BOOLEAN
)
STORED AS ORC;

星型模型与雪花模型

星型模型(Star Schema)

星型模型是最简单的维度模型,所有维度表直接与事实表关联:

-- 星型模型示例
-- 事实表
CREATE TABLE fact_order (
order_id BIGINT,
customer_key INT,
product_key INT,
time_key INT,
store_key INT,
order_amount DECIMAL(12,2),
quantity INT
);

-- 维度表直接关联事实表
-- dim_customer、dim_product、dim_time、dim_store 都直接关联 fact_order

-- 查询示例
SELECT
p.category_name,
t.year,
t.month,
SUM(f.order_amount) as total_amount
FROM fact_order f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_time t ON f.time_key = t.time_key
GROUP BY p.category_name, t.year, t.month;

优点:

  • 查询简单,Join 次数少
  • 适合 OLAP 分析
  • 性能好

雪花模型(Snowflake Schema)

雪花模型是星型模型的规范化形式,维度表可以进一步分解:

-- 雪花模型示例
-- 事实表
CREATE TABLE fact_order_snowflake (
order_id BIGINT,
customer_key INT,
product_key INT,
time_key INT,
store_key INT,
order_amount DECIMAL(12,2),
quantity INT
);

-- 产品维度关联品牌维度
CREATE TABLE dim_product_snowflake (
product_key INT,
product_name STRING,
brand_key INT, -- 关联品牌维度
category_key INT -- 关联类别维度
);

CREATE TABLE dim_brand (
brand_key INT,
brand_name STRING,
brand_country STRING
);

CREATE TABLE dim_category (
category_key INT,
category_name STRING,
parent_category_key INT
);

-- 查询需要更多 Join
SELECT
c.category_name,
b.brand_name,
SUM(f.order_amount) as total_amount
FROM fact_order_snowflake f
JOIN dim_product_snowflake p ON f.product_key = p.product_key
JOIN dim_brand b ON p.brand_key = b.brand_key
JOIN dim_category c ON p.category_key = c.category_key
GROUP BY c.category_name, b.brand_name;

优缺点对比:

| 特性 | 星型模型 | 雪花模型 |
| --- | --- | --- |
| 查询复杂度 | 简单 | 较复杂 |
| Join 次数 | 少 | 多 |
| 查询性能 | 好 | 较差 |
| 存储空间 | 较大(冗余) | 较小(规范化) |
| 维护难度 | 简单 | 复杂 |
| 推荐场景 | 大多数数仓场景 | 维度层级复杂场景 |

缓慢变化维度(SCD)

维度数据会随时间变化,处理变化的方式称为缓慢变化维度(Slowly Changing Dimension):

SCD Type 1:覆盖旧值

直接更新维度记录,不保留历史:

-- 更新客户信息(覆盖)
INSERT OVERWRITE TABLE dim_customer
SELECT
customer_key,
customer_id,
customer_name,
'新地址' as address, -- 新值覆盖旧值
phone,
email
FROM dim_customer
WHERE customer_id = 'C001';

SCD Type 2:新增历史记录

新增一条记录保留历史,使用代理键区分:

-- 新增历史记录
-- 1. 设置旧记录失效
UPDATE dim_customer
SET current_flag = false, expiry_date = CURRENT_DATE
WHERE customer_id = 'C001' AND current_flag = true;

-- 2. 插入新记录
INSERT INTO dim_customer VALUES
(next_key, 'C001', '新名称', '新地址', ..., CURRENT_DATE, NULL, true);

SCD Type 3:新增字段

新增字段存储上一个值:

CREATE TABLE dim_customer_scd3 (
customer_key INT,
customer_id STRING,
customer_name STRING,
current_address STRING, -- 当前地址
previous_address STRING, -- 上一个地址
address_change_date DATE -- 变更日期
);

数据分层架构

现代数据仓库通常采用分层架构:

ODS(原始数据层)
↓ 清洗、转换
DWD(明细数据层)
↓ 汇总
DWS(汇总数据层)
↓ 面向应用
ADS(应用数据层)
-- ODS:原始数据层,保持数据原貌
CREATE TABLE ods_orders (
order_id STRING,
customer_id STRING,
product_id STRING,
order_time STRING,
amount STRING,
raw_data STRING
)
PARTITIONED BY (dt STRING)
STORED AS TEXTFILE;

-- DWD:明细数据层,清洗后的明细数据
CREATE TABLE dwd_order_detail (
order_id BIGINT,
customer_key INT,
product_key INT,
order_time TIMESTAMP,
amount DECIMAL(12,2),
quantity INT
)
PARTITIONED BY (dt DATE)
STORED AS ORC;

-- DWS:汇总数据层,按主题汇总
CREATE TABLE dws_order_summary_daily (
stat_date DATE,
customer_key INT,
order_count BIGINT,
total_amount DECIMAL(15,2),
avg_order_amount DECIMAL(10,2)
)
PARTITIONED BY (dt DATE)
STORED AS ORC;

-- ADS:应用数据层,面向业务
CREATE TABLE ads_customer_order_stats (
customer_id STRING,
customer_name STRING,
total_orders_30d BIGINT,
total_amount_30d DECIMAL(15,2),
last_order_date DATE
)
STORED AS ORC;

Hive 事务

Hive 从 0.13 版本开始支持 ACID 事务,但有一定限制。

启用事务支持

-- 启用事务支持
SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true; -- Hive 2.0 起默认开启,可省略
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

创建事务表

-- 创建支持 ACID 的事务表
CREATE TABLE transactional_table (
id INT,
name STRING,
value DECIMAL(10,2)
)
CLUSTERED BY (id) INTO 4 BUCKETS -- 必须分桶
STORED AS ORC -- 必须是 ORC 格式
TBLPROPERTIES (
'transactional' = 'true',
'transactional_properties' = 'default'
);

CRUD 操作

-- 插入数据
INSERT INTO TABLE transactional_table VALUES
(1, 'Alice', 100.00),
(2, 'Bob', 200.00);

-- 更新数据
UPDATE transactional_table
SET value = 150.00
WHERE id = 1;

-- 删除数据
DELETE FROM transactional_table WHERE id = 2;

-- 合并操作(Merge)
MERGE INTO transactional_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.value = s.value
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.name, s.value);

事务限制

  • 表必须是分桶的 ORC 表
  • 不支持子查询、JOIN 等复杂语句中的 UPDATE/DELETE
  • 事务会产生增量(delta)文件,需要定期压缩(Compaction),压缩可自动或手动触发(见下例)
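
Compaction 由后台 Compactor 自动调度,也可以手动触发:

-- 手动触发压缩
ALTER TABLE transactional_table COMPACT 'minor'; -- 合并增量文件
ALTER TABLE transactional_table COMPACT 'major'; -- 重写为新的基础文件
-- 查看压缩任务状态
SHOW COMPACTIONS;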

Hive on Spark

将 Hive 查询运行在 Spark 上可以显著提升性能:

-- 切换到 Spark 引擎
SET hive.execution.engine=spark;

-- Spark 资源配置
SET spark.executor.memory=4g;
SET spark.executor.cores=2;
SET spark.executor.instances=10;
SET spark.driver.memory=2g;

-- Spark 动态分配
SET spark.dynamicAllocation.enabled=true;
SET spark.dynamicAllocation.minExecutors=1;
SET spark.dynamicAllocation.maxExecutors=20;
SET spark.dynamicAllocation.initialExecutors=5;

-- Spark 优化配置
SET spark.sql.shuffle.partitions=200;
SET spark.sql.adaptive.enabled=true;

实践案例

案例1:构建用户行为分析数仓

假设需要分析电商网站的用户行为,包括浏览、点击、购买等事件。

数据分层设计

-- === ODS 层:原始数据 ===
-- 保持数据原貌,按天分区
CREATE TABLE ods_user_events (
event_id STRING,
user_id STRING,
event_type STRING, -- view, click, cart, purchase
product_id STRING,
category_id STRING,
event_time STRING,
platform STRING,
device_info STRING
)
COMMENT '用户行为原始数据'
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

-- === DWD 层:明细数据 ===
-- 清洗转换后的明细数据
CREATE TABLE dwd_user_event_detail (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
product_id BIGINT,
category_id BIGINT,
event_time TIMESTAMP,
platform STRING COMMENT 'web/app/miniprogram',
device_type STRING,
province STRING,
city STRING,
amount DECIMAL(12, 2) COMMENT '购买金额(仅购买事件有值)'
)
COMMENT '用户行为明细数据'
PARTITIONED BY (dt DATE)
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');

-- 数据清洗(按天动态分区写入)
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE dwd_user_event_detail PARTITION(dt)
SELECT
CAST(event_id AS BIGINT),
CAST(user_id AS BIGINT),
event_type,
CAST(product_id AS BIGINT),
CAST(category_id AS BIGINT),
FROM_UNIXTIME(CAST(event_time AS BIGINT) DIV 1000) as event_time,
platform,
GET_JSON_OBJECT(device_info, '$.device_type') as device_type,
GET_JSON_OBJECT(device_info, '$.province') as province,
GET_JSON_OBJECT(device_info, '$.city') as city,
CAST(GET_JSON_OBJECT(device_info, '$.amount') AS DECIMAL(12, 2)) as amount, -- 假设金额随事件属性上报在 device_info JSON 中
TO_DATE(FROM_UNIXTIME(CAST(event_time AS BIGINT) DIV 1000)) as dt
FROM ods_user_events
WHERE dt = '2024-01-15';

-- === DWS 层:汇总数据 ===
-- 用户日行为汇总
CREATE TABLE dws_user_action_daily (
user_id BIGINT,
view_count BIGINT,
click_count BIGINT,
cart_count BIGINT,
purchase_count BIGINT,
purchase_amount DECIMAL(12, 2),
unique_products BIGINT,
unique_categories BIGINT
)
COMMENT '用户日行为汇总'
PARTITIONED BY (dt DATE) -- dt 作为分区列,不能再在普通列中重复定义
STORED AS ORC;

-- 汇总计算
INSERT OVERWRITE TABLE dws_user_action_daily PARTITION(dt)
SELECT
user_id,
SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) as view_count,
SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) as click_count,
SUM(CASE WHEN event_type = 'cart' THEN 1 ELSE 0 END) as cart_count,
SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchase_count,
SUM(CASE WHEN event_type = 'purchase' THEN COALESCE(amount, 0) ELSE 0 END) as purchase_amount,
COUNT(DISTINCT product_id) as unique_products,
COUNT(DISTINCT category_id) as unique_categories,
dt
FROM dwd_user_event_detail
GROUP BY user_id, dt;

-- === ADS 层:应用数据 ===
-- 活跃用户指标
CREATE TABLE ads_active_users (
stat_date DATE,
dau BIGINT COMMENT '日活用户数',
wau BIGINT COMMENT '周活用户数',
mau BIGINT COMMENT '月活用户数',
avg_session_duration DOUBLE COMMENT '平均停留时长',
avg_pages_per_user DOUBLE COMMENT '人均浏览页数'
)
STORED AS ORC;

-- 计算 DAU/WAU/MAU
INSERT OVERWRITE TABLE ads_active_users
SELECT
'2024-01-15' as stat_date,
COUNT(DISTINCT CASE WHEN dt = '2024-01-15' THEN user_id END) as dau,
COUNT(DISTINCT CASE WHEN dt >= '2024-01-09' THEN user_id END) as wau,
COUNT(DISTINCT CASE WHEN dt >= '2024-01-01' THEN user_id END) as mau,
AVG(session_duration) as avg_session_duration,
AVG(page_views) as avg_pages_per_user
FROM (
SELECT
user_id,
dt,
SUM(view_count) as page_views,
-- 估算停留时长(假设每次浏览平均 30 秒)
SUM(view_count) * 30 as session_duration
FROM dws_user_action_daily
WHERE dt >= '2024-01-01'
GROUP BY user_id, dt
) t;

案例2:处理数据倾斜

数据倾斜是 Hive 查询中最常见的问题之一,表现为某些任务执行时间远超其他任务。

识别数据倾斜

-- 方法1:查看任务执行情况
-- 在 YARN UI 中观察任务进度,如果某个任务卡在 99%

-- 方法2:检查 Key 分布
SELECT
CASE
WHEN cnt < 1000 THEN '<1K'
WHEN cnt < 10000 THEN '1K-10K'
WHEN cnt < 100000 THEN '10K-100K'
WHEN cnt < 1000000 THEN '100K-1M'
ELSE '>1M'
END as bucket,
COUNT(*) as key_count
FROM (
SELECT key, COUNT(*) as cnt
FROM large_table
GROUP BY key
) t
GROUP BY
CASE
WHEN cnt < 1000 THEN '<1K'
WHEN cnt < 10000 THEN '1K-10K'
WHEN cnt < 100000 THEN '10K-100K'
WHEN cnt < 1000000 THEN '100K-1M'
ELSE '>1M'
END;

解决方案

-- 问题:按 user_id 分组时出现倾斜
SELECT user_id, COUNT(*) as cnt
FROM events
GROUP BY user_id;

-- 方案1:加盐(Salt)
-- 将倾斜 Key 分散到多个 Reduce;先在子查询中物化盐值,
-- 避免 SELECT 与 GROUP BY 中的 RAND() 被分别求值导致结果不一致
SELECT salted_user_id, COUNT(*) as cnt
FROM (
SELECT
CASE
WHEN user_id IN ('skew_key1', 'skew_key2')
THEN CONCAT(user_id, '_', CAST(RAND() * 10 AS INT))
ELSE user_id
END as salted_user_id
FROM events
) t
GROUP BY salted_user_id;

-- 方案2:Map Join 处理小表
-- 如果一方是小表
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask.size=100000000;

SELECT /*+ MAPJOIN(small_dim) */
e.user_id,
d.user_name,
COUNT(*) as cnt
FROM events e
JOIN user_dim d ON e.user_id = d.user_id
GROUP BY e.user_id, d.user_name;

-- 方案3:Skew Join
-- 启用倾斜 Join 优化
SET hive.optimize.skewjoin=true;
SET hive.skewjoin.key=100000; -- 超过 10 万条记录视为倾斜

-- 方案4:两阶段聚合
-- 先局部聚合,再全局聚合
SELECT user_id, SUM(cnt) as total_cnt
FROM (
SELECT user_id, COUNT(*) as cnt
FROM events
GROUP BY user_id, CAST(RAND() * 100 AS INT) -- 局部聚合
) t
GROUP BY user_id; -- 全局聚合

案例3:小文件治理

大量小文件会严重影响 HDFS 性能和 Hive 查询效率。

检测小文件

-- 查看表的文件数量和大小
DESCRIBE FORMATTED table_name;

-- 查看各分区的文件数量和大小
-- Hive 本身没有可直接查询文件清单的系统表,通常借助 HDFS 命令统计
-- (以下路径为示例):
-- hdfs dfs -count /user/hive/warehouse/db.db/table_name/year=2024/month=1
-- hdfs dfs -du -h /user/hive/warehouse/db.db/table_name/year=2024/month=1

解决方案

-- 方案1:启用合并
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000; -- 目标文件大小 256MB
SET hive.merge.smallfiles.avgsize=16000000; -- 小于 16MB 触发合并

-- 方案2:使用 DISTRIBUTE BY 控制输出
INSERT OVERWRITE TABLE target_table
SELECT * FROM source_table
DISTRIBUTE BY CAST(RAND() * 10 AS INT); -- 控制输出文件数量

-- 方案3:定期合并
-- 对于已存在的小文件,使用 INSERT OVERWRITE 重建
INSERT OVERWRITE TABLE table_name PARTITION(year, month)
SELECT * FROM table_name;

-- 方案4:使用 Tez 或 Spark 引擎
-- Tez/Spark 对小文件处理更优化
SET hive.execution.engine=tez;

案例4:增量数据处理

处理每天新增的增量数据,确保数据一致性。

-- 场景:每日增量订单数据合并到主表

-- 1. 创建增量表
CREATE TABLE orders_incremental (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(12, 2),
status STRING,
update_time TIMESTAMP
)
PARTITIONED BY (dt DATE)
STORED AS ORC;

-- 2. 创建主表(支持事务)
CREATE TABLE orders_master (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(12, 2),
status STRING,
update_time TIMESTAMP
)
CLUSTERED BY (order_id) INTO 16 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

-- 3. 合并增量数据
-- 方式 A:使用 MERGE 语句(Hive 2.2 起支持)
MERGE INTO orders_master m
USING (
SELECT * FROM orders_incremental
WHERE dt = '2024-01-15'
) i
ON m.order_id = i.order_id
WHEN MATCHED THEN UPDATE SET
m.status = i.status,
m.amount = i.amount,
m.update_time = i.update_time
WHEN NOT MATCHED THEN INSERT VALUES (
i.order_id, i.customer_id, i.order_time,
i.amount, i.status, i.update_time
);

-- 方式 B:使用 UNION ALL + ROW_NUMBER 去重(适用非事务表)
INSERT OVERWRITE TABLE orders_master
SELECT order_id, customer_id, order_time, amount, status, update_time
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY update_time DESC) as rn
FROM (
SELECT * FROM orders_master
UNION ALL
SELECT * FROM orders_incremental WHERE dt = '2024-01-15'
) t
) t2
WHERE rn = 1;

小结

本章详细介绍了 Hive 数据仓库的核心概念和使用方法:

  1. Hive 架构:用户接口、元数据存储、驱动器、执行引擎的协作
  2. 数据模型:内部表、外部表、分区表、分桶表的设计和使用
  3. 存储格式:文本格式、ORC、Parquet 的特点和选择
  4. HiveQL 查询:基本查询、聚合、Join、排序、子查询
  5. 窗口函数:聚合窗口、排名函数、值访问函数的详细使用
  6. 自定义函数:UDF、GenericUDF、UDAF、UDTF 的开发方法
  7. 性能调优:执行引擎、并行执行、存储优化、查询优化
  8. 数据仓库建模:维度建模、星型模型、缓慢变化维度、分层架构
  9. 事务支持:ACID 事务的配置和使用

Hive 是大数据离线分析的核心工具,掌握 Hive 对于构建企业级数据仓库至关重要。在实际项目中,Hive 常与 Spark、Flink、Kafka 等组件配合使用,构建完整的批流一体数据平台。
