跳到主要内容

HBase 分布式数据库

Apache HBase是一个分布式、可扩展、面向列的NoSQL数据库,基于Google BigTable论文实现,运行在HDFS之上,提供海量数据的实时随机读写能力。

HBase 概述

什么是 HBase?

HBase(Hadoop Database)是Apache Hadoop项目的子项目,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。它适合存储PB级别的海量数据,并能在毫秒级别完成随机读写操作。

HBase的核心特点:

  1. 海量存储:支持数十亿行、数百万列的大表
  2. 实时读写:毫秒级的随机读写响应
  3. 稀疏存储:空值不占用存储空间
  4. 高可靠性:数据多副本存储,自动故障转移
  5. 可扩展:支持在线扩容,线性扩展

HBase vs 关系型数据库

维度HBase关系型数据库
数据模型列族存储行存储
Schema灵活,列可动态增加固定,需预定义
事务单行事务ACID事务
索引只有行键索引多种索引
JOIN不支持支持
扩展性水平扩展垂直扩展为主
适用场景海量数据随机读写复杂事务处理

HBase 应用场景

场景说明
用户画像存储用户属性和行为标签
消息存储即时通讯消息、社交动态
时序数据IoT设备数据、监控指标
日志存储应用日志、访问日志
推荐系统用户行为特征存储

HBase 数据模型

逻辑视图

HBase的数据模型可以理解为多维排序的Map:

(RowKey, Column Family, Column Qualifier, Timestamp) -> Value

关键概念

概念说明
RowKey行键,唯一标识一行数据,按字典序排序
Column Family列族,列的逻辑分组,需预先定义
Column Qualifier列限定符,列族内的具体列,可动态添加
Timestamp时间戳,标识数据的版本,默认按时间倒序
Cell单元格,由RowKey、列族、列限定符、时间戳唯一确定

数据示例

假设有一个用户表user,包含列族infodata

RowKeyColumn FamilyColumn QualifierTimestampValue
user001infoname1704067200000Alice
user001infoage170406720000025
user001infocity1704067200000Beijing
user001datalogin_count1704067200000100
user001datalogin_count170398080000095

物理存储

Region

Region是HBase数据分片的基本单位:

  • 表按RowKey范围划分为多个Region
  • 每个Region由一个RegionServer管理
  • Region大小达到阈值时自动分裂

Store

每个Region由多个Store组成,每个Store对应一个列族:

  • Store由MemStore和多个HFile组成
  • MemStore是内存中的写缓存
  • HFile是磁盘上的数据文件

HFile

HFile是HBase的数据存储格式:

  • 基于LSM树设计
  • 数据按RowKey排序
  • 支持快速查找

HBase 架构

系统架构

HBase采用主从架构,主要组件包括:

HMaster

HMaster是主节点,负责:

  • 表管理:创建、删除、修改表
  • Region分配:将Region分配给RegionServer
  • 负载均衡:调整Region分布
  • 故障恢复:处理RegionServer故障

HRegionServer

HRegionServer是工作节点,负责:

  • 数据处理:处理客户端的读写请求
  • Region管理:管理分配给自己的Region
  • 数据持久化:将MemStore刷写到HFile
  • 合并压缩:合并小的HFile

ZooKeeper

ZooKeeper是协调服务,负责:

  • 集群管理:维护集群状态
  • Master选举:HMaster高可用
  • 元数据位置:存储meta表位置
  • 故障检测:监控RegionServer状态

数据读写流程

写入流程

  1. Client向ZooKeeper获取meta表位置
  2. 查询meta表获取目标RegionServer
  3. 向RegionServer发送写请求
  4. 数据写入WAL(预写日志)
  5. 数据写入MemStore
  6. 返回写入成功

读取流程

  1. Client向ZooKeeper获取meta表位置
  2. 查询meta表获取目标RegionServer
  3. 向RegionServer发送读请求
  4. 查询BlockCache(读缓存)
  5. 查询MemStore
  6. 查询HFile
  7. 合并结果返回

HBase Shell 操作

表操作

# 进入HBase Shell
hbase shell

# 创建表
create 'user', 'info', 'data'

# 创建表(指定版本数)
create 'user', {NAME => 'info', VERSIONS => 3}, {NAME => 'data', VERSIONS => 5}

# 查看所有表
list

# 查看表结构
describe 'user'

# 禁用表
disable 'user'

# 启用表
enable 'user'

# 删除表(需先禁用)
disable 'user'
drop 'user'

