Hive Data Warehouse
Apache Hive is a data warehouse tool built on top of Hadoop. It maps structured data files onto database tables and provides SQL-like query capabilities, letting developers who are unfamiliar with MapReduce perform big-data analysis.
Hive Overview
What Is Hive?
Hive was originally developed at Facebook to process massive volumes of social-network data. It offers a SQL-like query language, HiveQL (HQL), and translates SQL statements into MapReduce, Tez, or Spark jobs for execution.
Core characteristics of Hive:
- SQL-like syntax: lowers the barrier to big-data analysis
- Extensible: supports user-defined functions (UDF, UDAF, UDTF)
- Multiple engines: runs on MapReduce, Tez, or Spark
- Metadata management: centralized metadata store
Hive vs. Traditional Databases
| Dimension | Hive | Traditional database |
|---|---|---|
| Data volume | TB to PB | GB to TB |
| Query latency | Minutes to hours | Milliseconds to seconds |
| Data updates | Not suited to frequent updates (limited ACID support on ORC tables) | Full transaction support |
| Use cases | Offline analytics, data warehousing | OLTP transaction processing |
| Scalability | Horizontal | Mostly vertical |
Hive Architecture
The Hive architecture consists of the following components:
User interfaces
- CLI: command-line interface
- JDBC/ODBC: standard database connectivity interfaces
- Web GUI: web-based management interface (e.g., Hue)
Metastore
Stores Hive's metadata:
- Table names, column names, data types
- Table storage locations and partition information
- Table properties and SerDe information
Driver
The core component, responsible for:
- Parser: parses SQL statements
- Compiler: compiles them into an execution plan
- Optimizer: optimizes the execution plan
- Executor: runs the tasks
Execution engines
The engines that actually execute the work:
- MapReduce: the original default engine (deprecated since Hive 2.x)
- Tez: DAG-based execution engine with better performance
- Spark: in-memory compute engine
Hive Data Model
Databases
A Hive database is a namespace for tables:
-- Create a database
CREATE DATABASE IF NOT EXISTS mydb
COMMENT 'My database'
LOCATION '/user/hive/warehouse/mydb.db';
-- List and inspect databases
SHOW DATABASES;
DESCRIBE DATABASE mydb;
-- Switch the current database
USE mydb;
-- Drop a database (CASCADE also drops its tables)
DROP DATABASE IF EXISTS mydb CASCADE;
Tables
A Hive table is the logical structure onto which data files are mapped:
Managed (internal) tables
Hive owns the full data lifecycle of a managed table; dropping the table also deletes its data:
-- Create a managed table
CREATE TABLE IF NOT EXISTS employees (
  id INT,
  name STRING,
  department STRING,
  salary DECIMAL(10, 2)
)
COMMENT 'Employee information'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- Inspect the table schema
DESCRIBE employees;
DESCRIBE FORMATTED employees;
-- Drop the table (its data is deleted too)
DROP TABLE employees;
External tables
An external table manages only metadata; dropping it leaves the underlying data in place:
-- Create an external table
CREATE EXTERNAL TABLE IF NOT EXISTS logs (
  log_time STRING,
  level STRING,
  message STRING
)
COMMENT 'Application logs'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/logs';
-- Drop the table (the data is preserved)
DROP TABLE logs;
Partitioned tables
A partitioned table stores data in a separate directory per partition value, so queries can skip irrelevant data:
-- Create a partitioned table
CREATE TABLE IF NOT EXISTS sales (
  product_id INT,
  quantity INT,
  amount DECIMAL(10, 2)
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;
-- Add partitions
ALTER TABLE sales ADD PARTITION (year=2024, month=1);
ALTER TABLE sales ADD PARTITION (year=2024, month=2);
-- List partitions
SHOW PARTITIONS sales;
-- Query by partition (only the matching partitions are scanned)
SELECT * FROM sales WHERE year=2024 AND month=1;
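The pruning behavior in the last query can be sketched outside Hive. The following is a minimal Python model (all names hypothetical) of how a predicate on partition columns selects directories before any data is read:

```python
# Hypothetical model of partition pruning: each partition is a
# directory named by its partition-column values, and a predicate on
# those columns filters directories before any file is opened.
partitions = [
    {"year": 2023, "month": 12, "path": "/warehouse/sales/year=2023/month=12"},
    {"year": 2024, "month": 1,  "path": "/warehouse/sales/year=2024/month=1"},
    {"year": 2024, "month": 2,  "path": "/warehouse/sales/year=2024/month=2"},
]

def prune(parts, predicate):
    """Return only the partition directories matching the predicate."""
    return [p["path"] for p in parts if predicate(p)]

# WHERE year=2024 AND month=1 touches a single directory.
scanned = prune(partitions, lambda p: p["year"] == 2024 and p["month"] == 1)
```

This is why filtering on partition columns is so much cheaper than filtering on ordinary columns: the decision is made from metadata alone.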
Bucketed tables
A bucketed table spreads rows across a fixed number of files based on the hash of a column:
-- Create a bucketed table
CREATE TABLE IF NOT EXISTS users_bucketed (
  user_id INT,
  username STRING,
  email STRING
)
CLUSTERED BY (user_id) INTO 4 BUCKETS
STORED AS ORC;
-- Bucketed tables help optimize JOINs and sampling
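The bucket assignment above can be modeled in a few lines of Python. This is a hedged sketch: for integer columns a simple modulo approximates Hive's bucketing (Hive's actual hash function differs for other types), and the row values are invented for illustration:

```python
# Hypothetical sketch: a row goes to bucket hash(clustering column) %
# num_buckets, so equal keys always land in the same bucket file.
# For ints, modulo stands in for Hive's hash; other types hash differently.
NUM_BUCKETS = 4

def bucket_for(user_id: int) -> int:
    return user_id % NUM_BUCKETS

rows = [101, 102, 103, 104, 105]
buckets = {b: [] for b in range(NUM_BUCKETS)}
for uid in rows:
    buckets[bucket_for(uid)].append(uid)
```

Because equal keys deterministically share a bucket, two tables bucketed the same way on the join key can be joined bucket-by-bucket without a full shuffle.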
Data Types
Primitive types
| Type | Description | Example |
|---|---|---|
| TINYINT | 1-byte integer | 10 |
| SMALLINT | 2-byte integer | 1000 |
| INT | 4-byte integer | 100000 |
| BIGINT | 8-byte integer | 1000000000L |
| FLOAT | Single-precision float | 3.14f |
| DOUBLE | Double-precision float | 3.14159 |
| DECIMAL | Exact numeric | DECIMAL(10,2) |
| STRING | String | 'hello' |
| VARCHAR | Variable-length string | VARCHAR(100) |
| CHAR | Fixed-length string | CHAR(10) |
| BOOLEAN | Boolean | true/false |
| DATE | Date | '2024-01-01' |
| TIMESTAMP | Timestamp | '2024-01-01 12:00:00' |
Complex types
-- ARRAY
CREATE TABLE array_table (
  id INT,
  tags ARRAY<STRING>
);
-- MAP
CREATE TABLE map_table (
  id INT,
  properties MAP<STRING, STRING>
);
-- STRUCT
CREATE TABLE struct_table (
  id INT,
  address STRUCT<street:STRING, city:STRING, zip:INT>
);
-- Accessing complex types
SELECT tags[0] FROM array_table;
SELECT properties['key'] FROM map_table;
SELECT address.city FROM struct_table;
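The three access patterns map naturally onto Python containers; the sketch below (sample values invented) mirrors the SELECT expressions above:

```python
# Hypothetical row with ARRAY / MAP / STRUCT columns, modeled as
# Python list / dict / dict; values are made up for illustration.
row = {
    "id": 1,
    "tags": ["big-data", "hive"],                 # ARRAY<STRING>
    "properties": {"env": "prod"},                # MAP<STRING,STRING>
    "address": {"street": "Main St", "city": "Beijing", "zip": 100000},  # STRUCT
}

first_tag = row["tags"][0]          # tags[0]
env = row["properties"]["env"]      # properties['env']
city = row["address"]["city"]       # address.city
```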
HiveQL Queries
Basic queries
-- Select columns
SELECT id, name, salary FROM employees;
-- WHERE filter
SELECT * FROM employees WHERE salary > 50000;
-- DISTINCT deduplication
SELECT DISTINCT department FROM employees;
-- LIMIT the result set
SELECT * FROM employees LIMIT 10;
-- Aliases
SELECT id AS emp_id, name AS emp_name FROM employees;
-- CASE WHEN
SELECT
  name,
  CASE
    WHEN salary >= 100000 THEN 'High'
    WHEN salary >= 50000 THEN 'Medium'
    ELSE 'Low'
  END AS salary_level
FROM employees;
Aggregate functions
-- Basic aggregates
SELECT
  COUNT(*) AS total_count,
  COUNT(DISTINCT department) AS dept_count,
  SUM(salary) AS total_salary,
  AVG(salary) AS avg_salary,
  MAX(salary) AS max_salary,
  MIN(salary) AS min_salary
FROM employees;
-- GROUP BY
SELECT
  department,
  COUNT(*) AS emp_count,
  AVG(salary) AS avg_salary
FROM employees
GROUP BY department;
-- HAVING filters grouped results
SELECT
  department,
  COUNT(*) AS emp_count
FROM employees
GROUP BY department
HAVING COUNT(*) > 10;
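The GROUP BY / HAVING pipeline can be sketched as a two-step process: aggregate per key, then filter the aggregated groups. A minimal Python model (sample data invented):

```python
from collections import defaultdict

# Hypothetical sketch: GROUP BY aggregates per key; HAVING filters
# the groups AFTER aggregation (unlike WHERE, which filters rows before).
employees = [("IT", 80000), ("IT", 60000), ("HR", 50000)]

counts, totals = defaultdict(int), defaultdict(int)
for dept, salary in employees:       # GROUP BY department
    counts[dept] += 1                # COUNT(*)
    totals[dept] += salary           # SUM(salary)

avg_by_dept = {d: totals[d] / counts[d] for d in counts}   # AVG(salary)
big_depts = [d for d, c in counts.items() if c > 1]        # HAVING COUNT(*) > 1
```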
JOIN Operations
-- Inner join
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d ON e.department = d.dept_id;
-- Left outer join
SELECT e.name, d.dept_name
FROM employees e
LEFT JOIN departments d ON e.department = d.dept_id;
-- Right outer join
SELECT e.name, d.dept_name
FROM employees e
RIGHT JOIN departments d ON e.department = d.dept_id;
-- Full outer join
SELECT e.name, d.dept_name
FROM employees e
FULL OUTER JOIN departments d ON e.department = d.dept_id;
-- Multi-table join
SELECT e.name, d.dept_name, p.project_name
FROM employees e
JOIN departments d ON e.department = d.dept_id
JOIN projects p ON e.project_id = p.project_id;
-- Map join (broadcast the small table)
SELECT /*+ MAPJOIN(d) */ e.name, d.dept_name
FROM employees e
JOIN departments d ON e.department = d.dept_id;
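The map-join idea is simple enough to sketch directly: load the small table into an in-memory hash map, then stream the large table against it with no shuffle. A hedged Python model (sample tables invented):

```python
# Hypothetical sketch of a map-side (broadcast) join: the small
# dimension table becomes an in-memory hash map on every mapper, and
# the large table is streamed through it -- no shuffle phase needed.
departments = {1: "Engineering", 2: "Sales"}           # small table -> hash map
employees = [("Alice", 1), ("Bob", 2), ("Carol", 1)]   # large table, streamed

joined = [(name, departments[dept_id])
          for name, dept_id in employees
          if dept_id in departments]                   # inner-join semantics
```

This is why map joins require the broadcast side to fit in memory: the whole small table is materialized as a lookup structure on each task.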
Sorting
-- ORDER BY: global ordering (funnels through a single reducer)
SELECT * FROM employees ORDER BY salary DESC;
-- SORT BY: ordering within each reducer only
SELECT * FROM employees SORT BY salary DESC;
-- DISTRIBUTE BY: controls which reducer each row goes to
SELECT * FROM employees DISTRIBUTE BY department SORT BY salary DESC;
-- CLUSTER BY: shorthand for DISTRIBUTE BY + SORT BY on the same column
SELECT * FROM employees CLUSTER BY department;
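The DISTRIBUTE BY + SORT BY combination can be modeled concretely. In this hedged sketch (reducer count, hash function, and data all invented), rows are hashed to reducers and then sorted only within each reducer, so there is no global order:

```python
# Hypothetical sketch: DISTRIBUTE BY routes rows to reducers by a hash
# of the key; SORT BY orders rows only inside each reducer. A simple
# deterministic hash stands in for Hive's partitioner.
NUM_REDUCERS = 2
rows = [("IT", 90), ("HR", 50), ("IT", 70), ("HR", 60)]

def reducer_for(dept: str) -> int:
    return sum(ord(c) for c in dept) % NUM_REDUCERS

reducers = {r: [] for r in range(NUM_REDUCERS)}
for dept, salary in rows:
    reducers[reducer_for(dept)].append((dept, salary))   # DISTRIBUTE BY department

for r in reducers:
    reducers[r].sort(key=lambda row: row[1], reverse=True)  # SORT BY salary DESC
```

Each reducer's output is sorted, but concatenating reducer outputs gives no global order; that is exactly the gap ORDER BY closes at the cost of a single reducer.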
Subqueries
-- Subquery in WHERE
SELECT * FROM employees
WHERE department IN (SELECT dept_id FROM departments WHERE location = 'Beijing');
-- Subquery in FROM
SELECT dept, avg_salary
FROM (
  SELECT department AS dept, AVG(salary) AS avg_salary
  FROM employees
  GROUP BY department
) t
WHERE avg_salary > 50000;
-- WITH clause (CTE)
WITH dept_stats AS (
  SELECT department, AVG(salary) AS avg_salary
  FROM employees
  GROUP BY department
)
SELECT * FROM dept_stats WHERE avg_salary > 50000;
Importing and Exporting Data
Importing data
-- Load from a local file
LOAD DATA LOCAL INPATH '/local/path/data.txt'
INTO TABLE employees;
-- Load from HDFS (moves the file into the table directory)
LOAD DATA INPATH '/hdfs/path/data.txt'
INTO TABLE employees;
-- Load with overwrite
LOAD DATA LOCAL INPATH '/local/path/data.txt'
OVERWRITE INTO TABLE employees;
-- INSERT rows
INSERT INTO TABLE employees VALUES (1, 'Alice', 'IT', 80000);
-- INSERT OVERWRITE replaces existing data
INSERT OVERWRITE TABLE employees
SELECT * FROM temp_employees;
-- Multi-table insert
FROM source_table
INSERT INTO TABLE target1 SELECT col1, col2
INSERT INTO TABLE target2 SELECT col3, col4;
-- Dynamic partition insert
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE sales PARTITION(year, month)
SELECT product_id, quantity, amount, year, month FROM temp_sales;
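In a dynamic-partition insert, the trailing SELECT columns determine each row's target partition at write time. A hedged Python sketch of that routing (sample rows invented):

```python
# Hypothetical sketch of dynamic partitioning: the trailing columns
# (year, month) of each row pick the partition directory, and rows
# are grouped into per-partition outputs on the fly.
rows = [  # (product_id, quantity, amount, year, month)
    (1, 2, 19.9, 2024, 1),
    (2, 1,  9.9, 2024, 1),
    (3, 5, 49.5, 2024, 2),
]

outputs = {}
for product_id, quantity, amount, year, month in rows:
    key = f"year={year}/month={month}"                 # partition directory
    outputs.setdefault(key, []).append((product_id, quantity, amount))
```

Each distinct (year, month) pair creates one partition; this is also why `nonstrict` mode and the dynamic-partition limits exist, since a skewed SELECT can otherwise create a huge number of partitions.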
Exporting data
-- Export to the local filesystem
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
SELECT * FROM employees;
-- Export to HDFS
INSERT OVERWRITE DIRECTORY '/hdfs/path/output'
SELECT * FROM employees;
-- Export with a specific format
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM employees;
-- Export with the hive CLI
-- hive -e "SELECT * FROM employees" > output.txt
Views
A view is a virtual table; it stores no data:
-- Create a view
CREATE VIEW IF NOT EXISTS high_salary_employees AS
SELECT id, name, salary
FROM employees
WHERE salary > 100000;
-- Query the view
SELECT * FROM high_salary_employees;
-- Show the view definition
SHOW CREATE TABLE high_salary_employees;
-- Drop the view
DROP VIEW IF EXISTS high_salary_employees;
Indexes
Hive indexes could speed up queries in older releases, but note that indexing was removed in Hive 3.0; columnar formats (ORC/Parquet, with their built-in min/max indexes) and materialized views are the recommended alternatives. For Hive 2.x and earlier:
-- Create an index
CREATE INDEX emp_salary_idx
ON TABLE employees (salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;
-- Rebuild the index
ALTER INDEX emp_salary_idx ON employees REBUILD;
-- List indexes
SHOW INDEX ON employees;
-- Drop the index
DROP INDEX IF EXISTS emp_salary_idx ON employees;
Functions
Built-in functions
String functions
-- String length
SELECT length(name) FROM employees;
-- Concatenation
SELECT concat(name, '-', department) FROM employees;
-- Substring
SELECT substr(name, 1, 3) FROM employees;
-- Case conversion
SELECT upper(name), lower(name) FROM employees;
-- Trim whitespace
SELECT trim(name) FROM employees;
-- Split
SELECT split(name, ' ') FROM employees;
-- Regex extraction
SELECT regexp_extract(name, '(\\w+)', 1) FROM employees;
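For readers more familiar with Python, the following sketch shows what these string functions compute, using stdlib equivalents (sample values invented; note substr is 1-based in Hive but slicing is 0-based in Python):

```python
import re

# Hypothetical Python equivalents of the Hive string functions above.
name = "  John Smith  "

length = len(name.strip())                    # length(trim(name)) -> 10
joined = "-".join(["John", "IT"])             # concat('John', '-', 'IT')
prefix = "John Smith"[0:3]                    # substr(name, 1, 3) -> 'Joh'
parts = "John Smith".split(" ")               # split(name, ' ')
first_word = re.search(r"(\w+)", "John Smith").group(1)  # regexp_extract(..., 1)
```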
Date functions
-- Current date
SELECT current_date();
-- Current timestamp
SELECT current_timestamp();
-- Add/subtract days
SELECT date_add('2024-01-01', 7);
SELECT date_sub('2024-01-01', 7);
-- Difference in days
SELECT datediff('2024-01-10', '2024-01-01');
-- Format a date
SELECT date_format('2024-01-01', 'yyyy-MM');
-- Extract date parts
SELECT year('2024-01-01');
SELECT month('2024-01-01');
SELECT day('2024-01-01');
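The date arithmetic above can be checked against Python's `datetime` module; this sketch mirrors the Hive calls (format strings differ: Hive uses `yyyy-MM`, Python uses `%Y-%m`):

```python
from datetime import date, timedelta

# Hypothetical Python equivalents of the Hive date functions above.
d = date(2024, 1, 1)

added = d + timedelta(days=7)               # date_add('2024-01-01', 7)
subbed = d - timedelta(days=7)              # date_sub('2024-01-01', 7)
diff = (date(2024, 1, 10) - d).days         # datediff('2024-01-10', '2024-01-01')
ym = d.strftime("%Y-%m")                    # date_format(..., 'yyyy-MM')
y, m, day_ = d.year, d.month, d.day         # year() / month() / day()
```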
Math functions
-- Rounding
SELECT round(3.14159, 2); -- 3.14
SELECT floor(3.9); -- 3
SELECT ceil(3.1); -- 4
-- Absolute value
SELECT abs(-10);
-- Exponentiation
SELECT pow(2, 10);
-- Random number
SELECT rand();
Conditional functions
-- IF
SELECT if(salary > 50000, 'High', 'Low') FROM employees;
-- NVL (NULL handling)
SELECT nvl(bonus, 0) FROM employees;
-- COALESCE (first non-NULL value)
SELECT coalesce(bonus, salary * 0.1, 0) FROM employees;
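The NULL-handling semantics are easy to model with `None` standing in for NULL. A hedged sketch (helper names and sample values invented):

```python
# Hypothetical model of IF / NVL / COALESCE, with None playing NULL.
def coalesce(*args):
    """Return the first non-None argument, like Hive's COALESCE."""
    return next((a for a in args if a is not None), None)

def nvl(value, default):
    """NVL is just COALESCE restricted to two arguments."""
    return value if value is not None else default

label = "High" if 60000 > 50000 else "Low"   # if(salary > 50000, 'High', 'Low')
bonus = nvl(None, 0)                         # nvl(bonus, 0) with a NULL bonus
pay = coalesce(None, 80000 * 0.1, 0)         # first non-NULL fallback
```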
Window Functions
Window functions compute aggregates without collapsing rows:
-- ROW_NUMBER: sequential row number
SELECT
  name,
  salary,
  ROW_NUMBER() OVER (ORDER BY salary DESC) AS row_num
FROM employees;
-- RANK: ranking with gaps after ties
SELECT
  name,
  salary,
  RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;
-- DENSE_RANK: ranking without gaps after ties
SELECT
  name,
  salary,
  DENSE_RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;
-- Ranking within each partition
SELECT
  name,
  department,
  salary,
  RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank
FROM employees;
-- Aggregate window functions
SELECT
  name,
  salary,
  SUM(salary) OVER (PARTITION BY department) AS dept_total,
  AVG(salary) OVER (PARTITION BY department) AS dept_avg
FROM employees;
-- LAG/LEAD: previous/next row
SELECT
  name,
  salary,
  LAG(salary, 1) OVER (ORDER BY salary) AS prev_salary,
  LEAD(salary, 1) OVER (ORDER BY salary) AS next_salary
FROM employees;
-- FIRST_VALUE/LAST_VALUE
SELECT
  name,
  department,
  salary,
  FIRST_VALUE(salary) OVER (PARTITION BY department ORDER BY salary) AS min_in_dept
FROM employees;
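The difference between the three ranking functions is easiest to see on a tiny example. This hedged Python sketch (salaries invented, assumed already sorted descending) computes all three:

```python
# Hypothetical sketch: ROW_NUMBER never repeats; RANK repeats on ties
# and then skips; DENSE_RANK repeats on ties without skipping.
salaries = [90, 80, 80, 70]   # already ORDER BY salary DESC

row_number = list(range(1, len(salaries) + 1))

# RANK: position of the first row with the same value.
rank = [salaries.index(s) + 1 for s in salaries]

# DENSE_RANK: count of distinct strictly-greater values, plus one.
dense_rank = [len({x for x in salaries if x > s}) + 1 for s in salaries]
```

On the tied 80s, RANK yields 2, 2 and then jumps to 4, while DENSE_RANK yields 2, 2 and continues with 3.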
User-Defined Functions
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// UDF: one value in, one value out (this classic UDF base class is
// deprecated in newer Hive releases in favor of GenericUDF)
public class MyUDF extends UDF {
    public Text evaluate(Text input) {
        if (input == null) return null;
        return new Text(input.toString().toUpperCase());
    }
}
-- Register the UDF (package the class into a JAR and add it first)
ADD JAR /path/to/my-udf.jar;
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUDF';
SELECT my_upper(name) FROM employees;
Hive Optimization
Query optimization
-- Column pruning: select only the columns you need
SELECT name, salary FROM employees;
-- Partition pruning: filter on partition columns
SELECT * FROM sales WHERE year=2024 AND month=1;
-- Map join: broadcast the small table
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=50000000;
-- Merge small files
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000;
Storage format optimization
-- Use columnar storage
CREATE TABLE orc_table (...) STORED AS ORC;
CREATE TABLE parquet_table (...) STORED AS PARQUET;
-- Enable compression
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
Execution engine tuning
-- Use the Tez engine
SET hive.execution.engine=tez;
-- Use the Spark engine
SET hive.execution.engine=spark;
Parallel execution
-- Run independent stages in parallel
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=16;
Summary
This chapter covered Hive's core concepts and usage:
- Hive architecture: user interfaces, metastore, driver, execution engines
- Data model: databases and tables (managed, external, partitioned, bucketed)
- HiveQL queries: basic queries, aggregation, JOINs, sorting, subqueries
- Data import/export: LOAD, INSERT, and export commands
- Functions: built-in functions, window functions, user-defined functions
- Optimization: query, storage, and execution engine tuning
Hive is a core tool for offline big-data analysis, and mastering it is essential for building a data warehouse. In real projects, Hive is often used alongside compute engines such as Spark and Flink.