HBase 分布式数据库
Apache HBase是一个分布式、可扩展、面向列的NoSQL数据库,基于Google BigTable论文实现,运行在HDFS之上,提供海量数据的实时随机读写能力。
HBase 概述
什么是 HBase?
HBase(Hadoop Database)是Apache Hadoop项目的子项目,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。它适合存储PB级别的海量数据,并能在毫秒级别完成随机读写操作。
HBase的核心特点:
- 海量存储:支持数十亿行、数百万列的大表
- 实时读写:毫秒级的随机读写响应
- 稀疏存储:空值不占用存储空间
- 高可靠性:数据多副本存储,自动故障转移
- 可扩展:支持在线扩容,线性扩展
HBase vs 关系型数据库
| 维度 | HBase | 关系型数据库 |
|---|---|---|
| 数据模型 | 列族存储 | 行存储 |
| Schema | 灵活,列可动态增加 | 固定,需预定义 |
| 事务 | 单行事务 | ACID事务 |
| 索引 | 只有行键索引 | 多种索引 |
| JOIN | 不支持 | 支持 |
| 扩展性 | 水平扩展 | 垂直扩展为主 |
| 适用场景 | 海量数据随机读写 | 复杂事务处理 |
HBase 应用场景
| 场景 | 说明 |
|---|---|
| 用户画像 | 存储用户属性和行为标签 |
| 消息存储 | 即时通讯消息、社交动态 |
| 时序数据 | IoT设备数据、监控指标 |
| 日志存储 | 应用日志、访问日志 |
| 推荐系统 | 用户行为特征存储 |
HBase 数据模型
逻辑视图
HBase的数据模型可以理解为多维排序的Map:
(RowKey, Column Family, Column Qualifier, Timestamp) -> Value
关键概念
| 概念 | 说明 |
|---|---|
| RowKey | 行键,唯一标识一行数据,按字典序排序 |
| Column Family | 列族,列的逻辑分组,需预先定义 |
| Column Qualifier | 列限定符,列族内的具体列,可动态添加 |
| Timestamp | 时间戳,标识数据的版本,默认按时间倒序 |
| Cell | 单元格,由RowKey、列族、列限定符、时间戳唯一确定 |
数据示例
假设有一个用户表user,包含列族info和data:
| RowKey | Column Family | Column Qualifier | Timestamp | Value |
|---|---|---|---|---|
| user001 | info | name | 1704067200000 | Alice |
| user001 | info | age | 1704067200000 | 25 |
| user001 | info | city | 1704067200000 | Beijing |
| user001 | data | login_count | 1704067200000 | 100 |
| user001 | data | login_count | 1703980800000 | 95 |
物理存储
Region
Region是HBase数据分片的基本单位:
- 表按RowKey范围划分为多个Region
- 每个Region由一个RegionServer管理
- Region大小达到阈值时自动分裂
Store
每个Region由多个Store组成,每个Store对应一个列族:
- Store由MemStore和多个HFile组成
- MemStore是内存中的写缓存
- HFile是磁盘上的数据文件
HFile
HFile是HBase的数据存储格式:
- 基于LSM树设计
- 数据按RowKey排序
- 支持快速查找
HBase 架构
系统架构
HBase采用主从架构,主要组件包括:
HMaster
HMaster是主节点,负责:
- 表管理:创建、删除、修改表
- Region分配:将Region分配给RegionServer
- 负载均衡:调整Region分布
- 故障恢复:处理RegionServer故障
HRegionServer
HRegionServer是工作节点,负责:
- 数据处理:处理客户端的读写请求
- Region管理:管理分配给自己的Region
- 数据持久化:将MemStore刷写到HFile
- 合并压缩:合并小的HFile
ZooKeeper
ZooKeeper是协调服务,负责:
- 集群管理:维护集群状态
- Master选举:HMaster高可用
- 元数据位置:存储meta表位置
- 故障检测:监控RegionServer状态
数据读写流程
写入流程
- Client向ZooKeeper获取meta表位置
- 查询meta表获取目标RegionServer
- 向RegionServer发送写请求
- 数据写入WAL(预写日志)
- 数据写入MemStore
- 返回写入成功
读取流程
- Client向ZooKeeper获取meta表位置
- 查询meta表获取目标RegionServer
- 向RegionServer发送读请求
- 查询BlockCache(读缓存)
- 查询MemStore
- 查询HFile
- 合并结果返回
HBase Shell 操作
表操作
# 进入HBase Shell
hbase shell
# 创建表
create 'user', 'info', 'data'
# 创建表(指定版本数)
create 'user', {NAME => 'info', VERSIONS => 3}, {NAME => 'data', VERSIONS => 5}
# 查看所有表
list
# 查看表结构
describe 'user'
# 禁用表
disable 'user'
# 启用表
enable 'user'
# 删除表(需先禁用)
disable 'user'
drop 'user'
# 修改表结构
disable 'user'
alter 'user', {NAME => 'info', VERSIONS => 5}
enable 'user'
数据操作
# 插入数据
put 'user', 'user001', 'info:name', 'Alice'
put 'user', 'user001', 'info:age', '25'
put 'user', 'user001', 'info:city', 'Beijing'
put 'user', 'user001', 'data:login_count', '100'
# 获取单行数据
get 'user', 'user001'
# 获取指定列族
get 'user', 'user001', 'info'
# 获取指定列
get 'user', 'user001', 'info:name'
# 获取多个版本
get 'user', 'user001', {COLUMN => 'info:name', VERSIONS => 3}
# 扫描全表
scan 'user'
# 扫描指定范围
scan 'user', {STARTROW => 'user001', STOPROW => 'user003'}
# 扫描指定列
scan 'user', {COLUMNS => ['info:name', 'info:age']}
# 过滤器查询
scan 'user', {FILTER => "ValueFilter(=, 'binary:Alice')"}
# 删除数据
delete 'user', 'user001', 'info:age'
# 删除整行
deleteall 'user', 'user001'
# 统计行数
count 'user'
# 清空表
truncate 'user'
常用过滤器
# 值过滤器
scan 'user', {FILTER => "ValueFilter(=, 'substring:Ali')"}
# 行键过滤器
scan 'user', {FILTER => "RowFilter(=, 'substring:user')"}
# 列名过滤器
scan 'user', {FILTER => "QualifierFilter(=, 'substring:name')"}
# 比较过滤器
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20')"}
# 组合过滤器
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20') AND SingleColumnValueFilter('info', 'city', =, 'binary:Beijing')"}
Java API 操作
连接 HBase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
// 创建配置
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "localhost");
config.set("hbase.zookeeper.property.clientPort", "2181");
// 创建连接
Connection connection = ConnectionFactory.createConnection(config);
// 获取Admin对象
Admin admin = connection.getAdmin();
// 获取Table对象
Table table = connection.getTable(TableName.valueOf("user"));
表管理
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
// 创建表
TableName tableName = TableName.valueOf("user");
if (!admin.tableExists(tableName)) {
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
tableBuilder.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setMaxVersions(3)
.build()
);
tableBuilder.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("data"))
.setMaxVersions(5)
.build()
);
admin.createTable(tableBuilder.build());
}
// 删除表
admin.disableTable(tableName);
admin.deleteTable(tableName);
// 判断表是否存在
boolean exists = admin.tableExists(tableName);
// 判断表是否启用
boolean enabled = admin.isTableEnabled(tableName);
数据写入
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Put;
// 单条写入
Put put = new Put(Bytes.toBytes("user001"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("25"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes("Beijing"));
table.put(put);
// 批量写入
List<Put> puts = new ArrayList<>();
Put put1 = new Put(Bytes.toBytes("user001"));
put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
puts.add(put1);
Put put2 = new Put(Bytes.toBytes("user002"));
put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Bob"));
puts.add(put2);
table.put(puts);
数据读取
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
// 单行读取
Get get = new Get(Bytes.toBytes("user001"));
get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
Result result = table.get(get);
// 获取值
byte[] nameBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
String name = Bytes.toString(nameBytes);
// 遍历所有列
for (Cell cell : result.rawCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(family + ":" + qualifier + " = " + value);
}
// 扫描查询
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("user001"));
scan.setStopRow(Bytes.toBytes("user003"));
scan.addFamily(Bytes.toBytes("info"));
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
String rowKey = Bytes.toString(r.getRow());
String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
System.out.println(rowKey + ": " + name);
}
scanner.close();
数据删除
import org.apache.hadoop.hbase.client.Delete;
// 删除指定列
Delete delete = new Delete(Bytes.toBytes("user001"));
delete.addColumns(Bytes.toBytes("info"), Bytes.toBytes("age"));
table.delete(delete);
// 删除整行
Delete deleteAll = new Delete(Bytes.toBytes("user001"));
table.delete(deleteAll);
RowKey 设计
RowKey设计是HBase性能优化的关键,好的RowKey设计可以提高查询效率并避免热点问题。
设计原则
- 唯一性:RowKey必须唯一标识一行数据
- 长度适中:建议16-50字节
- 散列分布:避免数据倾斜
- 有序性:利用排序特性优化查询
常见设计模式
反转时间戳
适用于按时间范围查询的场景:
// 原始:userId + timestamp
// 问题:时间递增导致数据写入热点
// 优化:反转时间戳
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
String rowKey = userId + "_" + reverseTimestamp;
加盐
在RowKey前添加随机前缀,分散写入:
// 添加随机前缀
int salt = Math.abs(rowKey.hashCode()) % buckets;
String saltedRowKey = salt + "_" + rowKey;
哈希
使用哈希值作为前缀:
// MD5哈希前缀
String hash = MD5Hash.digest(rowKey).toString().substring(0, 4);
String hashedRowKey = hash + "_" + rowKey;
组合键
多个字段组合成RowKey:
// 用户ID + 日期 + 序号
String rowKey = userId + "_" + date + "_" + sequence;
HBase 优化
预分区
建表时预先创建多个Region,避免单Region热点:
# 指定分区点
create 'user', 'info', SPLITS => ['10', '20', '30', '40']
# 使用分区文件
create 'user', 'info', SPLITS_FILE => '/path/to/splits.txt'
// Java API预分区
byte[][] splitKeys = new byte[4][];
splitKeys[0] = Bytes.toBytes("10");
splitKeys[1] = Bytes.toBytes("20");
splitKeys[2] = Bytes.toBytes("30");
splitKeys[3] = Bytes.toBytes("40");
admin.createTable(tableDescriptor, splitKeys);
列族设计
// 列族配置优化
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setMaxVersions(3) // 版本数
.setMinVersions(1) // 最小版本数
.setTTL(86400) // 生存时间(秒)
.setBlocksize(65536) // 块大小
.setBlockCacheEnabled(true) // 启用块缓存
.setBloomFilterType(BloomType.ROW) // 布隆过滤器类型
.setCompressionType(Compression.Algorithm.SNAPPY) // 压缩算法
.build();
内存优化
# 调整MemStore大小
hbase.regionserver.global.memstore.size=0.4
# 调整BlockCache大小
hfile.block.cache.size=0.4
# MemStore刷写阈值
hbase.hregion.memstore.flush.size=128m
压缩优化
# 启用压缩
create 'user', {NAME => 'info', COMPRESSION => 'SNAPPY'}
# 常用压缩算法
# SNAPPY:压缩比中等,速度快
# GZIP:压缩比高,速度慢
# LZO:压缩比低,速度最快
# LZ4:压缩比中等,速度快
HBase 与其他系统集成
与 MapReduce 集成
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
// MapReduce读取HBase
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config, "HBaseMR");
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(
"user", // 输入表
scan, // Scan实例
MyMapper.class, // Mapper类
Text.class, // 输出Key类型
IntWritable.class, // 输出Value类型
job
);
TableMapReduceUtil.initTableReducerJob(
"output", // 输出表
MyReducer.class, // Reducer类
job
);
与 Spark 集成
import org.apache.hadoop.hbase.spark.HBaseContext
val spark = SparkSession.builder().appName("HBaseSpark").getOrCreate()
val config = HBaseConfiguration.create()
config.set("hbase.zookeeper.quorum", "localhost")
val hbaseContext = new HBaseContext(spark.sparkContext, config)
// 批量读取
val rdd = hbaseContext.hbaseRDD(TableName.valueOf("user"), new Scan())
// 批量写入
hbaseContext.bulkPut(rdd, TableName.valueOf("output"), put => put)
与 Phoenix 集成
Phoenix是HBase的SQL层,提供SQL查询能力:
-- 创建表
CREATE TABLE user (
id VARCHAR PRIMARY KEY,
info.name VARCHAR,
info.age INTEGER,
info.city VARCHAR
);
-- 插入数据
UPSERT INTO user VALUES('user001', 'Alice', 25, 'Beijing');
-- 查询数据
SELECT * FROM user WHERE age > 20;
-- 创建索引
CREATE INDEX idx_age ON user(info.age);
小结
本章介绍了HBase的核心概念和使用方法:
- 数据模型:RowKey、列族、列限定符、时间戳、单元格
- 架构组件:HMaster、HRegionServer、ZooKeeper
- Shell操作:表操作、数据操作、过滤器
- Java API:连接、表管理、数据读写
- RowKey设计:反转时间戳、加盐、哈希、组合键
- 性能优化:预分区、列族设计、内存优化、压缩
HBase是大数据场景下实时随机读写的首选数据库,掌握HBase对于构建实时数据应用至关重要。在实际项目中,HBase常与Spark、Flink、Phoenix等组件配合使用。