# 修改表结构
disable 'user'
alter 'user', {NAME => 'info', VERSIONS => 5}
enable 'user'

数据操作

# 插入数据
put 'user', 'user001', 'info:name', 'Alice'
put 'user', 'user001', 'info:age', '25'
put 'user', 'user001', 'info:city', 'Beijing'
put 'user', 'user001', 'data:login_count', '100'

# 获取单行数据
get 'user', 'user001'

# 获取指定列族
get 'user', 'user001', 'info'

# 获取指定列
get 'user', 'user001', 'info:name'

# 获取多个版本
get 'user', 'user001', {COLUMN => 'info:name', VERSIONS => 3}

# 扫描全表
scan 'user'

# 扫描指定范围
scan 'user', {STARTROW => 'user001', STOPROW => 'user003'}

# 扫描指定列
scan 'user', {COLUMNS => ['info:name', 'info:age']}

# 过滤器查询
scan 'user', {FILTER => "ValueFilter(=, 'binary:Alice')"}

# 删除数据
delete 'user', 'user001', 'info:age'

# 删除整行
deleteall 'user', 'user001'

# 统计行数
count 'user'

# 清空表
truncate 'user'

常用过滤器

# 值过滤器
scan 'user', {FILTER => "ValueFilter(=, 'substring:Ali')"}

# 行键过滤器
scan 'user', {FILTER => "RowFilter(=, 'substring:user')"}

# 列名过滤器
scan 'user', {FILTER => "QualifierFilter(=, 'substring:name')"}

# 比较过滤器
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20')"}

# 组合过滤器
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20') AND SingleColumnValueFilter('info', 'city', =, 'binary:Beijing')"}

Java API 操作

连接 HBase

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;

// 创建配置
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "localhost");
config.set("hbase.zookeeper.property.clientPort", "2181");

// 创建连接
Connection connection = ConnectionFactory.createConnection(config);

// 获取Admin对象
Admin admin = connection.getAdmin();

// 获取Table对象
Table table = connection.getTable(TableName.valueOf("user"));

表管理

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

// 创建表
TableName tableName = TableName.valueOf("user");
if (!admin.tableExists(tableName)) {
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(tableName);
tableBuilder.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setMaxVersions(3)
.build()
);
tableBuilder.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("data"))
.setMaxVersions(5)
.build()
);
admin.createTable(tableBuilder.build());
}

// 删除表
admin.disableTable(tableName);
admin.deleteTable(tableName);

// 判断表是否存在
boolean exists = admin.tableExists(tableName);

// 判断表是否启用
boolean enabled = admin.isTableEnabled(tableName);

数据写入

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Put;

// 单条写入
Put put = new Put(Bytes.toBytes("user001"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("25"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes("Beijing"));
table.put(put);

// 批量写入
List<Put> puts = new ArrayList<>();
Put put1 = new Put(Bytes.toBytes("user001"));
put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
puts.add(put1);

Put put2 = new Put(Bytes.toBytes("user002"));
put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Bob"));
puts.add(put2);

table.put(puts);

数据读取

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;

// 单行读取
Get get = new Get(Bytes.toBytes("user001"));
get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
Result result = table.get(get);

// 获取值
byte[] nameBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
String name = Bytes.toString(nameBytes);

// 遍历所有列
for (Cell cell : result.rawCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(family + ":" + qualifier + " = " + value);
}

// 扫描查询
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("user001"));
scan.setStopRow(Bytes.toBytes("user003"));
scan.addFamily(Bytes.toBytes("info"));

ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
String rowKey = Bytes.toString(r.getRow());
String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
System.out.println(rowKey + ": " + name);
}
scanner.close();

数据删除

import org.apache.hadoop.hbase.client.Delete;

// 删除指定列
Delete delete = new Delete(Bytes.toBytes("user001"));
delete.addColumns(Bytes.toBytes("info"), Bytes.toBytes("age"));
table.delete(delete);

// 删除整行
Delete deleteAll = new Delete(Bytes.toBytes("user001"));
table.delete(deleteAll);

RowKey 设计

RowKey设计是HBase性能优化的关键,好的RowKey设计可以提高查询效率并避免热点问题。

设计原则

  1. 唯一性:RowKey必须唯一标识一行数据
  2. 长度适中:建议16-50字节
  3. 散列分布:避免数据倾斜
  4. 有序性:利用排序特性优化查询

常见设计模式

反转时间戳

适用于按时间范围查询的场景:

