跳到主要内容

Hive 数据仓库

Apache Hive是构建在Hadoop之上的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。Hive让不熟悉MapReduce的开发人员也能进行大数据分析。

Hive 概述

什么是 Hive?

Hive最初由Facebook开发,用于处理海量的社交网络数据。它提供了一种类似SQL的查询语言——HiveQL(HQL),将SQL语句转换为MapReduce或Tez、Spark任务执行。

Hive的核心特点:

  1. 类SQL语法:降低大数据分析门槛
  2. 可扩展:支持自定义函数(UDF、UDAF、UDTF)
  3. 多引擎支持:可运行在MapReduce、Tez、Spark上
  4. 元数据管理:统一的元数据存储

Hive vs 传统数据库

| 维度 | Hive | 传统数据库 |
| --- | --- | --- |
| 数据规模 | TB到PB级 | GB到TB级 |
| 查询延迟 | 分钟到小时 | 毫秒到秒 |
| 数据更新 | 不支持频繁更新 | 支持事务 |
| 适用场景 | 离线分析、数据仓库 | OLTP事务处理 |
| 扩展性 | 水平扩展 | 垂直扩展为主 |

Hive 架构

Hive的架构主要包括以下组件:

用户接口

  • CLI:命令行接口
  • JDBC/ODBC:标准数据库连接接口
  • Web GUI:Web管理界面(如Hue)

元数据存储(Metastore)

存储Hive的元数据信息:

  • 表名、列名、数据类型
  • 表存储位置、分区信息
  • 表属性、SerDe信息

驱动器(Driver)

核心组件,负责:

  • 解析器:解析SQL语句
  • 编译器:编译生成执行计划
  • 优化器:优化执行计划
  • 执行器:执行任务

执行引擎

实际执行任务的引擎:

  • MapReduce:默认引擎(自 Hive 2.0 起已标记为废弃,官方建议迁移到 Tez 或 Spark)
  • Tez:DAG执行引擎,性能更好
  • Spark:内存计算引擎

Hive 数据模型

数据库

Hive中的数据库是表的命名空间:

-- Create a database; IF NOT EXISTS makes the statement idempotent
CREATE DATABASE IF NOT EXISTS mydb
COMMENT 'My database'
LOCATION '/user/hive/warehouse/mydb.db';

-- List and inspect databases
SHOW DATABASES;
DESCRIBE DATABASE mydb;

-- Switch the current database
USE mydb;

-- Drop the database; CASCADE also drops any tables it still contains
DROP DATABASE IF EXISTS mydb CASCADE;

Hive表是数据映射的逻辑结构:

内部表(管理表)

内部表由Hive管理数据生命周期,删除表时数据也会被删除:

-- Create a managed (internal) table: Hive owns both the metadata and the data files
CREATE TABLE IF NOT EXISTS employees (
id INT,
name STRING,
department STRING,
salary DECIMAL(10, 2)
)
COMMENT 'Employee information'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- Inspect the schema; FORMATTED additionally shows location, SerDe, and stats
DESCRIBE employees;
DESCRIBE FORMATTED employees;

-- Dropping a managed table deletes the underlying data files as well
DROP TABLE employees;

外部表

外部表只管理元数据,删除表时数据不会被删除:

-- Create an external table: Hive manages only the metadata; data stays at LOCATION
CREATE EXTERNAL TABLE IF NOT EXISTS logs (
log_time STRING,
level STRING,
message STRING
)
COMMENT 'Application logs'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/logs';

-- Dropping an external table leaves the data files in place
DROP TABLE logs;

分区表

分区表将数据按分区列存储在不同目录,提高查询效率:

-- Create a partitioned table; partition columns are virtual and map to HDFS directories
CREATE TABLE IF NOT EXISTS sales (
product_id INT,
quantity INT,
amount DECIMAL(10, 2)
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET;

-- Add partitions explicitly (static partitioning)
ALTER TABLE sales ADD PARTITION (year=2024, month=1);
ALTER TABLE sales ADD PARTITION (year=2024, month=2);

-- List existing partitions
SHOW PARTITIONS sales;

-- Filtering on partition columns prunes the scan to the matching directories only
SELECT * FROM sales WHERE year=2024 AND month=1;

分桶表

分桶表将数据按列的哈希值分散到固定数量的文件中:

-- Create a bucketed table: rows are distributed by hash(user_id) into 4 files
CREATE TABLE IF NOT EXISTS users_bucketed (
user_id INT,
username STRING,
email STRING
)
CLUSTERED BY (user_id) INTO 4 BUCKETS
STORED AS ORC;

-- Bucketing enables bucket map joins and efficient TABLESAMPLE sampling

数据类型

基本数据类型

| 类型 | 说明 | 示例 |
| --- | --- | --- |
| TINYINT | 1字节整数 | 10 |
| SMALLINT | 2字节整数 | 1000 |
| INT | 4字节整数 | 100000 |
| BIGINT | 8字节整数 | 1000000000L |
| FLOAT | 单精度浮点 | 3.14f |
| DOUBLE | 双精度浮点 | 3.14159 |
| DECIMAL | 精确数值 | DECIMAL(10,2) |
| STRING | 字符串 | 'hello' |
| VARCHAR | 可变长字符串 | VARCHAR(100) |
| CHAR | 定长字符串 | CHAR(10) |
| BOOLEAN | 布尔值 | true/false |
| DATE | 日期 | '2024-01-01' |
| TIMESTAMP | 时间戳 | '2024-01-01 12:00:00' |

复杂数据类型

-- ARRAY: ordered collection of same-typed elements
CREATE TABLE array_table (
id INT,
tags ARRAY<STRING>
);

-- MAP: key/value pairs
CREATE TABLE map_table (
id INT,
properties MAP<STRING, STRING>
);

-- STRUCT: record with named, typed fields
CREATE TABLE struct_table (
id INT,
address STRUCT<street:STRING, city:STRING, zip:INT>
);

-- Accessing complex types: [index] for arrays, ['key'] for maps, dot for structs
SELECT tags[0] FROM array_table;
SELECT properties['key'] FROM map_table;
SELECT address.city FROM struct_table;

HiveQL 查询

基本查询

-- Project specific columns
SELECT id, name, salary FROM employees;

-- Filter rows with WHERE
SELECT * FROM employees WHERE salary > 50000;

-- Remove duplicate values with DISTINCT
SELECT DISTINCT department FROM employees;

-- Cap the number of returned rows
SELECT * FROM employees LIMIT 10;

-- Column aliases
SELECT id AS emp_id, name AS emp_name FROM employees;

-- CASE WHEN: evaluated top-down, the first matching branch wins
SELECT
name,
CASE
WHEN salary >= 100000 THEN 'High'
WHEN salary >= 50000 THEN 'Medium'
ELSE 'Low'
END AS salary_level
FROM employees;

聚合函数

-- Basic aggregates over the whole table (COUNT(*) counts rows, including NULLs)
SELECT
COUNT(*) AS total_count,
COUNT(DISTINCT department) AS dept_count,
SUM(salary) AS total_salary,
AVG(salary) AS avg_salary,
MAX(salary) AS max_salary,
MIN(salary) AS min_salary
FROM employees;

-- GROUP BY: one output row per department
SELECT
department,
COUNT(*) AS emp_count,
AVG(salary) AS avg_salary
FROM employees
GROUP BY department;

-- HAVING filters groups after aggregation (WHERE filters rows before it)
SELECT
department,
COUNT(*) AS emp_count
FROM employees
GROUP BY department
HAVING COUNT(*) > 10;

JOIN 操作

-- Inner join: only rows with a match on both sides
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d ON e.department = d.dept_id;

-- Left outer join: keep all employees; department columns are NULL when unmatched
SELECT e.name, d.dept_name
FROM employees e
LEFT JOIN departments d ON e.department = d.dept_id;

-- Right outer join: keep all departments
SELECT e.name, d.dept_name
FROM employees e
RIGHT JOIN departments d ON e.department = d.dept_id;

-- Full outer join: keep unmatched rows from both sides
SELECT e.name, d.dept_name
FROM employees e
FULL OUTER JOIN departments d ON e.department = d.dept_id;

-- Multi-table join
SELECT e.name, d.dept_name, p.project_name
FROM employees e
JOIN departments d ON e.department = d.dept_id
JOIN projects p ON e.project_id = p.project_id;

-- Map join hint: broadcast the small table (d) to every mapper, avoiding a shuffle
SELECT /*+ MAPJOIN(d) */ e.name, d.dept_name
FROM employees e
JOIN departments d ON e.department = d.dept_id;

排序

-- ORDER BY: total ordering; forces a single reducer, so it is slow on large data
SELECT * FROM employees ORDER BY salary DESC;

-- SORT BY: orders rows within each reducer only (no global order guarantee)
SELECT * FROM employees SORT BY salary DESC;

-- DISTRIBUTE BY: controls which reducer each row is sent to
SELECT * FROM employees DISTRIBUTE BY department SORT BY salary DESC;

-- CLUSTER BY: shorthand for DISTRIBUTE BY + SORT BY on the same column (ascending only)
SELECT * FROM employees CLUSTER BY department;

子查询

-- Subquery in WHERE: IN-list driven by another table
SELECT * FROM employees
WHERE department IN (SELECT dept_id FROM departments WHERE location = 'Beijing');

-- Subquery in FROM: the derived table must be aliased (here "t")
SELECT dept, avg_salary
FROM (
SELECT department AS dept, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
) t
WHERE avg_salary > 50000;

-- WITH clause (CTE): name an intermediate result for readability
WITH dept_stats AS (
SELECT department, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
)
SELECT * FROM dept_stats WHERE avg_salary > 50000;

数据导入导出

数据导入

-- Load from the local filesystem (the file is copied into the warehouse)
LOAD DATA LOCAL INPATH '/local/path/data.txt'
INTO TABLE employees;

-- Load from HDFS (the file is moved, not copied)
LOAD DATA INPATH '/hdfs/path/data.txt'
INTO TABLE employees;

-- OVERWRITE replaces existing table data instead of appending
LOAD DATA LOCAL INPATH '/local/path/data.txt'
OVERWRITE INTO TABLE employees;

-- Insert literal rows
INSERT INTO TABLE employees VALUES (1, 'Alice', 'IT', 80000);

-- INSERT OVERWRITE replaces the table contents with the query result
INSERT OVERWRITE TABLE employees
SELECT * FROM temp_employees;

-- Multi-insert: scan the source table once, write to several targets
FROM source_table
INSERT INTO TABLE target1 SELECT col1, col2
INSERT INTO TABLE target2 SELECT col3, col4;

-- Dynamic partition insert: partition values come from the trailing SELECT columns
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE sales PARTITION(year, month)
SELECT product_id, quantity, amount, year, month FROM temp_sales;

数据导出

-- Export query results to a local directory (the directory is overwritten)
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
SELECT * FROM employees;

-- Export to an HDFS directory
INSERT OVERWRITE DIRECTORY '/hdfs/path/output'
SELECT * FROM employees;

-- Export with an explicit field delimiter
INSERT OVERWRITE LOCAL DIRECTORY '/local/path/output'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT * FROM employees;

-- Alternative: redirect hive CLI output from the shell
-- hive -e "SELECT * FROM employees" > output.txt

视图

视图是虚拟表,不存储数据:

-- Create a view: a stored query evaluated at read time; no data is copied
CREATE VIEW IF NOT EXISTS high_salary_employees AS
SELECT id, name, salary
FROM employees
WHERE salary > 100000;

-- Query the view like a table
SELECT * FROM high_salary_employees;

-- Show the view's defining SQL
SHOW CREATE TABLE high_salary_employees;

-- Drop the view (the underlying table is untouched)
DROP VIEW IF EXISTS high_salary_employees;

索引

Hive索引可以加速查询(注意:索引功能已在 Hive 3.0 中移除,以下语句仅适用于 Hive 2.x 及更早版本;新版本建议使用物化视图或 ORC/Parquet 的内置索引与布隆过滤器替代):

-- Create a compact index on salary; DEFERRED REBUILD postpones building its contents
-- NOTE(review): the index feature was removed in Hive 3.0 — these statements only
-- work on Hive 2.x and earlier. On modern versions prefer materialized views or the
-- built-in indexes/bloom filters of ORC and Parquet.
CREATE INDEX emp_salary_idx
ON TABLE employees (salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;

-- (Re)build the index contents
ALTER INDEX emp_salary_idx ON employees REBUILD;

-- List indexes defined on the table
SHOW INDEX ON employees;

-- Drop the index
DROP INDEX IF EXISTS emp_salary_idx ON employees;

函数

内置函数

字符串函数

-- String length in characters
SELECT length(name) FROM employees;

-- Concatenate strings
SELECT concat(name, '-', department) FROM employees;

-- Substring: substr(str, start, len); start is 1-based
SELECT substr(name, 1, 3) FROM employees;

-- Case conversion
SELECT upper(name), lower(name) FROM employees;

-- Strip leading and trailing whitespace
SELECT trim(name) FROM employees;

-- Split into an ARRAY<STRING>; the separator is interpreted as a regex
SELECT split(name, ' ') FROM employees;

-- Extract capture group 1 of a Java regex (backslashes are doubled in SQL strings)
SELECT regexp_extract(name, '(\\w+)', 1) FROM employees;

日期函数

-- Current date
SELECT current_date();

-- Current timestamp
SELECT current_timestamp();

-- Add / subtract a number of days
SELECT date_add('2024-01-01', 7);
SELECT date_sub('2024-01-01', 7);

-- Difference in days: first argument minus second
SELECT datediff('2024-01-10', '2024-01-01');

-- Format using a Java SimpleDateFormat pattern
SELECT date_format('2024-01-01', 'yyyy-MM');

-- Extract individual date parts
SELECT year('2024-01-01');
SELECT month('2024-01-01');
SELECT day('2024-01-01');

数学函数

-- Rounding: round to N decimals, round down, round up
SELECT round(3.14159, 2); -- 3.14
SELECT floor(3.9); -- 3
SELECT ceil(3.1); -- 4

-- Absolute value
SELECT abs(-10);

-- Exponentiation
SELECT pow(2, 10);

-- Random number in [0, 1); non-deterministic per row
SELECT rand();

条件函数

-- if(condition, value_if_true, value_if_false)
SELECT if(salary > 50000, 'High', 'Low') FROM employees;

-- nvl: substitute a default when the value is NULL
SELECT nvl(bonus, 0) FROM employees;

-- coalesce: first non-NULL argument in the list
SELECT coalesce(bonus, salary * 0.1, 0) FROM employees;

窗口函数

窗口函数在不减少行数的情况下进行聚合计算:

-- ROW_NUMBER: unique sequential number per row (ties broken arbitrarily).
-- Aliases renamed from "rank": that identifier shadows the RANK() function
-- and is rejected as reserved by several Hive versions and other SQL engines.
SELECT
name,
salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) AS row_num
FROM employees;

-- RANK: equal values share a rank, and the following rank is skipped (1, 1, 3)
SELECT
name,
salary,
RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;

-- DENSE_RANK: equal values share a rank with no gaps afterwards (1, 1, 2)
SELECT
name,
salary,
DENSE_RANK() OVER (ORDER BY salary DESC) AS salary_rank
FROM employees;

-- Ranking restarted within each department
SELECT
name,
department,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS dept_rank
FROM employees;

-- Aggregate window functions: per-row results without collapsing rows
SELECT
name,
salary,
SUM(salary) OVER (PARTITION BY department) AS dept_total,
AVG(salary) OVER (PARTITION BY department) AS dept_avg
FROM employees;

-- LAG/LEAD: value from the previous / next row in the window order
SELECT
name,
salary,
LAG(salary, 1) OVER (ORDER BY salary) AS prev_salary,
LEAD(salary, 1) OVER (ORDER BY salary) AS next_salary
FROM employees;

-- FIRST_VALUE: first value in the ordered partition (here: the lowest salary).
-- NOTE: LAST_VALUE needs an explicit ROWS BETWEEN ... AND UNBOUNDED FOLLOWING
-- frame to return the partition's true last value (the default frame stops at
-- the current row).
SELECT
name,
department,
salary,
FIRST_VALUE(salary) OVER (PARTITION BY department ORDER BY salary) AS min_in_dept
FROM employees;

