Hive 数据仓库
Apache Hive 是构建在 Hadoop 之上的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类 SQL 查询功能。Hive 让不熟悉 MapReduce 的开发人员也能进行大数据分析。
Hive 概述
什么是 Hive?
Hive 最初由 Facebook 开发,用于处理海量的社交网络数据。它提供了一种类似 SQL 的查询语言——HiveQL(HQL),将 SQL 语句转换为 MapReduce 或 Tez、Spark 任务执行。
Hive 的设计目标是让分析师能够用熟悉的 SQL 语法来处理存储在 Hadoop 中的海量数据,而无需编写复杂的 MapReduce 程序。Hive 并不是一个真正的数据库,它更像是一个"SQL-on-Hadoop"的查询引擎,底层依赖 HDFS 进行存储,依赖 MapReduce、Tez 或 Spark 进行计算。
Hive 核心特点
| 特点 | 说明 |
|---|---|
| 类 SQL 语法 | 降低大数据分析门槛,分析师可快速上手 |
| 可扩展 | 支持自定义函数(UDF、UDAF、UDTF) |
| 多引擎支持 | 可运行在 MapReduce、Tez、Spark 上 |
| 元数据管理 | 统一的元数据存储,支持多工具共享 |
| 大数据处理 | 支持处理 PB 级别数据 |
Hive 与传统数据库对比
理解 Hive 与传统关系型数据库的区别,有助于正确选择技术方案:
| 维度 | Hive | 传统数据库 |
|---|---|---|
| 数据规模 | TB 到 PB 级 | GB 到 TB 级 |
| 查询延迟 | 分钟到小时 | 毫秒到秒 |
| 数据更新 | 不支持频繁更新,适合追加写入 | 支持事务,频繁增删改 |
| 索引 | 有限支持 | 完善的索引机制 |
| 适用场景 | 离线分析、数据仓库、ETL | OLTP 事务处理 |
| 扩展性 | 水平扩展,可添加节点 | 垂直扩展为主 |
| 数据存储 | HDFS,分布式存储 | 本地文件系统或共享存储 |
Hive 不适合需要低延迟响应的在线业务,也不适合频繁更新的场景。它的强项是批量处理海量数据,进行复杂的分析和聚合计算。
Hive 版本演进
| 版本 | 发布时间 | 重要特性 |
|---|---|---|
| Hive 0.x | 2008-2012 | 基础 SQL 支持、MapReduce 执行引擎 |
| Hive 0.13 | 2014 | ACID 事务支持 |
| Hive 1.x | 2015 | Hive on Spark、Cost-Based Optimizer |
| Hive 2.x | 2016 | LLAP(低延迟分析处理)、HiveStreaming |
| Hive 3.x | 2018 | 物化视图增强、LLAP 改进 |
| Hive 4.0 | 2024 | Iceberg 深度集成、原生地理空间支持、性能优化、超过 5000 项改进 |
Hive 4.0 重要更新:
Hive 4.0 是自 2018 年 Hive 3.0 发布以来的首个主要版本,包含超过 5000 项提交,是一次重大更新:
1. Iceberg 深度集成
这是 Hive 4.0 最重要的特性。Apache Iceberg 是一种开放的数据表格式,专为大规模数据分析设计。Hive 4.0 与 Iceberg 的集成包括:
- 原生支持创建和管理 Iceberg 表
- 支持
OPTIMIZE TABLE语法进行数据压缩 - 支持快照管理(过期快照、删除孤立文件)
- 支持 Copy-on-Write 和 Merge-on-Read 两种写入模式
- 支持分区操作(插入、截断、删除分区)
-- 创建 Iceberg 表
CREATE TABLE iceberg_table (
id INT,
name STRING,
event_time TIMESTAMP
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
'format-version' = '2',
'write.format.default' = 'parquet'
);
-- 优化 Iceberg 表(合并小文件)
OPTIMIZE TABLE iceberg_table;
-- 过期旧快照
ALTER TABLE iceberg_table EXECUTE EXPIRE_SNAPSHOTS('2024-01-01 00:00:00');
2. 原生地理空间支持
Hive 4.0 引入了原生地理空间函数,无需额外安装 ESRI UDF:
-- 创建包含地理数据的表
CREATE TABLE locations (
id INT,
name STRING,
point GEOMETRY,
polygon GEOMETRY
);
-- 地理空间查询
SELECT
name,
ST_Distance(point, ST_Point(116.4074, 39.9042)) as distance
FROM locations
WHERE ST_Within(point, ST_Polygon(...));
-- 支持的地理空间函数
ST_Point(x, y) -- 创建点
ST_Polygon(...) -- 创建多边形
ST_Distance(g1, g2) -- 计算距离
ST_Within(g1, g2) -- 判断是否包含
ST_Intersection(g1, g2) -- 计算交集
ST_Buffer(g, distance) -- 创建缓冲区
3. 性能优化
- 改进的查询优化器
- 更高效的元数据操作
- 减少文件系统调用
- 改进的向量化执行
4. 其他重要改进
- 新增
typeofUDF,返回表达式的数据类型 - 改进的日期时间解析
- 更好的 Parquet 和 ORC 支持
- 升级依赖库版本(Hadoop 3.3.6、Tez 0.10.3、ORC 1.8.5)
Hive 架构
Hive 采用经典的客户端-服务器架构,主要组件包括:
用户接口(Client Interface)
Hive 提供多种用户交互方式:
- CLI(命令行接口):传统的 Hive 命令行工具,直接与 Hive 服务交互
- Beeline:基于 JDBC 的命令行工具,通过 Thrift 协议连接 HiveServer2
- JDBC/ODBC:标准数据库连接接口,支持各种 BI 工具和应用集成
- Web GUI:通过 Hue 等工具提供 Web 界面
元数据存储(Metastore)
Metastore 是 Hive 的核心组件,存储所有表的元数据信息:
- 表定义:表名、列名、数据类型、注释
- 存储信息:数据位置、文件格式、SerDe 配置
- 分区信息:分区列、分区值、分区位置
- 统计信息:行数、文件数、数据大小
Metastore 通常使用 MySQL、PostgreSQL 或 Derby 作为后端存储。元数据独立存储的好处是可以被多个工具共享,比如 Spark、Presto 等都可以访问 Hive 的元数据。
驱动器(Driver)
Driver 是 Hive 的核心执行引擎,负责整个查询的生命周期管理:
- 解析器(Parser):将 SQL 字符串解析为抽象语法树(AST)
- 语义分析器(Semantic Analyzer):将 AST 转换为查询块,检查表和列是否存在
- 逻辑计划生成器:生成逻辑执行计划
- 优化器(Optimizer):优化执行计划,包括谓词下推、列裁剪、Join 优化等
- 物理计划生成器:将逻辑计划转换为物理执行计划
- 执行器(Executor):将任务提交给执行引擎运行
执行引擎
Hive 支持多种执行引擎:
| 引擎 | 特点 | 适用场景 |
|---|---|---|
| MapReduce | 稳定可靠,但性能较低 | 兼容性要求高的场景 |
| Tez | DAG 执行模型,性能优于 MR | 大多数生产环境 |
| Spark | 内存计算,性能最佳 | 需要快速响应的场景 |
LLAP 低延迟分析处理
LLAP(Low Latency Analytical Processing)是 Hive 2.0 引入的关键特性,通过常驻内存的守护进程大幅降低查询延迟,使 Hive 能够支持交互式分析场景。
LLAP 架构
传统 Hive 查询需要启动 Tez 应用,每次查询都有启动开销。LLAP 通过引入常驻后台服务解决了这个问题:
传统模式 vs LLAP 模式:
| 特性 | 传统模式 | LLAP 模式 |
|---|---|---|
| 启动开销 | 每次查询启动应用 | 守护进程常驻 |
| 查询延迟 | 秒到分钟级 | 亚秒到秒级 |
| 资源利用 | 独占资源 | 共享资源池 |
| 缓存 | 无内置缓存 | 内存缓存热数据 |
| 适用场景 | 大批量离线处理 | 交互式分析、BI 报表 |
LLAP 核心组件:
-
LLAP Daemon:运行在每个节点上的常驻进程
- 执行查询片段
- 管理内存缓存
- 处理数据预取
-
LLAP Cache:内存缓存层
- 缓存热点数据
- 支持 ORC/Parquet 列级缓存
- LRU 淘汰策略
-
Query Fragment:查询片段
- 将查询拆分为小片段
- 分发给 LLAP Daemon 执行
LLAP 配置
<!-- 启用 LLAP -->
<property>
<name>hive.llap.execution.mode</name>
<value>all</value> <!-- all, auto, none -->
</property>
<!-- LLAP Daemon 内存 -->
<property>
<name>hive.llap.daemon.memory.mb</name>
<value>16384</value> <!-- 16GB -->
</property>
<!-- LLAP 缓存大小 -->
<property>
<name>hive.llap.io.memory.size</name>
<value>8589934592</value> <!-- 8GB -->
</property>
<!-- LLAP Daemon 数量 -->
<property>
<name>hive.llap.daemon.num.executors</name>
<value>4</value>
</property>
<!-- 启用 IO 电梯(异步 IO) -->
<property>
<name>hive.llap.io.enabled</name>
<value>true</value>
</property>
<!-- 缓存命中率监控 -->
<property>
<name>hive.llap.io.cache.stats.track.cache.hit.ratio</name>
<value>true</value>
</property>
LLAP 使用场景
适合 LLAP 的场景:
- 交互式 BI 报表查询
- 低延迟仪表板
- 探索性数据分析
- 需要快速响应的查询
不太适合 LLAP 的场景:
- 超大规模批量 ETL
- 需要大量 Shuffle 的复杂查询
- 资源受限的小集群
LLAP 监控
-- 查看 LLAP 状态
SET hive.llap.execution.mode;
-- 查看缓存统计
-- 通过 YARN UI 或 Grafana 监控
-- 关键指标:缓存命中率、查询延迟、并发请求数
监控要点:
| 指标 | 说明 | 建议值 |
|---|---|---|
| Cache Hit Ratio | 缓存命中率 | > 80% |
| Average Query Time | 平均查询时间 | < 10s |
| Executor Utilization | 执行器利用率 | 60-80% |
| Memory Usage | 内存使用率 | < 85% |
数据模型
数据库
数据库是表的命名空间,用于组织和管理相关表:
-- 创建数据库
CREATE DATABASE IF NOT EXISTS analytics
COMMENT 'Analytics data warehouse'
LOCATION '/user/hive/warehouse/analytics.db'
WITH DBPROPERTIES (
'creator' = 'data_team',
'created_at' = '2024-01-01'
);
-- 查看数据库列表
SHOW DATABASES;
SHOW DATABASES LIKE 'ana*';
-- 查看数据库详情
DESCRIBE DATABASE analytics;
DESCRIBE DATABASE EXTENDED analytics;
-- 切换数据库
USE analytics;
-- 删除数据库
DROP DATABASE IF EXISTS analytics CASCADE; -- CASCADE 会级联删除所有表
表类型
Hive 支持多种表类型,理解它们的区别对于数据管理至关重要。
内部表(Managed Table)
内部表由 Hive 完全管理数据和元数据。删除表时,数据和元数据都会被删除:
-- 创建内部表
CREATE TABLE employees (
id INT COMMENT 'Employee ID',
name STRING COMMENT 'Employee name',
department STRING COMMENT 'Department name',
salary DECIMAL(10, 2) COMMENT 'Monthly salary',
hire_date DATE COMMENT 'Hire date'
)
COMMENT 'Employee information table'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY ':'
NULL DEFINED AS ''
STORED AS TEXTFILE;
-- 查看表结构
DESCRIBE employees;
DESCRIBE FORMATTED employees; -- 更详细的信息
SHOW CREATE TABLE employees; -- 显示建表语句
内部表适合存放中间结果、临时数据或 Hive 独占的数据。
外部表(External Table)
外部表只管理元数据,数据由用户管理。删除表时只删除元数据,数据保留:
-- 创建外部表
CREATE EXTERNAL TABLE access_logs (
log_time TIMESTAMP,
ip_address STRING,
request_url STRING,
status_code INT,
response_size BIGINT
)
COMMENT 'Web server access logs'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/logs/access' -- 指定数据位置
TBLPROPERTIES (
'external.table.purge' = 'false' -- 删除表时保留数据
);
-- 外部表也可以配置为删除表时同时删除数据
-- ALTER TABLE access_logs SET TBLPROPERTIES ('external.table.purge' = 'true');
外部表适合管理已经存在的数据,或者需要与其他工具共享的数据。
内部表 vs 外部表
| 操作 | 内部表 | 外部表 |
|---|---|---|
| 删除表(DROP TABLE) | 删除元数据和数据 | 只删除元数据,数据保留 |
| 数据管理 | Hive 管理 | 用户管理 |
| 适用场景 | 中间结果、临时数据、独占数据 | 共享数据、原始数据 |
选择原则:如果数据只由 Hive 使用,选择内部表;如果数据需要与其他工具共享,或者需要保留原始数据,选择外部表。
分区表
分区表将数据按分区列的值存储在不同目录中,是 Hive 最重要的性能优化手段之一。
创建分区表
-- 创建分区表
CREATE TABLE sales (
order_id BIGINT,
product_id INT,
customer_id INT,
quantity INT,
amount DECIMAL(12, 2)
)
PARTITIONED BY (year INT, month INT, day INT)
STORED AS PARQUET;
-- 添加分区
ALTER TABLE sales ADD PARTITION (year=2024, month=1, day=1);
ALTER TABLE sales ADD PARTITION (year=2024, month=1, day=2);
-- 批量添加分区
ALTER TABLE sales ADD IF NOT EXISTS
PARTITION (year=2024, month=1, day=1)
PARTITION (year=2024, month=1, day=2)
PARTITION (year=2024, month=1, day=3);
-- 查看分区
SHOW PARTITIONS sales;
SHOW PARTITIONS sales PARTITION(year=2024);
-- 删除分区
ALTER TABLE sales DROP IF EXISTS PARTITION (year=2024, month=1, day=1);
分区查询
-- 分区裁剪:只扫描需要的分区
SELECT * FROM sales
WHERE year=2024 AND month=1;
-- 查看分区信息
DESCRIBE FORMATTED sales PARTITION (year=2024, month=1, day=1);
分区设计原则
分区设计直接影响查询性能:
- 分区列选择:选择经常用于过滤条件的列作为分区列,如日期、地区
- 分区粒度:不要过度分区,每个分区应该有足够的数据量(建议至少 1GB)
- 避免数据倾斜:分区之间的数据量应该相对均衡
分区目录结构示例:
/user/hive/warehouse/sales/
├── year=2024/
│ ├── month=1/
│ │ ├── day=1/
│ │ │ └── data.parquet
│ │ ├── day=2/
│ │ │ └── data.parquet
│ │ └── ...
│ └── month=2/
│ └── ...
└── ...
动态分区插入
当需要根据数据自动创建分区时,使用动态分区:
-- 启用动态分区
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict; -- 允许所有列都是动态分区
-- 动态分区插入
INSERT INTO TABLE sales PARTITION(year, month, day)
SELECT
order_id,
product_id,
customer_id,
quantity,
amount,
year(order_date) as year,
month(order_date) as month,
day(order_date) as day
FROM staging_sales;
动态分区的注意事项:
- 动态分区列必须放在 SELECT 语句的最后
- 动态分区会创建大量分区时,需要调整
hive.exec.max.dynamic.partitions参数
分桶表
分桶表将数据按列的哈希值分散到固定数量的文件中,用于优化 Join 和采样查询。
-- 创建分桶表
CREATE TABLE users_bucketed (
user_id BIGINT,
username STRING,
email STRING,
registration_date DATE,
country STRING
)
CLUSTERED BY (user_id) INTO 16 BUCKETS -- 按 user_id 哈希分 16 个桶
SORTED BY (user_id) -- 桶内按 user_id 排序
STORED AS ORC;
-- 插入数据到分桶表(需要启用分桶)
SET hive.enforce.bucketing=true; -- 或 SET hive.strict.checks.bucketing=false
INSERT INTO TABLE users_bucketed
SELECT * FROM users_staging;
分桶表的优势:
- 优化 Join:两个表按相同列分桶,可以进行桶间 Join,避免全表 Shuffle
- 高效采样:可以只读取部分桶进行采样分析
- 数据均匀分布:避免数据倾斜
存储格式
选择合适的存储格式对性能影响巨大:
文本格式(TEXTFILE)
-- 文本格式是默认格式,适合导入导出
CREATE TABLE text_table (
id INT,
name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
优点:可读性好,便于调试 缺点:占用空间大,查询性能差
行列式存储(ORC/Parquet)
-- ORC 格式(Hive 原生优化格式)
CREATE TABLE orc_table (
id INT,
name STRING,
amount DECIMAL(10, 2)
)
STORED AS ORC
TBLPROPERTIES (
'orc.compress' = 'SNAPPY', -- 压缩算法
'orc.create.index' = 'true', -- 创建索引
'orc.stripe.size' = '67108864' -- 条带大小
);
-- Parquet 格式(跨平台兼容性好)
CREATE TABLE parquet_table (
id INT,
name STRING,
amount DECIMAL(10, 2)
)
STORED AS PARQUET
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY',
'parquet.block.size' = '134217728'
);
ORC 与 Parquet 对比:
| 特性 | ORC | Parquet |
|---|---|---|
| 压缩率 | 更高 | 高 |
| 查询性能 | 更优(Hive) | 优(多平台) |
| 生态支持 | Hive 原生 | Spark、Impala 等 |
| 索引支持 | 支持 | 有限 |
| 推荐场景 | Hive 主导的分析 | 多引擎混用场景 |
数据类型
基本数据类型
| 类型 | 说明 | 示例 |
|---|---|---|
| TINYINT | 1 字节整数 | 10Y |
| SMALLINT | 2 字节整数 | 1000S |
| INT | 4 字节整数 | 100000 |
| BIGINT | 8 字节整数 | 1000000000L |
| FLOAT | 单精度浮点 | 3.14F |
| DOUBLE | 双精度浮点 | 3.14159 |
| DECIMAL(p,s) | 精确数值 | DECIMAL(10,2) |
| STRING | 变长字符串 | 'hello' |
| VARCHAR(n) | 有长度限制的字符串 | VARCHAR(100) |
| CHAR(n) | 定长字符串 | CHAR(10) |
| BOOLEAN | 布尔值 | TRUE/FALSE |
| DATE | 日期 | '2024-01-01' |
| TIMESTAMP | 时间戳 | '2024-01-01 12:00:00' |
| BINARY | 二进制数据 | cast('abc' as binary) |
复杂数据类型
-- ARRAY:数组类型
CREATE TABLE array_example (
id INT,
tags ARRAY<STRING>,
scores ARRAY<INT>
);
-- 插入数据
INSERT INTO array_example VALUES
(1, array('tag1', 'tag2', 'tag3'), array(90, 85, 92));
-- 查询数组元素
SELECT id, tags[0] as first_tag, size(tags) as tag_count
FROM array_example;
-- 展开数组(LATERAL VIEW)
SELECT id, tag
FROM array_example
LATERAL VIEW explode(tags) t AS tag;
-- MAP:映射类型
CREATE TABLE map_example (
id INT,
properties MAP<STRING, STRING>,
metrics MAP<STRING, INT>
);
-- 插入数据
INSERT INTO map_example VALUES
(1, map('color', 'red', 'size', 'large'), map('views', 1000, 'clicks', 50));
-- 查询 Map 元素
SELECT id, properties['color'] as color, metrics['views'] as views
FROM map_example;
-- 获取所有 Key 和 Value
SELECT id, map_keys(properties) as keys, map_values(properties) as values
FROM map_example;
-- 展开Map
SELECT id, key, value
FROM map_example
LATERAL VIEW explode(properties) p AS key, value;
-- STRUCT:结构体类型
CREATE TABLE struct_example (
id INT,
address STRUCT<street:STRING, city:STRING, zip:STRING>,
location STRUCT<lat:DOUBLE, lon:DOUBLE>
);
-- 插入数据
INSERT INTO struct_example VALUES
(1, named_struct('street', 'Main St', 'city', 'Beijing', 'zip', '100000'),
named_struct('lat', 39.9042, 'lon', 116.4074));
-- 查询结构体字段
SELECT id, address.city, address.zip, location.lat, location.lon
FROM struct_example;
-- 嵌套复杂类型
CREATE TABLE nested_example (
id INT,
user_profile STRUCT<
name:STRING,
contacts:ARRAY<STRUCT<type:STRING, value:STRING>>,
preferences:MAP<STRING, STRING>
>
);
HiveQL 查询
基本查询
-- 选择列(列裁剪)
SELECT name, department, salary FROM employees;
-- WHERE 条件过滤
SELECT * FROM employees
WHERE salary > 50000
AND department IN ('Engineering', 'Sales');
-- DISTINCT 去重
SELECT DISTINCT department FROM employees;
-- LIMIT 限制结果
SELECT * FROM employees LIMIT 10;
-- 列别名
SELECT
name AS employee_name,
salary * 12 AS annual_salary
FROM employees;
-- CASE WHEN 条件表达式
SELECT
name,
salary,
CASE
WHEN salary >= 100000 THEN 'Senior'
WHEN salary >= 50000 THEN 'Mid-level'
ELSE 'Junior'
END AS level,
CASE department
WHEN 'Engineering' THEN 'Tech'
WHEN 'Sales' THEN 'Business'
ELSE 'Other'
END AS dept_category
FROM employees;
-- NULL 处理
SELECT
name,
COALESCE(bonus, 0) AS bonus, -- 返回第一个非空值
NVL(commission, 0) AS commission, -- NULL 替换
NULLIF(salary, 0) AS salary_check -- 如果相等返回 NULL
FROM employees;
聚合函数
-- 基本聚合
SELECT
COUNT(*) AS total_count,
COUNT(DISTINCT department) AS dept_count,
SUM(salary) AS total_salary,
AVG(salary) AS avg_salary,
MAX(salary) AS max_salary,
MIN(salary) AS min_salary,
STDDEV(salary) AS salary_stddev,
VARIANCE(salary) AS salary_var
FROM employees;
-- GROUP BY 分组
SELECT
department,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary,
MAX(salary) AS max_salary,
MIN(salary) AS min_salary
FROM employees
GROUP BY department;
-- HAVING 过滤分组结果
SELECT
department,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department
HAVING COUNT(*) > 10 AND AVG(salary) > 50000;
-- GROUPING SETS:多维度分组
SELECT
department,
job_level,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department, job_level
GROUPING SETS (
(department, job_level), -- 按部门和级别分组
(department), -- 只按部门分组
(job_level), -- 只按级别分组
() -- 总计
);
-- CUBE:所有可能的组合
SELECT
department,
job_level,
COUNT(*) AS emp_count
FROM employees
GROUP BY department, job_level WITH CUBE;
-- ROLLUP:层级聚合
SELECT
department,
job_level,
COUNT(*) AS emp_count
FROM employees
GROUP BY department, job_level WITH ROLLUP;
-- 等价于 GROUPING SETS ((department, job_level), (department), ())
JOIN 操作
Hive 支持多种 Join 类型,理解它们的使用场景很重要:
-- 内连接(INNER JOIN)
SELECT e.name, e.department, d.location
FROM employees e
INNER JOIN departments d ON e.department = d.dept_name;
-- 左外连接(LEFT OUTER JOIN)
SELECT e.name, d.dept_name
FROM employees e
LEFT JOIN departments d ON e.department = d.dept_name;
-- 右外连接(RIGHT OUTER JOIN)
SELECT e.name, d.dept_name
FROM employees e
RIGHT JOIN departments d ON e.department = d.dept_name;
-- 全外连接(FULL OUTER JOIN)
SELECT e.name, d.dept_name
FROM employees e
FULL OUTER JOIN departments d ON e.department = d.dept_name;
-- 左半连接(LEFT SEMI JOIN):相当于 IN 子查询
SELECT e.name, e.department
FROM employees e
LEFT SEMI JOIN departments d ON e.department = d.dept_name;
-- 等价于:WHERE e.department IN (SELECT dept_name FROM departments)
-- 交叉连接(CROSS JOIN):笛卡尔积
SELECT e.name, d.dept_name
FROM employees e
CROSS JOIN departments d;
Map Join 优化
当 Join 的一方是小表时,可以使用 Map Join 避免 Reduce 阶段:
-- 手动指定 Map Join
SELECT /*+ MAPJOIN(d) */
e.name,
e.department,
d.location
FROM employees e
JOIN departments d ON e.department = d.dept_name;
-- 自动 Map Join(推荐)
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=10000000; -- 10MB
Join 优化技巧
-- 大表 Join 大表时的优化
-- 1. 过滤条件尽早应用
SELECT e.name, d.dept_name
FROM (SELECT * FROM employees WHERE active = 1) e
JOIN (SELECT * FROM departments WHERE status = 'active') d
ON e.department = d.dept_name;
-- 2. 使用 STREAMTABLE 提示
-- 默认 Hive 会尝试缓存右表,如果左表更大则需要指定
SELECT /*+ STREAMTABLE(e) */ e.name, d.dept_name
FROM employees e
JOIN departments d ON e.department = d.dept_name;
排序
-- ORDER BY:全局排序,只有一个 Reducer
SELECT * FROM employees ORDER BY salary DESC;
-- SORT BY:每个 Reducer 内部排序,非全局有序
SELECT * FROM employees SORT BY salary DESC;
-- DISTRIBUTE BY + SORT BY:控制分区和排序
-- 按 department 分发到不同 Reducer,每个 Reducer 内按 salary 排序
SELECT * FROM employees
DISTRIBUTE BY department
SORT BY salary DESC;
-- CLUSTER BY:等于 DISTRIBUTE BY + SORT BY(同一列)
SELECT * FROM employees CLUSTER BY department;
-- 等价于 DISTRIBUTE BY department SORT BY department
ORDER BY 与 SORT BY 的区别:
| 特性 | ORDER BY | SORT BY |
|---|---|---|
| 排序范围 | 全局排序 | Reducer 内排序 |
| Reducer 数量 | 只有 1 个 | 可以多个 |
| 性能 | 数据量大时慢 | 较快 |
| 结果有序性 | 完全有序 | 部分有序 |
子查询
-- WHERE 子查询
SELECT * FROM employees
WHERE department IN (
SELECT dept_name FROM departments WHERE location = 'Beijing'
);
-- FROM 子查询
SELECT dept, avg_salary
FROM (
SELECT
department AS dept,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department
) t
WHERE avg_salary > 50000;
-- WITH 子句(CTE,Common Table Expression)
WITH
dept_stats AS (
SELECT department, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
),
high_salary_depts AS (
SELECT department FROM dept_stats WHERE avg_salary > 80000
)
SELECT e.name, e.department, e.salary
FROM employees e
JOIN high_salary_depts h ON e.department = h.department;
-- EXISTS 子查询
SELECT name FROM employees e
WHERE EXISTS (
SELECT 1 FROM departments d
WHERE d.dept_name = e.department
AND d.location = 'Beijing'
);
窗口函数
窗口函数是 Hive 中最强大的分析功能之一,它可以在不减少行数的情况下进行聚合计算。窗口函数也叫 OLAP 函数,非常适合数据分析场景。
基本语法
function_name([expression]) OVER (
[PARTITION BY partition_expression]
[ORDER BY sort_expression [ASC|DESC]]
[frame_clause]
)
- PARTITION BY:将数据分组,窗口函数在每个分组内独立计算
- ORDER BY:在分组内排序,影响某些窗口函数的计算结果
- frame_clause:定义窗口范围,如
ROWS BETWEEN ... AND ...
聚合窗口函数
-- 累计求和
SELECT
name,
department,
salary,
SUM(salary) OVER (PARTITION BY department ORDER BY hire_date) AS running_total
FROM employees;
-- 移动平均
SELECT
order_date,
amount,
AVG(amount) OVER (
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_avg_7days
FROM daily_sales;
-- 分组内聚合
SELECT
name,
department,
salary,
MAX(salary) OVER (PARTITION BY department) AS dept_max_salary,
AVG(salary) OVER (PARTITION BY department) AS dept_avg_salary,
salary - AVG(salary) OVER (PARTITION BY department) AS diff_from_avg
FROM employees;
-- 去重计数
SELECT
department,
COUNT(DISTINCT employee_id) OVER (PARTITION BY department) AS unique_employees
FROM employees;
排名函数
-- ROW_NUMBER:行号,连续递增,不重复
SELECT
name,
department,
salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) AS overall_rank,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank
FROM employees;
-- RANK:排名,相同值排名相同,跳过后续排名
SELECT
name,
salary,
RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;
-- 结果示例:1, 2, 2, 4, 5(两个第2名后跳到第4名)
-- DENSE_RANK:排名,相同值排名相同,不跳过
SELECT
name,
salary,
DENSE_RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;
-- 结果示例:1, 2, 2, 3, 4(两个第2名后是第3名)
-- NTILE:将数据分成 N 组
SELECT
name,
salary,
NTILE(4) OVER (ORDER BY salary DESC) AS quartile -- 分成4组
FROM employees;
-- PERCENT_RANK:百分比排名
SELECT
name,
salary,
PERCENT_RANK() OVER (ORDER BY salary) AS pct_rank -- 0 到 1 之间
FROM employees;
-- CUME_DIST:累积分布
SELECT
name,
salary,
CUME_DIST() OVER (ORDER BY salary) AS cumulative_dist -- 小于等于当前值的比例
FROM employees;
排名函数对比:
| 函数 | 说明 | 示例(相同值) |
|---|---|---|
| ROW_NUMBER | 连续递增编号 | 1, 2, 3, 4, 5 |
| RANK | 相同值相同排名,跳过 | 1, 2, 2, 4, 5 |
| DENSE_RANK | 相同值相同排名,不跳过 | 1, 2, 2, 3, 4 |
值访问函数
-- LEAD:访问后续行
SELECT
order_date,
amount,
LEAD(amount, 1) OVER (ORDER BY order_date) AS next_day_amount,
LEAD(amount, 3, 0) OVER (ORDER BY order_date) AS amount_3_days_later -- 默认值为 0
FROM daily_sales;
-- LAG:访问前行
SELECT
order_date,
amount,
LAG(amount, 1) OVER (ORDER BY order_date) AS prev_day_amount,
amount - LAG(amount, 1) OVER (ORDER BY order_date) AS daily_change
FROM daily_sales;
-- FIRST_VALUE:分组内第一个值
SELECT
name,
department,
salary,
FIRST_VALUE(salary) OVER (PARTITION BY department ORDER BY hire_date) AS first_hire_salary
FROM employees;
-- LAST_VALUE:分组内最后一个值
SELECT
name,
department,
salary,
LAST_VALUE(salary) OVER (
PARTITION BY department
ORDER BY hire_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS last_hire_salary
FROM employees;
窗口范围定义
-- ROWS:基于行数
SELECT
order_date,
amount,
AVG(amount) OVER (
ORDER BY order_date
ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
) AS moving_avg
FROM daily_sales;
-- RANGE:基于值范围(通常用于日期)
SELECT
order_date,
amount,
SUM(amount) OVER (
ORDER BY order_date
RANGE BETWEEN INTERVAL '7' DAY PRECEDING AND CURRENT ROW
) AS rolling_7day_sum
FROM daily_sales;
-- 窗口范围选项
-- UNBOUNDED PRECEDING:从分区的第一行开始
-- n PRECEDING:从当前行向前 n 行
-- CURRENT ROW:当前行
-- n FOLLOWING:从当前行向后 n 行
-- UNBOUNDED FOLLOWING:到分区的最后一行结束
WINDOW 子句
当多个窗口函数使用相同的窗口定义时,可以使用 WINDOW 子句简化:
SELECT
name,
department,
salary,
SUM(salary) OVER w AS total,
AVG(salary) OVER w AS average,
ROW_NUMBER() OVER w AS row_num
FROM employees
WINDOW w AS (PARTITION BY department ORDER BY hire_date);
数据导入导出
数据导入
-- 从本地文件导入
LOAD DATA LOCAL INPATH '/local/path/data.csv'
INTO TABLE employees;
-- 从 HDFS 导入
LOAD DATA INPATH '/hdfs/path/data.csv'
INTO TABLE employees;
-- 覆盖导入
LOAD DATA LOCAL INPATH '/local/path/data.csv'
OVERWRITE INTO TABLE employees;
-- 导入到分区表
LOAD DATA LOCAL INPATH '/local/path/data.csv'
INTO TABLE sales PARTITION(year=2024, month=1);
-- INSERT 导入
INSERT INTO TABLE employees VALUES
(1, 'Alice', 'Engineering', 80000, '2024-01-15'),
(2, 'Bob', 'Sales', 70000, '2024-01-20');
-- INSERT OVERWRITE 覆盖
INSERT OVERWRITE TABLE employees
SELECT * FROM temp_employees;
-- 多表插入
FROM source_table
INSERT INTO TABLE target1 SELECT col1, col2
INSERT INTO TABLE target2 SELECT col3, col4
INSERT OVERWRITE TABLE target3 SELECT col5, col6;
-- 动态分区插入
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE sales PARTITION(year, month)
SELECT
order_id,
product_id,
amount,
year(order_date) as year,
month(order_date) as month
FROM staging_sales;
数据导出
-- 导出到本地
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
SELECT * FROM employees;
-- 导出到 HDFS
INSERT OVERWRITE DIRECTORY '/hdfs/path/output'
SELECT * FROM employees;
-- 指定格式导出
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY ':'
NULL DEFINED AS ''
SELECT * FROM employees;
-- 使用 hive 命令导出
-- hive -e "SELECT * FROM employees" > output.txt
-- hive -f query.hql > output.txt
导出工具
# 使用 sqoop 导出到关系数据库
sqoop export \
--connect jdbc:mysql://localhost:3306/target_db \
--username root \
--password password \
--table target_table \
--export-dir /user/hive/warehouse/source_table \
--input-fields-terminated-by '\001'
视图与物化视图
普通视图
视图是虚拟表,不存储数据,每次查询时动态计算:
-- 创建视图
CREATE VIEW IF NOT EXISTS high_salary_employees
COMMENT 'Employees with salary above 80000'
AS
SELECT id, name, department, salary
FROM employees
WHERE salary > 80000;
-- 创建带列别名的视图
CREATE VIEW dept_salary_stats (dept, emp_count, avg_salary, max_salary)
AS
SELECT
department,
COUNT(*),
AVG(salary),
MAX(salary)
FROM employees
GROUP BY department;
-- 查询视图
SELECT * FROM high_salary_employees WHERE department = 'Engineering';
-- 查看视图定义
SHOW CREATE TABLE high_salary_employees;
DESCRIBE EXTENDED high_salary_employees;
-- 删除视图
DROP VIEW IF EXISTS high_salary_employees;
视图的使用场景:
- 简化复杂查询
- 提供数据安全层(隐藏敏感列)
- 统一数据访问接口
物化视图
物化视图存储查询结果,可以增量更新。与普通视图不同,物化视图实际存储数据,查询时直接读取预计算结果,大幅提升复杂聚合查询的性能。
创建物化视图
-- 创建物化视图
CREATE MATERIALIZED VIEW order_summary
DISABLE REWRITE -- 初始禁用查询重写
AS
SELECT
product_id,
COUNT(*) as order_count,
SUM(amount) as total_amount,
AVG(amount) as avg_amount
FROM orders
GROUP BY product_id;
-- 查看物化视图
SHOW MATERIALIZED VIEWS;
DESCRIBE FORMATTED order_summary;
查询重写
物化视图最大的价值在于自动查询重写。当用户查询的聚合粒度比物化视图更粗时,优化器可以自动使用物化视图:
-- 启用查询重写
ALTER MATERIALIZED VIEW order_summary ENABLE REWRITE;
-- 此时,以下查询会自动使用物化视图
-- 原本需要扫描 orders 全表,现在只需扫描物化视图
SELECT
SUM(order_count) as total_orders,
SUM(total_amount) as grand_total
FROM order_summary;
-- 查看是否使用了物化视图
EXPLAIN SELECT SUM(order_count) FROM order_summary;
增量刷新
当源表数据变化时,物化视图需要刷新。Hive 支持多种刷新策略:
-- 手动完全刷新(重建整个物化视图)
ALTER MATERIALIZED VIEW order_summary REBUILD;
-- 增量刷新(仅更新变化的数据)
-- 需要源表是 ACID 表,且物化视图支持增量刷新
ALTER MATERIALIZED VIEW order_summary REBUILD
WITH INCREMENTAL;
-- 查看刷新状态
DESCRIBE FORMATTED order_summary;
增量刷新的限制:
- 源表必须是 ACID 事务表
- 物化视图的查询不能包含某些复杂操作
- 删除操作可能触发全量刷新
物化视图最佳实践
-- 1. 选择高频查询创建物化视图
-- 分析查询日志,找出重复执行的聚合查询
-- 2. 合理选择聚合粒度
-- 粒度太细:存储开销大,刷新慢
-- 粒度太粗:查询重写机会少
CREATE MATERIALIZED VIEW sales_daily_summary
AS
SELECT
DATE(order_time) as order_date,
product_category,
store_region,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
GROUP BY DATE(order_time), product_category, store_region;
-- 3. 定时刷新
-- 可以通过脚本或调度工具定期刷新
-- crontab: 0 */4 * * * hive -e "ALTER MATERIALIZED VIEW order_summary REBUILD;"
-- 4. 监控刷新时间
-- 如果刷新时间过长,考虑重新设计物化视图
使用场景
| 场景 | 说明 |
|---|---|
| BI 报表 | 预计算常见报表指标 |
| 实时仪表板 | 定期刷新的汇总数据 |
| 数据分析 | 加速探索性分析 |
| 数据服务 | 提供快速查询响应 |
Iceberg 表集成
Apache Iceberg 是一种开放的数据表格式,专为大规模数据分析设计。Hive 4.0 提供了原生 Iceberg 支持。
为什么选择 Iceberg?
与传统 Hive 表相比,Iceberg 提供了更多高级特性:
| 特性 | 传统 Hive 表 | Iceberg 表 |
|---|---|---|
| 模式演进 | 需要重写数据 | 支持添加/删除/重命名列 |
| 分区演进 | 需要重写数据 | 动态调整分区策略 |
| 时间旅行 | 不支持 | 支持快照查询 |
| 回滚 | 不支持 | 支持快照回滚 |
| 小文件 | 需要手动合并 | 自动合并 |
| 并发写入 | 有限支持 | 完整支持 |
创建 Iceberg 表
-- 创建 Iceberg 表
CREATE TABLE iceberg_orders (
order_id BIGINT,
customer_id INT,
product_id INT,
order_time TIMESTAMP,
amount DECIMAL(12, 2)
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
'format-version' = '2', -- Iceberg 格式版本
'write.format.default' = 'parquet', -- 数据文件格式
'write.target-file-size-bytes' = '134217728' -- 目标文件大小 128MB
);
-- 创建分区表
CREATE TABLE iceberg_events (
event_id BIGINT,
event_type STRING,
event_time TIMESTAMP,
user_id BIGINT,
event_data STRING
)
PARTITIONED BY ( -- Iceberg 支持隐藏分区
days(event_time), -- 按天分区
bucket(16, user_id) -- 按 user_id 哈希分桶
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
'format-version' = '2',
'write.format.default' = 'parquet'
);
时间旅行查询
Iceberg 支持查询历史快照,这对于数据审计和问题排查非常有用:
-- 查看表快照历史
SELECT * FROM iceberg_orders.snapshots;
-- 查询特定快照
SELECT * FROM iceberg_orders
FOR SYSTEM_TIME AS OF '2024-01-15 10:00:00';
-- 查询特定快照 ID
SELECT * FROM iceberg_orders
FOR SYSTEM_VERSION AS OF 1234567890123456789;
-- 比较两个快照的差异
SELECT * FROM iceberg_orders
FOR SYSTEM_TIME AS OF '2024-01-15 10:00:00'
EXCEPT
SELECT * FROM iceberg_orders
FOR SYSTEM_TIME AS OF '2024-01-14 10:00:00';
快照管理
Iceberg 表会累积历史快照,需要定期清理:
-- 过期旧快照(保留最近 7 天)
ALTER TABLE iceberg_orders EXECUTE EXPIRE_SNAPSHOTS('2024-01-08 00:00:00');
-- 删除孤立文件(没有被任何快照引用的文件)
ALTER TABLE iceberg_orders EXECUTE DELETE_ORPHAN_FILES;
-- 合并小文件
ALTER TABLE iceberg_orders EXECUTE OPTIMIZE;
-- 查看表属性
DESCRIBE EXTENDED iceberg_orders;
从 Hive 表迁移到 Iceberg
-- 方式1:创建新表并插入数据
CREATE TABLE iceberg_orders_new
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES ('format-version' = '2')
AS SELECT * FROM hive_orders;
-- 方式2:使用 LOAD DATA
CREATE TABLE iceberg_orders (
-- 列定义
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES ('format-version' = '2');
LOAD DATA INPATH '/user/hive/warehouse/hive_orders'
INTO TABLE iceberg_orders;
函数
内置函数
字符串函数
-- 字符串长度
SELECT length(name) FROM employees;
-- 字符串拼接
SELECT concat(name, '-', department) FROM employees;
SELECT concat_ws(',', name, department) FROM employees; -- 带分隔符
-- 字符串截取
SELECT substr(name, 1, 3) FROM employees; -- 从第1个字符开始,取3个
SELECT substring(name, -3) FROM employees; -- 取最后3个字符
-- 大小写转换
SELECT upper(name), lower(name), initcap(name) FROM employees; -- 首字母大写
-- 去除空格
SELECT trim(name), ltrim(name), rtrim(name) FROM employees;
-- 字符串分割
SELECT split(name, ' ') FROM employees; -- 返回数组
-- 字符串替换
SELECT replace(name, 'old', 'new') FROM employees;
-- 正则匹配
SELECT regexp_extract(name, '([A-Za-z]+)', 1) FROM employees; -- 提取匹配组
SELECT regexp_replace(name, '[0-9]+', '#') FROM employees; -- 正则替换
-- 查找位置
SELECT instr(name, 'abc') FROM employees; -- 返回位置,未找到返回 0
SELECT locate('abc', name, 1) FROM employees; -- 从位置 1 开始查找
-- 格式化
SELECT format_number(salary, 2) FROM employees; -- 数字格式化
SELECT printf('Name: %s, Salary: %.2f', name, salary) FROM employees;
日期函数
-- 当前日期和时间
SELECT current_date(); -- 当前日期
SELECT current_timestamp(); -- 当前时间戳
SELECT unix_timestamp(); -- 当前 Unix 时间戳
-- 日期加减
SELECT date_add('2024-01-01', 7); -- 加 7 天
SELECT date_sub('2024-01-01', 7); -- 减 7 天
SELECT add_months('2024-01-01', 3); -- 加 3 个月
-- 日期差
SELECT datediff('2024-01-10', '2024-01-01'); -- 相差天数
SELECT months_between('2024-03-01', '2024-01-01'); -- 相差月数
-- 日期格式化
SELECT date_format('2024-01-01', 'yyyy-MM'); -- '2024-01'
SELECT from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss');
-- 提取日期部分
SELECT year('2024-01-15'); -- 2024
SELECT month('2024-01-15'); -- 1
SELECT day('2024-01-15'); -- 15
SELECT dayofweek('2024-01-15'); -- 星期几(1=周日)
SELECT weekofyear('2024-01-15'); -- 一年中第几周
SELECT quarter('2024-01-15'); -- 季度
-- 日期转换
SELECT to_date('2024-01-01 12:00:00'); -- 转为日期
SELECT to_timestamp('2024-01-01 12:00:00'); -- 转为时间戳
-- 字符串转日期
SELECT cast('2024-01-01' as date);
SELECT from_unixtime(unix_timestamp('20240101', 'yyyyMMdd'), 'yyyy-MM-dd');
数学函数
-- 取整
SELECT round(3.14159, 2); -- 3.14
SELECT floor(3.9); -- 3(向下取整)
SELECT ceil(3.1); -- 4(向上取整)
-- 绝对值和符号
SELECT abs(-10); -- 10
SELECT sign(-10); -- -1
-- 幂运算和开方
SELECT pow(2, 10); -- 1024
SELECT sqrt(16); -- 4
SELECT cbrt(27); -- 3(立方根)
-- 对数
SELECT log(10, 100); -- 2(以 10 为底 100 的对数)
SELECT ln(2.71828); -- 自然对数
SELECT log2(8); -- 3
SELECT log10(100); -- 2
-- 三角函数
SELECT sin(0), cos(0), tan(0);
SELECT asin(0), acos(1), atan(0);
-- 随机数
SELECT rand(); -- 0 到 1 之间
SELECT rand(42); -- 固定种子的随机数
-- 其他
SELECT greatest(1, 5, 3, 2); -- 5(最大值)
SELECT least(1, 5, 3, 2); -- 1(最小值)
SELECT pmod(10, 3); -- 1(正取模)
条件函数
-- IF 函数
SELECT if(salary > 50000, 'High', 'Low') FROM employees;
-- CASE WHEN
SELECT
name,
CASE
WHEN salary >= 100000 THEN 'Senior'
WHEN salary >= 50000 THEN 'Mid'
ELSE 'Junior'
END as level
FROM employees;
-- NVL:空值替换
SELECT nvl(bonus, 0) FROM employees;
-- COALESCE:返回第一个非空值
SELECT coalesce(bonus, commission, 0) FROM employees;
-- NULLIF:如果相等返回 NULL
SELECT nullif(salary, 0) FROM employees;
集合函数
-- 数组函数
SELECT size(array(1, 2, 3)); -- 3
SELECT array_contains(array(1, 2, 3), 2); -- true
SELECT sort_array(array(3, 1, 2)); -- [1, 2, 3]
-- Map 函数
SELECT size(map('a', 1, 'b', 2)); -- 2
SELECT map_keys(map('a', 1, 'b', 2)); -- ['a', 'b']
SELECT map_values(map('a', 1, 'b', 2)); -- [1, 2]
-- 数组展开
SELECT explode(array(1, 2, 3)); -- 展开为多行
SELECT posexplode(array(1, 2, 3)); -- 展开并返回位置
-- Map 展开
SELECT explode(map('a', 1, 'b', 2)); -- 展开为多行(key, value)
自定义函数
当内置函数无法满足需求时,可以开发自定义函数。
UDF(用户定义函数)
UDF 是一进一出的函数,类似于 UPPER、LENGTH 等:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
/**
* 自定义 UDF:去除字符串首尾引号
* 使用方式:SELECT remove_quotes('"hello"');
*/
public class RemoveQuotes extends UDF {
public Text evaluate(Text input) {
if (input == null) {
return null;
}
String str = input.toString();
// 去除首尾引号
String result = str.replaceAll("^\"|\"$", "");
return new Text(result);
}
// 支持重载
public Text evaluate(Text input, Text quoteChar) {
if (input == null) {
return null;
}
String str = input.toString();
String quote = quoteChar != null ? quoteChar.toString() : "\"";
String result = str.replaceAll("^" + quote + "|" + quote + "$", "");
return new Text(result);
}
}
编译和部署:
# 编译
javac -cp $(hadoop classpath):$(hive classpath) RemoveQuotes.java
# 打包
jar cvf my-udf.jar com/example/hive/udf/RemoveQuotes.class
# 在 Hive 中使用
ADD JAR /path/to/my-udf.jar;
CREATE TEMPORARY FUNCTION remove_quotes AS 'com.example.hive.udf.RemoveQuotes';
SELECT remove_quotes('"hello"'); -- 返回 hello
GenericUDF(通用 UDF)
GenericUDF 支持复杂类型和更灵活的参数处理:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
/**
* 自定义 GenericUDF:字符串拼接(支持任意数量参数)
*/
public class ConcatAll extends GenericUDF {
private StringObjectInspector[] inspectors;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
if (arguments.length < 2) {
throw new UDFArgumentLengthException(
"ConcatAll requires at least 2 arguments");
}
inspectors = new StringObjectInspector[arguments.length];
for (int i = 0; i < arguments.length; i++) {
if (!(arguments[i] instanceof StringObjectInspector)) {
throw new UDFArgumentException(
"All arguments must be strings");
}
inspectors[i] = (StringObjectInspector) arguments[i];
}
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < arguments.length; i++) {
Object value = arguments[i].get();
if (value != null) {
sb.append(inspectors[i].getPrimitiveJavaObject(value));
}
}
return sb.toString();
}
@Override
public String getDisplayString(String[] children) {
return "concat_all(" + String.join(", ", children) + ")";
}
}
UDAF(用户定义聚合函数)
UDAF 是多进一出的聚合函数,类似于 SUM、AVG 等:
package com.example.hive.udaf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.DoubleWritable;
/**
* 自定义 UDAF:计算几何平均数
*/
public class GeometricMean extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws SemanticException {
if (parameters.length != 1) {
throw new UDFArgumentTypeException(
parameters.length - 1, "Exactly one argument is required");
}
return new GeometricMeanEvaluator();
}
public static class GeometricMeanEvaluator extends GenericUDAFEvaluator {
// 输入数据类型检查器
private PrimitiveObjectInspector inputInspector;
// 中间结果:存储 log_sum 和 count
static class LogSumCount implements AggregationBuffer {
double logSum;
long count;
}
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
super.init(m, parameters);
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
inputInspector = (PrimitiveObjectInspector) parameters[0];
}
// 输出类型
return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
LogSumCount buffer = new LogSumCount();
reset(buffer);
return buffer;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
LogSumCount buffer = (LogSumCount) agg;
buffer.logSum = 0.0;
buffer.count = 0;
}
@Override
public void iterate(AggregationBuffer agg, Object[] parameters)
throws HiveException {
if (parameters[0] != null) {
LogSumCount buffer = (LogSumCount) agg;
double value = inputInspector.getDouble(parameters[0]);
if (value > 0) {
buffer.logSum += Math.log(value);
buffer.count++;
}
}
}
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
LogSumCount buffer = (LogSumCount) agg;
double[] result = new double[2];
result[0] = buffer.logSum;
result[1] = buffer.count;
return result;
}
@Override
public void merge(AggregationBuffer agg, Object partial)
throws HiveException {
if (partial != null) {
LogSumCount buffer = (LogSumCount) agg;
double[] partialResult = (double[]) partial;
buffer.logSum += partialResult[0];
buffer.count += (long) partialResult[1];
}
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
LogSumCount buffer = (LogSumCount) agg;
if (buffer.count == 0) {
return null;
}
// 几何平均数 = exp(sum(log(x)) / n)
return new DoubleWritable(Math.exp(buffer.logSum / buffer.count));
}
}
}
UDTF(用户定义表生成函数)
UDTF 是一进多出的函数,类似于 explode:
package com.example.hive.udtf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import java.util.ArrayList;
import java.util.List;
/**
* 自定义 UDTF:将字符串按分隔符分割为多行
* 使用方式:SELECT split_to_rows('a,b,c', ',');
* 结果:三行,分别是 a, b, c
*/
public class SplitToRows extends GenericUDTF {
private StringObjectInspector stringInspector;
@Override
public StructObjectInspector initialize(ObjectInspector[] args)
throws UDFArgumentException {
if (args.length != 2) {
throw new UDFArgumentLengthException("SplitToRows takes exactly 2 arguments");
}
stringInspector = (StringObjectInspector) args[0];
// 定义输出列
List<String> fieldNames = new ArrayList<>();
fieldNames.add("value");
List<ObjectInspector> fieldOIs = new ArrayList<>();
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(
fieldNames, fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
String input = stringInspector.getPrimitiveJavaObject(args[0]);
String delimiter = stringInspector.getPrimitiveJavaObject(args[1]);
if (input == null || delimiter == null) {
return;
}
String[] parts = input.split(delimiter);
for (String part : parts) {
String[] result = new String[1];
result[0] = part;
forward(result);
}
}
@Override
public void close() throws HiveException {
// 清理资源
}
}
注册和永久使用 UDF
-- 临时函数(当前会话有效)
ADD JAR /path/to/my-udf.jar;
CREATE TEMPORARY FUNCTION my_func AS 'com.example.hive.udf.MyUDF';
-- 永久函数(所有会话可用)
CREATE FUNCTION my_db.my_func AS 'com.example.hive.udf.MyUDF'
USING JAR 'hdfs:///user/hive/lib/my-udf.jar';
-- 查看函数
SHOW FUNCTIONS LIKE 'my_*';
DESCRIBE FUNCTION my_func;
DESCRIBE FUNCTION EXTENDED my_func;
-- 删除函数
DROP TEMPORARY FUNCTION IF EXISTS my_func;
DROP FUNCTION IF EXISTS my_db.my_func;
性能调优
执行引擎优化
-- 使用 Tez 引擎(推荐)
SET hive.execution.engine=tez;
-- 使用 Spark 引擎
SET hive.execution.engine=spark;
-- Tez 优化配置
SET tez.grouping.min-size=134217728; -- 最小分组大小 128MB
SET tez.grouping.max-size=268435456; -- 最大分组大小 256MB
SET tez.runtime.unordered.output.buffer.size-mb=100;
并行执行
-- 启用并行执行
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=16; -- 并行线程数
-- 向量化执行(批量处理)
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
Cost-Based Optimizer(CBO)
-- 启用 CBO
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
-- 收集统计信息
ANALYZE TABLE employees COMPUTE STATISTICS;
ANALYZE TABLE employees COMPUTE STATISTICS FOR COLUMNS;
ANALYZE TABLE sales PARTITION(year=2024, month=1) COMPUTE STATISTICS;
存储优化
-- 使用列式存储
CREATE TABLE optimized_table (...)
STORED AS ORC
TBLPROPERTIES (
'orc.compress' = 'SNAPPY',
'orc.create.index' = 'true'
);
-- 启用压缩
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- 合并小文件
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000; -- 每个任务合并的文件大小
SET hive.merge.smallfiles.avgsize=16000000; -- 触发合并的阈值
Join 优化
-- 自动 Map Join
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=20000000; -- 20MB
-- Skew Join(处理数据倾斜)
SET hive.optimize.skewjoin=true;
SET hive.skewjoin.key=100000; -- 倾斜阈值
-- 大表 Join 大表优化
-- 1. 使用分桶表
-- 2. 使用分区裁剪
-- 3. 使用 STREAMTABLE 提示
SELECT /*+ STREAMTABLE(large_table) */ *
FROM large_table l
JOIN small_table s ON l.id = s.id;
查询优化
-- 列裁剪:只选择需要的列
-- 好
SELECT id, name FROM employees WHERE department = 'Engineering';
-- 差
SELECT * FROM employees WHERE department = 'Engineering';
-- 分区裁剪:使用分区过滤
SELECT * FROM sales WHERE year=2024 AND month=1;
-- 谓词下推:尽早过滤
-- 好
SELECT * FROM (
SELECT * FROM orders WHERE status = 'completed'
) t JOIN customers c ON t.customer_id = c.id;
-- 差
SELECT * FROM orders t JOIN customers c ON t.customer_id = c.id
WHERE t.status = 'completed';
-- 使用 SORT BY 替代 ORDER BY(当不需要全局排序时)
SELECT * FROM employees SORT BY salary DESC;
-- 限制返回行数
SELECT * FROM large_table LIMIT 1000;
-- 避免 SELECT DISTINCT(GROUP BY 通常更快)
SELECT department FROM employees GROUP BY department;
-- 比 SELECT DISTINCT department FROM employees; 更快
内存和资源配置
-- Map 任务配置
SET mapreduce.map.memory.mb=4096;
SET mapreduce.map.java.opts=-Xmx3072m;
-- Reduce 任务配置
SET mapreduce.reduce.memory.mb=8192;
SET mapreduce.reduce.java.opts=-Xmx6144m;
-- Reduce 任务数量
SET mapreduce.job.reduces=10;
-- 或根据数据量自动设置
SET hive.exec.reducers.bytes.per.reducer=256000000; -- 每个 reducer 处理的数据量
常见问题排查
-- 1. 查看执行计划
EXPLAIN SELECT * FROM employees WHERE department = 'Engineering';
EXPLAIN EXTENDED SELECT * FROM employees WHERE department = 'Engineering';
-- 2. 查看 Stage 依赖
EXPLAIN DEPENDENCY SELECT * FROM employees e JOIN departments d;
-- 3. 锁问题排查
SHOW LOCKS;
SHOW LOCKS employees;
-- 禁用锁(开发环境)
SET hive.support.concurrency=false;
-- 4. 小文件问题
-- 查看文件数量
DESCRIBE FORMATTED employees;
-- 合并小文件
INSERT OVERWRITE TABLE employees SELECT * FROM employees;
数据仓库建模
维度建模基础
维度建模是一种专门用于数据仓库的设计方法,由 Ralph Kimball 提出。核心概念包括:
事实表(Fact Table)
事实表存储业务过程的度量数据,包含:
- 度量列:可聚合的数值数据,如销售额、订单数量
- 维度外键:关联到维度表的外键
-- 销售事实表
CREATE TABLE fact_sales (
date_key INT, -- 日期维度外键
product_key INT, -- 产品维度外键
store_key INT, -- 门店维度外键
customer_key INT, -- 客户维度外键
sales_amount DECIMAL(12,2), -- 销售金额(可加度量)
quantity INT, -- 销售数量(可加度量)
unit_price DECIMAL(10,2), -- 单价(不可加度量)
transaction_count INT -- 交易次数(可加度量)
)
PARTITIONED BY (year INT, month INT)
STORED AS ORC;
维度表(Dimension Table)
维度表存储业务实体的描述性信息:
-- 产品维度表
CREATE TABLE dim_product (
product_key INT, -- 代理键
product_id STRING, -- 业务键
product_name STRING,
category_name STRING,
brand_name STRING,
supplier_name STRING,
is_active BOOLEAN,
effective_date DATE,
expiry_date DATE,
current_flag BOOLEAN -- 当前有效标识
)
STORED AS ORC;
-- 日期维度表
CREATE TABLE dim_date (
date_key INT,
full_date DATE,
day_of_week INT,
day_name STRING,
week_of_year INT,
month INT,
month_name STRING,
quarter INT,
year INT,
is_weekend BOOLEAN,
is_holiday BOOLEAN
)
STORED AS ORC;
星型模型与雪花模型
星型模型(Star Schema)
星型模型是最简单的维度模型,所有维度表直接与事实表关联:
-- 星型模型示例
-- 事实表
CREATE TABLE fact_order (
order_id BIGINT,
customer_key INT,
product_key INT,
time_key INT,
store_key INT,
order_amount DECIMAL(12,2),
quantity INT
);
-- 维度表直接关联事实表
-- dim_customer、dim_product、dim_time、dim_store 都直接关联 fact_order
-- 查询示例
SELECT
p.category_name,
t.year,
t.month,
SUM(f.order_amount) as total_amount
FROM fact_order f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_time t ON f.time_key = t.time_key
GROUP BY p.category_name, t.year, t.month;
优点:
- 查询简单,Join 次数少
- 适合 OLAP 分析
- 性能好
雪花模型(Snowflake Schema)
雪花模型是星型模型的规范化形式,维度表可以进一步分解:
-- 雪花模型示例
-- 事实表
CREATE TABLE fact_order_snowflake (
order_id BIGINT,
customer_key INT,
product_key INT,
time_key INT,
store_key INT,
order_amount DECIMAL(12,2),
quantity INT
);
-- 产品维度关联品牌维度
CREATE TABLE dim_product_snowflake (
product_key INT,
product_name STRING,
brand_key INT, -- 关联品牌维度
category_key INT -- 关联类别维度
);
CREATE TABLE dim_brand (
brand_key INT,
brand_name STRING,
brand_country STRING
);
CREATE TABLE dim_category (
category_key INT,
category_name STRING,
parent_category_key INT
);
-- 查询需要更多 Join
SELECT
c.category_name,
b.brand_name,
SUM(f.order_amount) as total_amount
FROM fact_order_snowflake f
JOIN dim_product_snowflake p ON f.product_key = p.product_key
JOIN dim_brand b ON p.brand_key = b.brand_key
JOIN dim_category c ON p.category_key = c.category_key
GROUP BY c.category_name, b.brand_name;
优缺点对比:
| 特性 | 星型模型 | 雪花模型 |
|---|---|---|
| 查询复杂度 | 简单 | 较复杂 |
| Join 次数 | 少 | 多 |
| 查询性能 | 好 | 较差 |
| 存储空间 | 较大(冗余) | 较小(规范化) |
| 维护难度 | 简单 | 复杂 |
| 推荐场景 | 大多数数仓场景 | 维度层级复杂场景 |
缓慢变化维度(SCD)
维度数据会随时间变化,处理变化的方式称为缓慢变化维度(Slowly Changing Dimension):
SCD Type 1:覆盖旧值
直接更新维度记录,不保留历史:
-- 更新客户信息(覆盖)
INSERT OVERWRITE TABLE dim_customer
SELECT
customer_key,
customer_id,
customer_name,
'新地址' as address, -- 新值覆盖旧值
phone,
email
FROM dim_customer
WHERE customer_id = 'C001';
SCD Type 2:新增历史记录
新增一条记录保留历史,使用代理键区分:
-- 新增历史记录
-- 1. 设置旧记录失效
UPDATE dim_customer
SET current_flag = false, expiry_date = CURRENT_DATE
WHERE customer_id = 'C001' AND current_flag = true;
-- 2. 插入新记录
INSERT INTO dim_customer VALUES
(next_key, 'C001', '新名称', '新地址', ..., CURRENT_DATE, NULL, true);
SCD Type 3:新增字段
新增字段存储上一个值:
CREATE TABLE dim_customer_scd3 (
customer_key INT,
customer_id STRING,
customer_name STRING,
current_address STRING, -- 当前地址
previous_address STRING, -- 上一个地址
address_change_date DATE -- 变更日期
);
数据分层架构
现代数据仓库通常采用分层架构:
ODS(原始数据层)
↓ 清洗、转换
DWD(明细数据层)
↓ 汇总
DWS(汇总数据层)
↓ 面向应用
ADS(应用数据层)
-- ODS:原始数据层,保持数据原貌
CREATE TABLE ods_orders (
order_id STRING,
customer_id STRING,
product_id STRING,
order_time STRING,
amount STRING,
raw_data STRING
)
PARTITIONED BY (dt STRING)
STORED AS TEXTFILE;
-- DWD:明细数据层,清洗后的明细数据
CREATE TABLE dwd_order_detail (
order_id BIGINT,
customer_key INT,
product_key INT,
order_time TIMESTAMP,
amount DECIMAL(12,2),
quantity INT
)
PARTITIONED BY (dt DATE)
STORED AS ORC;
-- DWS:汇总数据层,按主题汇总
CREATE TABLE dws_order_summary_daily (
stat_date DATE,
customer_key INT,
order_count BIGINT,
total_amount DECIMAL(15,2),
avg_order_amount DECIMAL(10,2)
)
PARTITIONED BY (dt DATE)
STORED AS ORC;
-- ADS:应用数据层,面向业务
CREATE TABLE ads_customer_order_stats (
customer_id STRING,
customer_name STRING,
total_orders_30d BIGINT,
total_amount_30d DECIMAL(15,2),
last_order_date DATE
)
STORED AS ORC;
Hive 事务
Hive 从 0.13 版本开始支持 ACID 事务,但有一定限制。
启用事务支持
-- 启用事务支持
SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;
创建事务表
-- 创建支持 ACID 的事务表
CREATE TABLE transactional_table (
id INT,
name STRING,
value DECIMAL(10,2)
)
CLUSTERED BY (id) INTO 4 BUCKETS -- 必须分桶
STORED AS ORC -- 必须是 ORC 格式
TBLPROPERTIES (
'transactional' = 'true',
'transactional_properties' = 'default'
);
CRUD 操作
-- 插入数据
INSERT INTO TABLE transactional_table VALUES
(1, 'Alice', 100.00),
(2, 'Bob', 200.00);
-- 更新数据
UPDATE transactional_table
SET value = 150.00
WHERE id = 1;
-- 删除数据
DELETE FROM transactional_table WHERE id = 2;
-- 合并操作(Merge)
MERGE INTO transactional_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.value = s.value
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.name, s.value);
事务限制
- 表必须是分桶的 ORC 表
- 不支持子查询、JOIN 等复杂语句中的 UPDATE/DELETE
- 事务会产生增量文件,需要定期压缩(Compaction)
Hive on Spark
将 Hive 查询运行在 Spark 上可以显著提升性能:
-- 切换到 Spark 引擎
SET hive.execution.engine=spark;
-- Spark 资源配置
SET spark.executor.memory=4g;
SET spark.executor.cores=2;
SET spark.executor.instances=10;
SET spark.driver.memory=2g;
-- Spark 动态分配
SET spark.dynamicAllocation.enabled=true;
SET spark.dynamicAllocation.minExecutors=1;
SET spark.dynamicAllocation.maxExecutors=20;
SET spark.dynamicAllocation.initialExecutors=5;
-- Spark 优化配置
SET spark.sql.shuffle.partitions=200;
SET spark.sql.adaptive.enabled=true;
实践案例
案例1:构建用户行为分析数仓
假设需要分析电商网站的用户行为,包括浏览、点击、购买等事件。
数据分层设计
-- === ODS 层:原始数据 ===
-- 保持数据原貌,按天分区
CREATE TABLE ods_user_events (
event_id STRING,
user_id STRING,
event_type STRING, -- view, click, cart, purchase
product_id STRING,
category_id STRING,
event_time STRING,
platform STRING,
device_info STRING
)
COMMENT '用户行为原始数据'
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
-- === DWD 层:明细数据 ===
-- 清洗转换后的明细数据
CREATE TABLE dwd_user_event_detail (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
product_id BIGINT,
category_id BIGINT,
event_time TIMESTAMP,
platform STRING COMMENT 'web/app/miniprogram',
device_type STRING,
province STRING,
city STRING
)
COMMENT '用户行为明细数据'
PARTITIONED BY (dt DATE)
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
-- 数据清洗
INSERT OVERWRITE TABLE dwd_user_event_detail PARTITION(dt)
SELECT
CAST(event_id AS BIGINT),
CAST(user_id AS BIGINT),
event_type,
CAST(product_id AS BIGINT),
CAST(category_id AS BIGINT),
FROM_UNIXTIME(CAST(event_time AS BIGINT)/1000) as event_time,
platform,
GET_JSON_OBJECT(device_info, '$.device_type') as device_type,
GET_JSON_OBJECT(device_info, '$.province') as province,
GET_JSON_OBJECT(device_info, '$.city') as city,
TO_DATE(FROM_UNIXTIME(CAST(event_time AS BIGINT)/1000)) as dt
FROM ods_user_events
WHERE dt = '2024-01-15';
-- === DWS 层:汇总数据 ===
-- 用户日行为汇总
CREATE TABLE dws_user_action_daily (
user_id BIGINT,
dt DATE,
view_count BIGINT,
click_count BIGINT,
cart_count BIGINT,
purchase_count BIGINT,
purchase_amount DECIMAL(12, 2),
unique_products BIGINT,
unique_categories BIGINT
)
COMMENT '用户日行为汇总'
PARTITIONED BY (dt DATE)
STORED AS ORC;
-- 汇总计算
INSERT OVERWRITE TABLE dws_user_action_daily PARTITION(dt)
SELECT
user_id,
dt,
SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) as view_count,
SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) as click_count,
SUM(CASE WHEN event_type = 'cart' THEN 1 ELSE 0 END) as cart_count,
SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchase_count,
SUM(CASE WHEN event_type = 'purchase' THEN
GET_JSON_OBJECT(device_info, '$.amount') ELSE 0 END) as purchase_amount,
COUNT(DISTINCT product_id) as unique_products,
COUNT(DISTINCT category_id) as unique_categories,
dt
FROM dwd_user_event_detail
GROUP BY user_id, dt;
-- === ADS 层:应用数据 ===
-- 活跃用户指标
CREATE TABLE ads_active_users (
stat_date DATE,
dau BIGINT COMMENT '日活用户数',
wau BIGINT COMMENT '周活用户数',
mau BIGINT COMMENT '月活用户数',
avg_session_duration DOUBLE COMMENT '平均停留时长',
avg_pages_per_user DOUBLE COMMENT '人均浏览页数'
)
STORED AS ORC;
-- 计算 DAU/WAU/MAU
INSERT OVERWRITE TABLE ads_active_users
SELECT
'2024-01-15' as stat_date,
COUNT(DISTINCT CASE WHEN dt = '2024-01-15' THEN user_id END) as dau,
COUNT(DISTINCT CASE WHEN dt >= '2024-01-09' THEN user_id END) as wau,
COUNT(DISTINCT CASE WHEN dt >= '2024-01-01' THEN user_id END) as mau,
AVG(session_duration) as avg_session_duration,
AVG(page_views) as avg_pages_per_user
FROM (
SELECT
user_id,
dt,
SUM(view_count) as page_views,
-- 估算停留时长(假设每次浏览平均 30 秒)
SUM(view_count) * 30 as session_duration
FROM dws_user_action_daily
WHERE dt >= '2024-01-01'
GROUP BY user_id, dt
) t;
案例2:处理数据倾斜
数据倾斜是 Hive 查询中最常见的问题之一,表现为某些任务执行时间远超其他任务。
识别数据倾斜
-- 方法1:查看任务执行情况
-- 在 YARN UI 中观察任务进度,如果某个任务卡在 99%
-- 方法2:检查 Key 分布
SELECT
CASE
WHEN cnt < 1000 THEN '<1K'
WHEN cnt < 10000 THEN '1K-10K'
WHEN cnt < 100000 THEN '10K-100K'
WHEN cnt < 1000000 THEN '100K-1M'
ELSE '>1M'
END as bucket,
COUNT(*) as key_count
FROM (
SELECT key, COUNT(*) as cnt
FROM large_table
GROUP BY key
) t
GROUP BY
CASE
WHEN cnt < 1000 THEN '<1K'
WHEN cnt < 10000 THEN '1K-10K'
WHEN cnt < 100000 THEN '10K-100K'
WHEN cnt < 1000000 THEN '100K-1M'
ELSE '>1M'
END;
解决方案
-- 问题:按 user_id 分组时出现倾斜
SELECT user_id, COUNT(*) as cnt
FROM events
GROUP BY user_id;
-- 方案1:加盐(Salt)
-- 将倾斜 Key 分散到多个 Reduce
SELECT
CASE
WHEN user_id IN ('skew_key1', 'skew_key2')
THEN CONCAT(user_id, '_', CAST(RAND() * 10 AS INT))
ELSE user_id
END as salted_user_id,
COUNT(*) as cnt
FROM events
GROUP BY
CASE
WHEN user_id IN ('skew_key1', 'skew_key2')
THEN CONCAT(user_id, '_', CAST(RAND() * 10 AS INT))
ELSE user_id
END;
-- 方案2:Map Join 处理小表
-- 如果一方是小表
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask.size=100000000;
SELECT /*+ MAPJOIN(small_dim) */
e.user_id,
d.user_name,
COUNT(*) as cnt
FROM events e
JOIN user_dim d ON e.user_id = d.user_id
GROUP BY e.user_id, d.user_name;
-- 方案3:Skew Join
-- 启用倾斜 Join 优化
SET hive.optimize.skewjoin=true;
SET hive.skewjoin.key=100000; -- 超过 10 万条记录视为倾斜
-- 方案4:两阶段聚合
-- 先局部聚合,再全局聚合
SELECT user_id, SUM(cnt) as total_cnt
FROM (
SELECT user_id, COUNT(*) as cnt
FROM events
GROUP BY user_id, CAST(RAND() * 100 AS INT) -- 局部聚合
) t
GROUP BY user_id; -- 全局聚合
案例3:小文件治理
大量小文件会严重影响 HDFS 性能和 Hive 查询效率。
检测小文件
-- 查看表的文件数量和大小
DESCRIBE FORMATTED table_name;
-- 查看分区文件数量
SELECT
partition_name,
COUNT(*) as file_count,
SUM(file_size) / 1024 / 1024 as total_size_mb,
AVG(file_size) / 1024 / 1024 as avg_file_size_mb
FROM (
SELECT
CONCAT(year, '-', month) as partition_name,
file_name,
file_size
FROM table_name -- 这里需要使用系统表
) t
GROUP BY partition_name;
解决方案
-- 方案1:启用合并
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000; -- 目标文件大小 256MB
SET hive.merge.smallfiles.avgsize=16000000; -- 小于 16MB 触发合并
-- 方案2:使用 DISTRIBUTE BY 控制输出
INSERT OVERWRITE TABLE target_table
SELECT * FROM source_table
DISTRIBUTE BY CAST(RAND() * 10 AS INT); -- 控制输出文件数量
-- 方案3:定期合并
-- 对于已存在的小文件,使用 INSERT OVERWRITE 重建
INSERT OVERWRITE TABLE table_name PARTITION(year, month)
SELECT * FROM table_name;
-- 方案4:使用 Tez 或 Spark 引擎
-- Tez/Spark 对小文件处理更优化
SET hive.execution.engine=tez;
案例4:增量数据处理
处理每天新增的增量数据,确保数据一致性。
-- 场景:每日增量订单数据合并到主表
-- 1. 创建增量表
CREATE TABLE orders_incremental (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(12, 2),
status STRING,
update_time TIMESTAMP
)
PARTITIONED BY (dt DATE)
STORED AS ORC;
-- 2. 创建主表(支持事务)
CREATE TABLE orders_master (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DECIMAL(12, 2),
status STRING,
update_time TIMESTAMP
)
CLUSTERED BY (order_id) INTO 16 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
-- 3. 合并增量数据
-- 方式 A:使用 MERGE 语句(Hive 3.0+)
MERGE INTO orders_master m
USING (
SELECT * FROM orders_incremental
WHERE dt = '2024-01-15'
) i
ON m.order_id = i.order_id
WHEN MATCHED THEN UPDATE SET
m.status = i.status,
m.amount = i.amount,
m.update_time = i.update_time
WHEN NOT MATCHED THEN INSERT VALUES (
i.order_id, i.customer_id, i.order_time,
i.amount, i.status, i.update_time
);
-- 方式 B:使用 UNION ALL + ROW_NUMBER 去重(适用非事务表)
INSERT OVERWRITE TABLE orders_master
SELECT order_id, customer_id, order_time, amount, status, update_time
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY update_time DESC) as rn
FROM (
SELECT * FROM orders_master
UNION ALL
SELECT * FROM orders_incremental WHERE dt = '2024-01-15'
) t
) t2
WHERE rn = 1;
小结
本章详细介绍了 Hive 数据仓库的核心概念和使用方法:
- Hive 架构:用户接口、元数据存储、驱动器、执行引擎的协作
- 数据模型:内部表、外部表、分区表、分桶表的设计和使用
- 存储格式:文本格式、ORC、Parquet 的特点和选择
- HiveQL 查询:基本查询、聚合、Join、排序、子查询
- 窗口函数:聚合窗口、排名函数、值访问函数的详细使用
- 自定义函数:UDF、GenericUDF、UDAF、UDTF 的开发方法
- 性能调优:执行引擎、并行执行、存储优化、查询优化
- 数据仓库建模:维度建模、星型模型、缓慢变化维度、分层架构
- 事务支持:ACID 事务的配置和使用
Hive 是大数据离线分析的核心工具,掌握 Hive 对于构建企业级数据仓库至关重要。在实际项目中,Hive 常与 Spark、Flink、Kafka 等组件配合使用,构建完整的批流一体数据平台。