// 原始:userId + timestamp
// 问题:时间递增导致数据写入热点

// 优化:反转时间戳
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
String rowKey = userId + "_" + reverseTimestamp;

加盐

在RowKey前添加随机前缀,分散写入:

// 添加随机前缀
int salt = Math.abs(rowKey.hashCode()) % buckets;
String saltedRowKey = salt + "_" + rowKey;

哈希

使用哈希值作为前缀:

// MD5哈希前缀
String hash = MD5Hash.digest(rowKey).toString().substring(0, 4);
String hashedRowKey = hash + "_" + rowKey;

组合键

多个字段组合成RowKey:

// 用户ID + 日期 + 序号
String rowKey = userId + "_" + date + "_" + sequence;

HBase 优化

预分区

建表时预先创建多个Region,避免单Region热点:

# 指定分区点
create 'user', 'info', SPLITS => ['10', '20', '30', '40']

# 使用分区文件
create 'user', 'info', SPLITS_FILE => '/path/to/splits.txt'
// Java API预分区
byte[][] splitKeys = new byte[4][];
splitKeys[0] = Bytes.toBytes("10");
splitKeys[1] = Bytes.toBytes("20");
splitKeys[2] = Bytes.toBytes("30");
splitKeys[3] = Bytes.toBytes("40");
admin.createTable(tableDescriptor, splitKeys);

列族设计

// 列族配置优化
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setMaxVersions(3) // 版本数
.setMinVersions(1) // 最小版本数
.setTTL(86400) // 生存时间(秒)
.setBlocksize(65536) // 块大小
.setBlockCacheEnabled(true) // 启用块缓存
.setBloomFilterType(BloomType.ROW) // 布隆过滤器类型
.setCompressionType(Compression.Algorithm.SNAPPY) // 压缩算法
.build();

内存优化

# 调整MemStore大小
hbase.regionserver.global.memstore.size=0.4

# 调整BlockCache大小
hfile.block.cache.size=0.4

# MemStore刷写阈值
hbase.hregion.memstore.flush.size=128m

压缩优化

# 启用压缩
create 'user', {NAME => 'info', COMPRESSION => 'SNAPPY'}

# 常用压缩算法
# SNAPPY:压缩比中等,速度快
# GZIP:压缩比高,速度慢
# LZO:压缩比低,速度最快
# LZ4:压缩比中等,速度快

HBase 与其他系统集成

与 MapReduce 集成

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

// MapReduce读取HBase
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config, "HBaseMR");

Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(
"user", // 输入表
scan, // Scan实例
MyMapper.class, // Mapper类
Text.class, // 输出Key类型
IntWritable.class, // 输出Value类型
job
);

TableMapReduceUtil.initTableReducerJob(
"output", // 输出表
MyReducer.class, // Reducer类
job
);

与 Spark 集成

import org.apache.hadoop.hbase.spark.HBaseContext

val spark = SparkSession.builder().appName("HBaseSpark").getOrCreate()
val config = HBaseConfiguration.create()
config.set("hbase.zookeeper.quorum", "localhost")

val hbaseContext = new HBaseContext(spark.sparkContext, config)

// 批量读取
val rdd = hbaseContext.hbaseRDD(TableName.valueOf("user"), new Scan())

// 批量写入
hbaseContext.bulkPut(rdd, TableName.valueOf("output"), put => put)

与 Phoenix 集成

Phoenix是HBase的SQL层,提供SQL查询能力:

-- 创建表
CREATE TABLE user (
id VARCHAR PRIMARY KEY,
info.name VARCHAR,
info.age INTEGER,
info.city VARCHAR
);

-- 插入数据
UPSERT INTO user VALUES('user001', 'Alice', 25, 'Beijing');

-- 查询数据
SELECT * FROM user WHERE age > 20;

-- 创建索引
CREATE INDEX idx_age ON user(info.age);

小结

本章介绍了HBase的核心概念和使用方法:

  1. 数据模型:RowKey、列族、列限定符、时间戳、单元格
  2. 架构组件:HMaster、HRegionServer、ZooKeeper
  3. Shell操作:表操作、数据操作、过滤器
  4. Java API:连接、表管理、数据读写
  5. RowKey设计:反转时间戳、加盐、哈希、组合键
  6. 性能优化:预分区、列族设计、内存优化、压缩

HBase是大数据场景下实时随机读写的首选数据库,掌握HBase对于构建实时数据应用至关重要。在实际项目中,HBase常与Spark、Flink、Phoenix等组件配合使用。