自定义函数

// UDF: user-defined function, one input row in -> one value out.
// NOTE(review): this extends the legacy org.apache.hadoop.hive.ql.exec.UDF API;
// GenericUDF is the recommended replacement on modern Hive versions.
public class MyUDF extends UDF {
// Uppercases the input text; returns null on null input (Hive UDF convention).
public Text evaluate(Text input) {
if (input == null) return null;
return new Text(input.toString().toUpperCase());
}
}

-- Register the compiled UDF (the jar must already be on the Hive classpath).
-- Fixed comment syntax: HiveQL comments use "--", not the Java-style "//".
CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUDF';
SELECT my_upper(name) FROM employees;

Hive 优化

查询优化

-- Column pruning: select only the columns you need
SELECT name, salary FROM employees;

-- Partition pruning: filter on partition columns
SELECT * FROM sales WHERE year=2024 AND month=1;

-- Auto map join: broadcast tables smaller than the threshold (bytes)
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize=50000000;

-- Merge small output files at job end (target size per task, in bytes)
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
SET hive.merge.size.per.task=256000000;

存储格式优化

-- Columnar formats: better compression and column pruning ("..." stands for the column list)
CREATE TABLE orc_table (...) STORED AS ORC;
CREATE TABLE parquet_table (...) STORED AS PARQUET;

-- Compress job output using the Snappy codec
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

执行引擎优化

-- Run on Tez: DAG execution, avoids intermediate HDFS writes between stages
SET hive.execution.engine=tez;

-- Run on Spark: in-memory execution engine
SET hive.execution.engine=spark;

并行执行

-- Run independent stages of a query in parallel (up to 16 threads)
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=16;

小结

本章介绍了Hive的核心概念和使用方法:

  1. Hive架构:用户接口、元数据存储、驱动器、执行引擎
  2. 数据模型:数据库、表(内部表、外部表、分区表、分桶表)
  3. HiveQL查询:基本查询、聚合、JOIN、排序、子查询
  4. 数据导入导出:LOAD、INSERT、导出命令
  5. 函数:内置函数、窗口函数、自定义函数
  6. 优化:查询优化、存储优化、执行引擎优化

Hive是大数据离线分析的核心工具,掌握Hive对于构建数据仓库至关重要。在实际项目中,Hive常与Spark、Flink等计算引擎配合使用。