HBase 分布式数据库
Apache HBase 是一个分布式、可扩展、面向列的 NoSQL 数据库,基于 Google BigTable 论文实现,运行在 HDFS 之上,提供海量数据的实时随机读写能力。
HBase 概述
什么是 HBase?
HBase(Hadoop Database)是 Apache Hadoop 项目的子项目,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。它适合存储 PB 级别的海量数据,并能在毫秒级别完成随机读写操作。
HBase 的设计目标是解决传统关系型数据库在处理海量数据时的瓶颈问题。传统数据库在面对数十亿行、数百万列的大表时,往往会遇到性能瓶颈、扩展困难等问题。HBase 通过分布式架构和列式存储,能够轻松处理这类大规模数据场景。
HBase 的核心特点:
- 海量存储:支持数十亿行、数百万列的大表,单表可达 PB 级别
- 实时读写:毫秒级的随机读写响应,适合实时查询场景
- 稀疏存储:空值(null)不占用存储空间,节省存储资源
- 高可靠性:数据多副本存储(基于 HDFS),自动故障转移
- 可扩展:支持在线扩容,线性扩展性能
- 无模式:列可以动态添加,同一表中不同行可以有不同的列
HBase vs 关系型数据库
理解 HBase 与传统关系型数据库的区别,有助于正确选择技术方案:
| 维度 | HBase | 关系型数据库 |
|---|---|---|
| 数据模型 | 列族存储,稀疏矩阵 | 行存储,固定模式 |
| Schema | 灵活,列可动态增加 | 固定,需预定义所有列 |
| 事务 | 单行事务(行级原子性) | ACID 事务(跨行、跨表) |
| 索引 | 只有行键(RowKey)索引 | 多种索引(B+树、哈希等) |
| JOIN | 不支持,需应用层实现 | 支持,优化器自动优化 |
| SQL | 不原生支持(需 Phoenix) | 原生 SQL 支持 |
| 扩展性 | 水平扩展,线性增长 | 垂直扩展为主,有限水平扩展 |
| 数据量 | TB 到 PB 级别 | GB 到 TB 级别 |
| 适用场景 | 海量数据随机读写、时序数据 | 复杂事务处理、结构化数据 |
选择建议:
- 选择 HBase:海量数据(TB/PB 级)、需要随机读写、数据结构灵活、写入密集型场景
- 选择关系型数据库:需要复杂事务、复杂查询(JOIN、子查询)、数据量可控、查询模式多样
HBase 应用场景
| 场景 | 说明 | 典型应用 |
|---|---|---|
| 用户画像 | 存储用户属性和行为标签,支持实时查询 | 电商、社交平台的用户画像系统 |
| 消息存储 | 存储即时通讯消息、社交动态,支持时间范围查询 | 微信、微博等消息系统 |
| 时序数据 | 存储 IoT 设备数据、监控指标,支持高效写入 | 工业物联网、监控告警系统 |
| 日志存储 | 存储应用日志、访问日志,支持快速检索 | 日志分析平台 |
| 推荐系统 | 存储用户行为特征,支持实时特征提取 | 电商推荐、广告投放系统 |
| 图谱数据 | 存储社交关系、知识图谱的边数据 | 社交网络、知识图谱 |
HBase 与 BigTable 的关系
HBase 是 Google BigTable 论文的开源实现。理解 BigTable 的设计理念有助于更好地使用 HBase:
BigTable 论文的核心思想:
- 分布式多维排序映射:数据按(行键、列族、列限定符、时间戳)四元组组织
- LSM-Tree 存储模型:写入先入内存,再批量刷写到磁盘,优化写入性能
- 区域划分:表按行键范围划分为多个区域(Region),分布式存储
- 列族存储:同一列族的数据存储在一起,提高查询效率
HBase 版本演进
| 版本 | 发布时间 | 重要特性 |
|---|---|---|
| HBase 0.x | 2008-2012 | 基础分布式存储、HDFS 集成 |
| HBase 1.x | 2015 | Phoenix 集成增强、协处理器改进 |
| HBase 2.x | 2018 | Procedure Store、Region 分配优化 |
| HBase 2.4 | 2022 | 多集群客户端支持、META 副本增强 |
| HBase 3.0 | 2024+ | Dual File Compaction、可靠性改进 |
HBase 3.0 重要更新:
- 新的 Dual File Compaction 算法
- Master 过程执行可靠性改进
- 更好的与 Hadoop 3.x 集成
- 性能和稳定性优化
HBase 数据模型
理解 HBase 的数据模型是使用 HBase 的基础。HBase 的数据模型与关系型数据库有本质区别。
逻辑视图
HBase 的数据模型可以理解为多维排序的映射(Map):
(RowKey, Column Family, Column Qualifier, Timestamp) → Value
关键概念详解
RowKey(行键)
RowKey 是数据的主键,具有以下特点:
- 唯一标识:每个 RowKey 唯一标识一行数据
- 字典序排序:数据按 RowKey 的字典序存储,影响查询性能
- 不可修改:RowKey 一旦设置不能修改,只能删除重建
- 最大长度:理论无限制,但建议 16-50 字节
- 设计关键:RowKey 的设计直接影响查询性能和数据分布
# RowKey 示例
user001 # 用户ID
20240101_user001 # 日期 + 用户ID(适合时间范围查询)
md5_user001 # 哈希后的用户ID(分散热点)
Column Family(列族)
列族是列的逻辑分组,具有以下特点:
- 预定义:建表时必须指定列族,不能动态添加
- 存储分离:不同列族的数据存储在不同目录中
- 配置独立:每个列族可以独立配置压缩、版本数、TTL 等
- 数量限制:建议不超过 2-3 个列族,过多影响性能
- 访问优化:同一列族的数据一起存储,查询时只扫描需要的列族
# 列族示例
create 'user', 'info', 'data' # info 存储基本信息,data 存储行为数据
Column Qualifier(列限定符)
列限定符是列族内的具体列,具有以下特点:
- 动态添加:可以随时添加新列,无需预定义
- 类型灵活:以字节数组存储,可以存储任意类型数据
- 命名规范:通常使用有意义的字符串命名
# 列限定符示例
info:name # 列族 info 下的 name 列
info:age # 列族 info 下的 age 列
data:login # 列族 data 下的 login 列
Timestamp(时间戳)
时间戳标识数据的版本,具有以下特点:
- 版本控制:同一单元格可以存储多个版本
- 默认值:写入时自动生成(毫秒级时间戳),也可手动指定
- 排序规则:同一单元格的版本按时间戳倒序排列(最新的在前)
- 版本数限制:可配置保留的版本数量,默认为 1
Cell(单元格)
单元格是数据存储的基本单位,由四元组唯一确定:
Cell = (RowKey, Column Family, Column Qualifier, Timestamp) → Value
数据示例
假设有一个用户表 user,包含列族 info 和 data:
| RowKey | Column Family | Column Qualifier | Timestamp | Value |
|---|---|---|---|---|
| user001 | info | name | 1704067200000 | Alice |
| user001 | info | age | 1704067200000 | 25 |
| user001 | info | city | 1704067200000 | Beijing |
| user001 | info | city | 1703980800000 | Shanghai |
| user001 | data | login_count | 1704067200000 | 100 |
| user001 | data | last_login | 1704067200000 | 2024-01-01 |
| user002 | info | name | 1704067200000 | Bob |
解读:
user001行有多个版本的城市信息(从 Shanghai 迁移到 Beijing)user001行有两个列族的数据:info和data- 不同行可以有不同的列(稀疏存储)
物理存储模型
理解 HBase 的物理存储模型有助于性能调优。
Region(区域)
Region 是 HBase 数据分片的基本单位:
- 范围划分:表按 RowKey 范围划分为多个 Region
- 分布式存储:每个 Region 由一个 RegionServer 管理
- 自动分裂:Region 大小达到阈值时自动分裂
- 负载均衡:HMaster 可以在 RegionServer 之间迁移 Region
表按 RowKey 范围划分:
Region 1: [ , row100) → RegionServer 1
Region 2: [row100, row200) → RegionServer 2
Region 3: [row200, row300) → RegionServer 3
Region 4: [row300, ) → RegionServer 1
Region 大小配置:
<property>
<name>hbase.hregion.max.filesize</name>
<value>10737418240</value> <!-- 10GB,默认值 -->
</property>
Store(存储)
每个 Region 由多个 Store 组成,每个 Store 对应一个列族:
Region
├── Store (列族 info)
│ ├── MemStore(内存写缓存)
│ └── HFile(磁盘数据文件)
│ ├── 000001.hfile
│ ├── 000002.hfile
│ └── ...
├── Store (列族 data)
│ ├── MemStore
│ └── HFile
│ └── ...
MemStore:
- 写缓存:数据先写入 MemStore,在内存中排序
- 刷写机制:MemStore 满时刷写到磁盘生成 HFile
- 刷写触发条件:
- MemStore 大小达到阈值(默认 128MB)
- RegionServer 全局 MemStore 内存达到阈值
- 手动触发刷写
HFile:
- 磁盘存储:MemStore 刷写后生成的磁盘文件
- 有序存储:数据按 RowKey、列族、列限定符排序
- 不可修改:HFile 一旦生成不可修改,只能通过合并(Compaction)清理
- 多版本:一个 HFile 可以包含同一单元格的多个版本
HFile 结构
HFile 是 HBase 的核心存储格式,基于 LSM-Tree 设计:
HFile 结构:
┌─────────────────────────────────────────┐
│ File Trailer │ 文件尾(索引信息)
├─────────────────────────────────────────┤
│ Data Block Index │ 数据块索引
├─────────────────────────────────────────┤
│ Meta Block Index │ 元数据块索引
├─────────────────────────────────────────┤
│ Load-on-open Section │ 启动时加载区
├─────────────────────────────────────────┤
│ Data Blocks │ 数据块(实际数据)
│ ┌─────────────────────────────────────┐│
│ │ Key1 → Value1 ││
│ │ Key2 → Value2 ││
│ │ ... ││
│ └─────────────────────────────────────┘│
├─────────────────────────────────────────┤
│ Meta Blocks │ 元数据块(布隆过滤器等)
├─────────────────────────────────────────┤
│ File Info │ 文件信息
└─────────────────────────────────────────┘
Data Block(数据块):
- 数据存储的基本单位
- 默认大小 64KB
- 包含多个 KeyValue 对
- 支持压缩(SNAPPY、LZO、GZIP 等)
Data Block Index(数据块索引):
- 快速定位 Data Block
- 存储每个 Data Block 的起始 RowKey 和偏移量
- 支持二分查找
WAL(Write-Ahead Log)
WAL 是 HBase 保证数据可靠性的关键机制:
- 预写日志:数据写入 MemStore 之前先写入 WAL
- 故障恢复:RegionServer 故障时,通过 WAL 恢复未持久化的数据
- 顺序写入:WAL 采用顺序写入,性能较高
写入流程:
Client → WAL(顺序写入)→ MemStore(内存排序)→ 返回成功
↓(MemStore 满)
HFile(持久化)
WAL 配置:
<!-- WAL 类型 -->
<property>
<name>hbase.wal.provider</name>
<value>asyncfs</value> <!-- 默认,支持异步写入 -->
</property>
<!-- WAL 复制策略 -->
<property>
<name>hbase.regionserver.hlog.replication</name>
<value>1</value> <!-- WAL 副本数 -->
</property>
HBase 架构
系统架构
HBase 采用主从架构(Master-Slave),主要组件包括:
┌─────────────────────────────────────────────────────────────────────┐
│ Client │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Application │ │ HBase Shell │ │ Java API │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└─────────┼────────────────┼────────────────┼──────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ ZooKeeper │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ - 集群状态维护 │ │
│ │ - Master 选举 │ │
│ │ - Meta 表位置 │ │
│ │ - RegionServer 状态监控 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ HMaster │ │ HRegionServer │ │ HRegionServer │
│ (主节点) │ │ (工作节点1) │ │ (工作节点2) │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ 表管理 │ │ │ │ Region 1 │ │ │ │ Region 3 │ │
│ │ Region分配 │ │ │ │ Region 2 │ │ │ │ Region 4 │ │
│ │ 负载均衡 │ │ │ │ WAL │ │ │ │ WAL │ │
│ │ 故障恢复 │ │ │ │ MemStore │ │ │ │ MemStore │ │
│ └─────────────┘ │ │ │ BlockCache │ │ │ │ BlockCache │ │
└─────────────────┘ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘
│ │ │
└────────────────┼────────────────┘
▼
┌─────────────────────────────────────────────────────────────────────┐
│ HDFS │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ - WAL 文件 │ │
│ │ - HFile 文件 │ │
│ │ - 数据多副本存储 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
HMaster(主节点)
HMaster 是 HBase 的主节点,负责集群的管理和协调:
核心职责:
- 表管理:创建、删除、修改表结构
- Region 分配:将 Region 分配给 RegionServer
- 负载均衡:调整 Region 在 RegionServer 之间的分布
- 故障恢复:处理 RegionServer 故障,重新分配 Region
- 元数据管理:管理 meta 表(Region 位置信息)
高可用配置:
HMaster 支持多实例部署,实现高可用:
<!-- backup-masters 文件配置 -->
<!-- 在 conf/backup-masters 中添加备份 Master -->
master2.example.com
master3.example.com
注意:HMaster 不是单点故障,即使 HMaster 宕机,数据读写仍可正常进行(只是无法进行表管理等操作)。
HRegionServer(区域服务器)
HRegionServer 是 HBase 的工作节点,负责实际的数据存储和处理:
核心职责:
- 数据处理:处理客户端的读写请求
- Region 管理:管理分配给自己的 Region
- 数据持久化:将 MemStore 刷写到 HFile
- 合并压缩:合并小的 HFile,清理过期数据
- WAL 管理:维护 WAL 文件
内部组件:
- WAL:预写日志,保证数据可靠性
- MemStore:内存写缓存,每个列族一个
- BlockCache:读缓存,缓存频繁访问的数据块
- HFile:磁盘数据文件
ZooKeeper 的作用
ZooKeeper 在 HBase 架构中扮演重要角色:
核心功能:
- 集群管理:维护集群状态信息
- Master 选举:多个 HMaster 竞争成为 Active Master
- 元数据位置:存储 hbase:meta 表的位置
- 故障检测:监控 RegionServer 状态
- 分布式锁:表操作、Region 分配等需要分布式锁
ZooKeeper 节点结构:
/hbase
├── /master # 当前 Active Master
├── /meta-region-server # meta 表位置
├── /rs # RegionServer 列表
│ ├── /rs/server1
│ ├── /rs/server2
│ └── /rs/server3
├── /table # 表状态
├── /table-lock # 表锁
└── /region-in-transition # Region 迁移状态
Meta 表
hbase:meta 表是 HBase 的系统表,存储所有 Region 的位置信息:
Meta 表结构:
| RowKey | Column Family:info | 说明 |
|---|---|---|
| 表名,起始RowKey,时间戳 | regioninfo | Region 信息(表名、起始键、结束键) |
| server | RegionServer 地址 | |
| serverstartcode | RegionServer 启动时间 |
查询流程:
- Client 从 ZooKeeper 获取 meta 表位置
- Client 查询 meta 表获取目标 Region 的 RegionServer
- Client 直接与目标 RegionServer 通信
数据读写流程
写入流程详解
HBase 的写入流程设计为高吞吐量优化:
写入流程:
Client
│
│ 1. 获取 Region 位置
▼
ZooKeeper + Meta Table
│
│ 2. 返回 RegionServer 地址
▼
Client ─────────────────────────────────────────────┐
│ │
│ 3. 发送写入请求 │
▼ │
RegionServer │
│ │
│ 4. 写入 WAL(持久化保证) │
▼ │
WAL (HDFS) │
│ │
│ 5. 写入 MemStore(内存排序) │
▼ │
MemStore │
│ │
│ 6. 返回写入成功 │
◄────────────────────────────────────────────────┘
详细步骤:
- 定位 Region:Client 根据 RowKey 定位目标 Region 和 RegionServer
- 写入 WAL:数据先写入 WAL,保证持久性
- 写入 MemStore:数据写入 MemStore,在内存中按 RowKey 排序
- 返回成功:写入完成,返回客户端成功
- 异步刷写:MemStore 达到阈值后,异步刷写到 HFile
为什么写入这么快?
- 顺序写入:WAL 采用顺序写入,避免随机 I/O
- 内存写入:数据先写入内存,不直接操作磁盘
- 异步刷写:刷写操作异步进行,不阻塞写入
刷写触发条件:
<!-- MemStore 大小阈值 -->
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value> <!-- 128MB -->
</property>
<!-- RegionServer 全局 MemStore 内存占比 -->
<property>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>
<!-- 阻塞写入的内存占比 -->
<property>
<name>hbase.regionserver.global.memstore.size.lower.limit</name>
<value>0.95</value> <!-- 达到 95% 时阻塞写入 -->
</property>
读取流程详解
HBase 的读取流程需要合并多个数据源:
读取流程:
Client
│
│ 1. 获取 Region 位置
▼
ZooKeeper + Meta Table
│
│ 2. 返回 RegionServer 地址
▼
Client ─────────────────────────────────────────────┐
│ │
│ 3. 发送读取请求 │
▼ │
RegionServer │
│ │
│ 4. 查询 BlockCache(读缓存) │
▼ │
BlockCache ─────────────────────────────────────────┤
│ 未命中 │
│ 5. 查询 MemStore(写缓存) │
▼ │
MemStore ───────────────────────────────────────────┤
│ 未命中 │
│ 6. 查询 HFile(磁盘文件) │
▼ │
HFile (with Bloom Filter) │
│ │
│ 7. 合并结果,返回客户端 │
◄────────────────────────────────────────────────┘
详细步骤:
- 定位 Region:Client 根据 RowKey 定位目标 Region
- 查询 BlockCache:先检查读缓存
- 查询 MemStore:检查写缓存中是否有最新数据
- 查询 HFile:从磁盘文件读取数据(使用布隆过滤器加速)
- 合并结果:合并多个数据源的结果,返回最新版本
读取优化:BlockCache
BlockCache 是 HBase 的读缓存,缓存频繁访问的数据块:
<!-- BlockCache 占堆内存比例 -->
<property>
<name>hfile.block.cache.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>
<!-- BucketCache(堆外内存缓存) -->
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
<property>
<name>hbase.bucketcache.size</name>
<value>8192</value> <!-- 8GB -->
</property>
MemStore 刷写机制
MemStore 刷写是 HBase 的重要机制:
刷写触发条件:
| 触发条件 | 说明 | 行为 |
|---|---|---|
| MemStore 大小达到阈值 | 默认 128MB | 触发该 Region 的 MemStore 刷写 |
| RegionServer 全局内存达到阈值 | 默认堆内存 40% | 触发所有 Region 的 MemStore 刷写 |
| RegionServer 全局内存达到高水位 | 默认堆内存 40% * 95% | 阻塞写入,强制刷写 |
| 手动触发 | 执行 flush 命令 | 立即刷写指定 Region |
| 定时刷写 | 默认 1 小时 | 定期刷写避免数据丢失 |
刷写流程:
- 准备阶段:锁住 MemStore,创建快照
- 刷写阶段:将快照写入 HDFS 生成 HFile
- 完成阶段:更新元数据,释放内存
Compaction(合并)
Compaction 是 HBase 的后台优化机制,用于合并 HFile 和清理过期数据:
Minor Compaction(小合并):
- 合并相邻的小 HFile
- 清理过期版本数据
- 不阻塞读写
- 触发条件:HFile 数量达到阈值
Minor Compaction:
[HFile1] [HFile2] [HFile3] [HFile4]
↓ Minor Compaction
[HFile_merged]
Major Compaction(大合并):
- 合并一个 Store 的所有 HFile
- 清理过期和标记删除的数据
- 可能阻塞读写
- 建议在业务低峰期执行
Major Compaction:
[HFile1] [HFile2] [HFile3] [HFile4] [删除标记]
↓ Major Compaction
[HFile_merged(无删除标记)]
Compaction 配置:
<!-- 触发 Minor Compaction 的 HFile 数量 -->
<property>
<name>hbase.hstore.compactionThreshold</name>
<value>3</value>
</property>
<!-- Major Compaction 周期(0 表示禁用自动执行) -->
<property>
<name>hbase.hregion.majorcompaction</name>
<value>604800000</value> <!-- 7天,建议设为0手动触发 -->
</property>
<!-- Compaction 线程数 -->
<property>
<name>hbase.regionserver.thread.compaction.throttle</name>
<value>2</value>
</property>
Region Split(Region 分裂)
当 Region 过大时,HBase 会自动分裂 Region:
分裂流程:
- 触发判断:Region 大小超过
hbase.hregion.max.filesize(默认 10GB) - 计算分裂点:找到 Region 中间位置的 RowKey
- 创建子 Region:创建两个新的子 Region
- 数据迁移:原 Region 的数据按分裂点分配给子 Region
- 更新 Meta:更新 meta 表中的 Region 信息
- 通知 HMaster:HMaster 将新 Region 分配给 RegionServer
Region Split:
原 Region: [row001 - row999]
↓ Split
子 Region 1: [row001 - row500]
子 Region 2: [row501 - row999]
预分区:
为避免频繁分裂和数据热点,建议建表时预分区:
# 指定分区点
create 'user', 'info', SPLITS => ['10', '20', '30', '40']
# 使用分区文件
create 'user', 'info', SPLITS_FILE => '/path/to/splits.txt'
# 使用 HexStringSplit 算法
create 'user', 'info', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
HBase Shell 操作
表操作
# 进入 HBase Shell
hbase shell
# 创建表
create 'user', 'info', 'data'
# 创建表(指定列族属性)
create 'user',
{NAME => 'info', VERSIONS => 3, TTL => 86400},
{NAME => 'data', VERSIONS => 5, COMPRESSION => 'SNAPPY'}
# 创建表(指定分区)
create 'user', 'info', SPLITS => ['user100', 'user200', 'user300']
# 查看所有表
list
# 查看表结构
describe 'user'
# 检查表是否存在
exists 'user'
# 检查表是否启用
is_enabled 'user'
is_disabled 'user'
# 禁用表
disable 'user'
# 启用表
enable 'user'
# 删除表(需先禁用)
disable 'user'
drop 'user'
# 修改表结构
disable 'user'
# 修改列族版本数
alter 'user', {NAME => 'info', VERSIONS => 5}
# 添加新列族
alter 'user', {NAME => 'extra', VERSIONS => 1}
# 删除列族
alter 'user', {NAME => 'extra', METHOD => 'delete'}
enable 'user'
数据操作
# 插入数据
put 'user', 'user001', 'info:name', 'Alice'
put 'user', 'user001', 'info:age', '25'
put 'user', 'user001', 'info:city', 'Beijing'
put 'user', 'user001', 'data:login_count', '100'
# 插入数据(指定时间戳)
put 'user', 'user001', 'info:city', 'Shanghai', 1703980800000
# 获取单行数据
get 'user', 'user001'
# 获取指定列族
get 'user', 'user001', 'info'
# 获取指定列
get 'user', 'user001', 'info:name', 'info:age'
# 获取多个版本
get 'user', 'user001', {COLUMN => 'info:name', VERSIONS => 3}
# 获取指定时间范围
get 'user', 'user001', {COLUMN => 'info:city', TIMERANGE => [1703900000000, 1704100000000]}
# 扫描全表
scan 'user'
# 扫描指定范围
scan 'user', {STARTROW => 'user001', STOPROW => 'user003'}
# 扫描指定列族
scan 'user', {COLUMNS => 'info'}
# 扫描指定列
scan 'user', {COLUMNS => ['info:name', 'info:age']}
# 限制返回行数
scan 'user', {LIMIT => 10}
# 反向扫描
scan 'user', {REVERSED => true}
# 删除指定列
delete 'user', 'user001', 'info:age'
# 删除指定版本
delete 'user', 'user001', 'info:city', 1703980800000
# 删除整行
deleteall 'user', 'user001'
# 统计行数
count 'user'
count 'user', {INTERVAL => 1000} # 每1000行显示一次进度
# 追加数据
append 'user', 'user001', 'info:name', '_suffix'
# 原子递增
incr 'user', 'user001', 'data:login_count', 1
# 清空表
truncate 'user'
truncate_preserve 'table' # 保留分区
过滤器
HBase 提供丰富的过滤器用于数据查询:
# 值过滤器
scan 'user', {FILTER => "ValueFilter(=, 'binary:Alice')"}
scan 'user', {FILTER => "ValueFilter(=, 'substring:Ali')"}
# 行键过滤器
scan 'user', {FILTER => "RowFilter(=, 'binary:user001')"}
scan 'user', {FILTER => "RowFilter(=, 'substring:user')"}
scan 'user', {FILTER => "RowFilter(<=, 'binary:user005')"}
# 列名过滤器
scan 'user', {FILTER => "QualifierFilter(=, 'substring:name')"}
# 列族过滤器
scan 'user', {FILTER => "FamilyFilter(=, 'binary:info')"}
# 单列值过滤器
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20')"}
# 单列值排除过滤器
scan 'user', {FILTER => "SingleColumnValueExcludeFilter('info', 'age', >, 'binary:20')"}
# 分页过滤器
scan 'user', {FILTER => "PageFilter(10)"}
# 时间戳过滤器
scan 'user', {FILTER => "TimestampsFilter(1703980800000, 1704067200000)"}
# 组合过滤器(AND)
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20') AND SingleColumnValueFilter('info', 'city', =, 'binary:Beijing')"}
# 组合过滤器(OR)
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'city', =, 'binary:Beijing') OR SingleColumnValueFilter('info', 'city', =, 'binary:Shanghai')"}
# 过滤器列表
scan 'user', {FILTER => "FilterList(MUST_PASS_ALL,
SingleColumnValueFilter('info', 'age', >, 'binary:20'),
SingleColumnValueFilter('info', 'city', =, 'binary:Beijing'))"}
快照操作
# 创建快照
snapshot 'user', 'user_snapshot_20240101'
# 列出快照
list_snapshots
list_snapshots 'user.*' # 正则匹配
# 查看快照详情
describe_snapshot 'user_snapshot_20240101'
# 从快照恢复
disable 'user'
restore_snapshot 'user_snapshot_20240101'
enable 'user'
# 克隆快照为新表
clone_snapshot 'user_snapshot_20240101', 'user_clone'
# 删除快照
delete_snapshot 'user_snapshot_20240101'
Java API 操作
连接 HBase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
public class HBaseConnection {
private static Connection connection;
/**
* 获取 HBase 连接(单例模式)
* HBase 的 Connection 是重量级对象,应该复用
*/
public static synchronized Connection getConnection() throws Exception {
if (connection == null || connection.isClosed()) {
Configuration config = HBaseConfiguration.create();
// 设置 ZooKeeper 地址
config.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
config.set("hbase.zookeeper.property.clientPort", "2181");
// 可选:设置超时时间
config.setInt("hbase.rpc.timeout", 60000);
config.setInt("hbase.client.operation.timeout", 60000);
config.setInt("hbase.client.scanner.timeout.period", 60000);
connection = ConnectionFactory.createConnection(config);
}
return connection;
}
/**
* 获取 Admin 对象
* Admin 用于执行管理操作(创建表、删除表等)
*/
public static Admin getAdmin() throws Exception {
return getConnection().getAdmin();
}
/**
* 获取 Table 对象
* Table 用于执行数据操作(增删改查)
*/
public static Table getTable(String tableName) throws Exception {
return getConnection().getTable(TableName.valueOf(tableName));
}
/**
* 关闭连接
*/
public static void close() {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
表管理
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseTableAdmin {
/**
* 创建表
*/
public static void createTable(String tableName, String[] columnFamilies) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);
if (admin.tableExists(table)) {
System.out.println("Table " + tableName + " already exists");
return;
}
// 构建表描述符
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);
// 添加列族
for (String cf : columnFamilies) {
ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(cf))
.setMaxVersions(3) // 版本数
.setMinVersions(1) // 最小版本数
.setTTL(86400) // TTL(秒)
.setBlocksize(65536) // 块大小
.setBlockCacheEnabled(true) // 启用块缓存
.setBloomFilterType(BloomType.ROW) // 布隆过滤器类型
.setCompressionType(Compression.Algorithm.SNAPPY); // 压缩算法
tableBuilder.setColumnFamily(cfBuilder.build());
}
// 创建表
admin.createTable(tableBuilder.build());
System.out.println("Table " + tableName + " created successfully");
}
}
/**
* 创建预分区表
*/
public static void createTableWithSplits(String tableName, String[] columnFamilies,
byte[][] splitKeys) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);
if (admin.tableExists(table)) {
System.out.println("Table " + tableName + " already exists");
return;
}
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);
for (String cf : columnFamilies) {
tableBuilder.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf))
.setMaxVersions(3)
.build()
);
}
// 使用预分区创建表
admin.createTable(tableBuilder.build(), splitKeys);
System.out.println("Table " + tableName + " created with " + splitKeys.length + " regions");
}
}
/**
* 删除表
*/
public static void deleteTable(String tableName) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);
if (admin.tableExists(table)) {
// 先禁用表
admin.disableTable(table);
// 再删除表
admin.deleteTable(table);
System.out.println("Table " + tableName + " deleted successfully");
}
}
}
/**
* 修改表结构
*/
public static void modifyTable(String tableName) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);
// 获取当前表描述
TableDescriptor descriptor = admin.getDescriptor(table);
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(descriptor);
// 修改列族属性
ColumnFamilyDescriptor cfDescriptor = descriptor.getColumnFamily(Bytes.toBytes("info"));
ColumnFamilyDescriptor newCfDescriptor = ColumnFamilyDescriptorBuilder
.newBuilder(cfDescriptor)
.setMaxVersions(5) // 修改版本数
.build();
builder.modifyColumnFamily(newCfDescriptor);
// 禁用表后修改
admin.disableTable(table);
admin.modifyTable(builder.build());
admin.enableTable(table);
System.out.println("Table " + tableName + " modified successfully");
}
}
}
数据写入
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
public class HBaseDataWriter {
/**
* 单条写入
*/
public static void put(String tableName, String rowKey, String columnFamily,
String column, String value) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
Bytes.toBytes(value)
);
table.put(put);
}
}
/**
* 单条写入(带时间戳)
*/
public static void putWithTimestamp(String tableName, String rowKey,
String columnFamily, String column, String value, long timestamp) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
timestamp,
Bytes.toBytes(value)
);
table.put(put);
}
}
/**
* 批量写入
*/
public static void batchPut(String tableName, List<Put> puts) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
table.put(puts);
}
}
/**
* 原子操作:检查并写入(CAS)
* 用于实现乐观锁
*/
public static boolean checkAndPut(String tableName, String rowKey,
String columnFamily, String column, String expectedValue,
Put put) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
return table.checkAndPut(
Bytes.toBytes(rowKey),
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
expectedValue == null ? null : Bytes.toBytes(expectedValue),
put
);
}
}
/**
* 示例:使用 checkAndPut 实现乐观锁更新
*/
public static boolean updateWithVersion(String tableName, String rowKey,
String columnFamily, String column, String newValue,
long expectedVersion) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
// 先获取当前版本号
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("version"));
Result result = table.get(get);
byte[] versionBytes = result.getValue(
Bytes.toBytes(columnFamily), Bytes.toBytes("version"));
long currentVersion = versionBytes == null ? 0 : Bytes.toLong(versionBytes);
// 检查版本号
if (currentVersion != expectedVersion) {
return false; // 版本冲突
}
// 原子更新
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
Bytes.toBytes(newValue));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("version"),
Bytes.toBytes(currentVersion + 1));
return table.checkAndPut(
Bytes.toBytes(rowKey),
Bytes.toBytes(columnFamily),
Bytes.toBytes("version"),
Bytes.toBytes(expectedVersion),
put
);
}
}
}
数据读取
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import java.util.List;
public class HBaseDataReader {
/**
* 单行读取
*/
public static Result get(String tableName, String rowKey) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
}
}
/**
* 获取指定列族
*/
public static Result get(String tableName, String rowKey, String columnFamily) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Get get = new Get(Bytes.toBytes(rowKey));
get.addFamily(Bytes.toBytes(columnFamily));
return table.get(get);
}
}
/**
* 获取多个版本
*/
public static Result getWithVersions(String tableName, String rowKey,
String columnFamily, String column, int versions) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
get.readVersions(versions);
return table.get(get);
}
}
/**
* 解析 Result
*/
public static void printResult(Result result) {
if (result == null || result.isEmpty()) {
System.out.println("No data found");
return;
}
// 遍历所有 Cell
for (Cell cell : result.rawCells()) {
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long timestamp = cell.getTimestamp();
System.out.printf("RowKey: %s, CF: %s, Col: %s, Value: %s, TS: %d%n",
rowKey, family, qualifier, value, timestamp);
}
}
/**
* 扫描查询
*/
public static void scan(String tableName, String startRow, String stopRow) throws Exception {
try (Table table = HBaseConnection.getTable(tableName);
ResultScanner scanner = table.getScanner(
new Scan()
.withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow))
)) {
for (Result result : scanner) {
printResult(result);
}
}
}
/**
* 带过滤器的扫描
*/
public static void scanWithFilter(String tableName, String startRow, String stopRow,
Filter filter) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Scan scan = new Scan()
.withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow))
.setFilter(filter)
.setCaching(100) // 每次RPC返回的行数
.setBatch(10); // 每行返回的列数(大行时使用)
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
printResult(result);
}
}
}
}
/**
* 分页扫描
*/
public static List<Result> scanByPage(String tableName, String startRow,
int pageSize) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Scan scan = new Scan()
.withStartRow(Bytes.toBytes(startRow))
.setLimit(pageSize)
.setCaching(pageSize);
List<Result> results = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
results.add(result);
}
}
return results;
}
}
}
数据删除
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseDataDeleter {
/**
* 删除指定列
*/
public static void deleteColumn(String tableName, String rowKey,
String columnFamily, String column) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
table.delete(delete);
}
}
/**
* 删除指定版本
*/
public static void deleteVersion(String tableName, String rowKey,
String columnFamily, String column, long timestamp) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), timestamp);
table.delete(delete);
}
}
/**
* 删除整行
*/
public static void deleteRow(String tableName, String rowKey) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
}
}
/**
* 删除指定列族
*/
public static void deleteFamily(String tableName, String rowKey,
String columnFamily) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addFamily(Bytes.toBytes(columnFamily));
table.delete(delete);
}
}
}
RowKey 设计
RowKey 设计是 HBase 性能优化的关键,好的 RowKey 设计可以提高查询效率并避免热点问题。
设计原则
1. 唯一性原则
RowKey 必须唯一标识一行数据:
// 用户ID
String rowKey = userId;
// 复合键
String rowKey = userId + "_" + timestamp;
// 哈希键
String rowKey = MD5Hash.digest(userId);
2. 长度原则
RowKey 长度建议控制在 16-50 字节:
- 太短:影响唯一性
- 太长:增加存储开销,降低索引效率
// 不推荐:过长的 RowKey
String rowKey = "user_" + userId + "_" + timestamp + "_" + UUID.randomUUID();
// 推荐:合理长度
String rowKey = userId + "_" + (Long.MAX_VALUE - timestamp);
3. 散列原则
避免数据倾斜,让数据均匀分布:
// 问题:时间递增导致写入热点
String rowKey = timestamp + "_" + userId;
// 解决方案1:反转时间戳
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
String rowKey = userId + "_" + reverseTimestamp;
// 解决方案2:加盐
int salt = Math.abs(rowKey.hashCode()) % buckets;
String saltedRowKey = salt + "_" + rowKey;
// 解决方案3:哈希前缀
String hashPrefix = MD5Hash.digest(rowKey).toString().substring(0, 4);
String hashedRowKey = hashPrefix + "_" + rowKey;
4. 排序原则
利用字典序排序优化范围查询:
// 场景:按时间范围查询用户行为
// RowKey: userId_reverseTimestamp
// 查询用户 user001 最近一天的行为
long endTime = System.currentTimeMillis();
long startTime = endTime - 86400000; // 1天前
String startRow = "user001_" + (Long.MAX_VALUE - endTime);
String stopRow = "user001_" + (Long.MAX_VALUE - startTime);
// 这样可以高效地进行范围扫描
常见设计模式
反转时间戳模式
适用于按时间范围查询的场景:
/**
* 反转时间戳 RowKey 设计
* 优点:相同用户的数据按时间倒序排列,便于查询最近数据
*
* RowKey 格式: userId_reverseTimestamp
* 例如: user001_9223372036854775807
*/
public class ReverseTimestampRowKey {
/**
* 生成 RowKey
*/
public static String generateRowKey(String userId) {
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
return userId + "_" + reverseTimestamp;
}
/**
* 生成时间范围查询的起止 RowKey
*/
public static String[] generateTimeRange(String userId,
long startTime, long endTime) {
// 注意:反转后,结束时间对应的 RowKey 更小
String startRow = userId + "_" + (Long.MAX_VALUE - endTime);
String stopRow = userId + "_" + (Long.MAX_VALUE - startTime);
return new String[]{startRow, stopRow};
}
}
加盐模式
分散写入热点:
/**
* 加盐 RowKey 设计
* 优点:数据均匀分布,避免热点
* 缺点:范围查询需要扫描多个 Region
*
* RowKey 格式: salt_originalKey
* 例如: 3_user001
*/
public class SaltedRowKey {
private final int buckets;
public SaltedRowKey(int buckets) {
this.buckets = buckets;
}
/**
* 生成加盐 RowKey
*/
public String generateRowKey(String originalKey) {
int salt = Math.abs(originalKey.hashCode()) % buckets;
return salt + "_" + originalKey;
}
/**
* 获取所有可能的盐值前缀(用于范围查询)
*/
public String[] getAllSaltPrefixes() {
String[] prefixes = new String[buckets];
for (int i = 0; i < buckets; i++) {
prefixes[i] = i + "_";
}
return prefixes;
}
/**
* 范围查询时需要并行查询所有桶
*/
public List<Result> scanWithSalt(Table table, String originalKeyPrefix)
throws Exception {
List<Result> results = new ArrayList<>();
for (int i = 0; i < buckets; i++) {
String saltPrefix = i + "_";
Scan scan = new Scan()
.withStartRow(Bytes.toBytes(saltPrefix + originalKeyPrefix))
.withStopRow(Bytes.toBytes(saltPrefix + originalKeyPrefix + "~"));
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
results.add(result);
}
}
}
return results;
}
}
哈希模式
使用哈希值作为前缀:
/**
* 哈希前缀 RowKey 设计
* 优点:数据均匀分布,相同原始 Key 的数据在同一 Region
* 缺点:需要知道原始 Key 才能查询
*
* RowKey 格式: hashPrefix_originalKey
* 例如: a1b2_user001
*/
public class HashPrefixRowKey {
private final int prefixLength;
public HashPrefixRowKey(int prefixLength) {
this.prefixLength = prefixLength;
}
/**
* 生成哈希前缀 RowKey
*/
public String generateRowKey(String originalKey) {
String hash = MD5Hash.digest(originalKey).toString()
.substring(0, prefixLength);
return hash + "_" + originalKey;
}
}
组合键模式
多个字段组合成 RowKey:
/**
* 组合键 RowKey 设计
* 优点:支持多维度查询
* 设计关键:查询频率高的字段放在前面
*
* 场景:订单数据
* 查询模式1:按用户查询订单
* 查询模式2:按商户查询订单
*/
public class CompositeRowKey {
/**
* 按用户查询为主的设计
* RowKey: userId_orderId
*/
public static String generateUserOrderKey(String userId, String orderId) {
return userId + "_" + orderId;
}
/**
* 按商户查询为主的设计
* RowKey: merchantId_orderId
*/
public static String generateMerchantOrderKey(String merchantId, String orderId) {
return merchantId + "_" + orderId;
}
/**
* 双向索引设计:建立两个表
* user_order 表:userId_orderId -> 订单详情
* merchant_order 表:merchantId_orderId -> 订单详情
*/
}
设计案例分析
场景1:时序数据存储
/**
* 场景:存储设备传感器数据
* 需求:按设备查询,按时间范围过滤
*
* 设计方案:deviceId_reverseTimestamp
*
* 优点:
* 1. 同一设备的数据在同一 Region,便于按设备查询
* 2. 反转时间戳使数据按时间倒序,便于查询最近数据
* 3. 避免时间递增导致的写入热点
*/
public class TimeSeriesRowKey {
public static String generate(String deviceId) {
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
return deviceId + "_" + reverseTimestamp;
}
/**
* 查询某设备最近N小时的数据
*/
public static Scan createScan(String deviceId, int hours) {
long endTime = System.currentTimeMillis();
long startTime = endTime - hours * 3600000L;
return new Scan()
.withStartRow(Bytes.toBytes(deviceId + "_" + (Long.MAX_VALUE - endTime)))
.withStopRow(Bytes.toBytes(deviceId + "_" + (Long.MAX_VALUE - startTime)));
}
}
场景2:社交动态存储
/**
* 场景:存储用户社交动态(类似微博、朋友圈)
* 需求:按用户查看自己的动态,按时间排序
*
* 设计方案:userId_reverseTimestamp
*
* 问题:大V用户动态多,可能产生热点
* 优化:动态分表或使用时间分区
*/
public class SocialFeedRowKey {
public static String generate(String userId) {
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
return userId + "_" + reverseTimestamp;
}
/**
* 查询用户的动态列表
*/
public static Scan createScan(String userId, int limit) {
return new Scan()
.withStartRow(Bytes.toBytes(userId + "_"))
.withStopRow(Bytes.toBytes(userId + "_~"))
.setLimit(limit)
.setReversed(true); // 反向扫描,获取最新动态
}
}
Bloom Filter(布隆过滤器)
布隆过滤器是 HBase 优化读取性能的重要机制。
原理
布隆过滤器是一种空间效率很高的概率型数据结构,用于判断一个元素是否在集合中:
- 可能存在:元素可能在集合中
- 一定不存在:元素一定不在集合中
布隆过滤器原理:
┌─────────────────────────────────┐
│ Bit Array (m bits) │
│ 0 1 0 1 0 0 1 0 1 0 0 1 0 1 0 0 │
└─────────────────────────────────┘
↑ ↑ ↑
│ │ │
Key "Alice" ───┘ │ │
h1(Alice) = 2 │ │
h2(Alice) = 4 ────┘ │
h3(Alice) = 11 ───────────────┘
查询 "Bob":
h1(Bob) = 3 → bit[3] = 1 ✓
h2(Bob) = 7 → bit[7] = 0 ✗ → Bob 一定不存在
HBase 中的布隆过滤器
HBase 在每个 HFile 中维护布隆过滤器,加速 RowKey 查找:
Row 类型(默认):
- 对每个 RowKey 创建布隆过滤器
- 查询时先检查布隆过滤器
- 如果布隆过滤器判断 RowKey 不存在,跳过该 HFile
RowCol 类型:
- 对每个 RowKey + Column 创建布隆过滤器
- 更精确的过滤,但占用更多内存
NONE:
- 不使用布隆过滤器
配置与使用
// 创建表时指定布隆过滤器类型
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setBloomFilterType(BloomType.ROW) // 行级布隆过滤器
// .setBloomFilterType(BloomType.ROWCOL) // 行列级布隆过滤器
// .setBloomFilterType(BloomType.NONE) // 不使用
.build();
# Shell 中创建表时指定
create 'user', {NAME => 'info', BLOOMFILTER => 'ROW'}
create 'user', {NAME => 'info', BLOOMFILTER => 'ROWCOL'}
选择建议:
| 场景 | 推荐类型 | 原因 |
|---|---|---|
| 随机读取,按 RowKey 查询 | ROW | 减少不必要的 HFile 扫描 |
| 随机读取,指定列查询 | ROWCOL | 更精确的列级过滤 |
| 顺序扫描 | NONE | 布隆过滤器对扫描无帮助 |
| 写入密集型 | ROW 或 NONE | ROWCOL 增加写入开销 |
Coprocessor(协处理器)
协处理器允许在 RegionServer 上执行用户代码,减少数据传输。
类型
Observer(观察者):
- 类似于数据库触发器
- 在特定事件前后执行回调
- 类型:
- RegionObserver:Region 级别事件(Put、Get、Delete 等)
- RegionServerObserver:RegionServer 级别事件
- MasterObserver:Master 级别事件(创建表、删除表等)
- WALObserver:WAL 级别事件
Endpoint(端点):
- 类似于存储过程
- 在 RegionServer 上执行计算,返回结果
- 用于实现自定义聚合函数
RegionObserver 示例
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import java.io.IOException;
import java.util.Optional;
/**
* 自定义 RegionObserver:自动维护 updated_time 字段
*/
public class UpdatedTimeCoprocessor implements RegionCoprocessor, RegionObserver {
private static final byte[] CF = Bytes.toBytes("info");
private static final byte[] UPDATED_TIME_COL = Bytes.toBytes("updated_time");
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
/**
* 在 Put 操作前自动设置 updated_time
*/
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit) throws IOException {
// 如果 Put 不包含 updated_time,自动添加
if (!put.has(CF, UPDATED_TIME_COL)) {
put.addColumn(CF, UPDATED_TIME_COL,
Bytes.toBytes(System.currentTimeMillis()));
}
}
/**
* 在 Get 操作后可以修改返回结果
*/
@Override
public void postGet(ObserverContext<RegionCoprocessorEnvironment> c,
Get get, Result result) throws IOException {
// 可以在这里添加额外的处理逻辑
}
}
部署协处理器
方式一:通过配置文件加载(全局)
<!-- hbase-site.xml -->
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.example.UpdatedTimeCoprocessor</value>
</property>
方式二:通过表属性加载(表级)
# 禁用表
disable 'user'
# 添加协处理器
alter 'user', 'coprocessor' => 'hdfs:///user/hadoop/coprocessor.jar|com.example.UpdatedTimeCoprocessor|1001|'
# 启用表
enable 'user'
# 查看协处理器
describe 'user'
# 删除协处理器
alter 'user', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
Endpoint 示例:自定义聚合
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionService;
import org.apache.hadoop.hbase.protobuf.generated.RowCountProtos;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* 自定义 Endpoint:计算 Region 行数
*/
public class RowCountEndpoint extends RowCountProtos.RowCountService
implements RegionCoprocessor, RegionService {
private RegionCoprocessorEnvironment env;
@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.env = (RegionCoprocessorEnvironment) env;
}
@Override
public void getRowCount(RpcController controller,
RowCountProtos.RowCountRequest request,
RpcCallback<RowCountProtos.RowCountResponse> done) {
// 计算当前 Region 的行数
long count = 0;
try {
Region region = env.getRegion();
InternalScanner scanner = region.getScanner(new Scan());
List<Cell> results = new ArrayList<>();
boolean hasMore;
do {
hasMore = scanner.next(results);
count += results.size();
results.clear();
} while (hasMore);
scanner.close();
} catch (IOException e) {
// 处理异常
}
// 返回结果
RowCountProtos.RowCountResponse response =
RowCountProtos.RowCountResponse.newBuilder()
.setCount(count)
.build();
done.run(response);
}
@Override
public Service getService() {
return this;
}
}
HBase 性能调优
内存配置
<!-- RegionServer 堆内存 -->
<!-- 建议:16-32GB,根据数据量和负载调整 -->
export HBASE_REGIONSERVER_OPTS="-Xms16g -Xmx16g"
<!-- MemStore 内存占比 -->
<property>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>
<!-- BlockCache 内存占比 -->
<property>
<name>hfile.block.cache.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>
<!-- 注意:MemStore + BlockCache 不要超过堆内存的 80% -->
列族优化
<!-- 列族配置 -->
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value> <!-- 128MB,MemStore 刷写阈值 -->
</property>
<property>
<name>hbase.hstore.blockingStoreFiles</name>
<value>10</value> <!-- 触发阻塞的 HFile 数量 -->
</property>
<property>
<name>hbase.hstore.compactionThreshold</name>
<value>3</value> <!-- 触发 Minor Compaction 的 HFile 数量 -->
</property>
压缩配置
// 创建表时指定压缩
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setCompressionType(Compression.Algorithm.SNAPPY) // 推荐
// .setCompressionType(Compression.Algorithm.LZO) // 更快
// .setCompressionType(Compression.Algorithm.GZ) // 压缩率最高
.build();
压缩算法对比:
| 算法 | 压缩率 | 压缩速度 | 解压速度 | 适用场景 |
|---|---|---|---|---|
| NONE | 1:1 | - | - | 数据已压缩或对延迟敏感 |
| SNAPPY | 2:1 | 快 | 快 | 推荐,平衡压缩率和性能 |
| LZO | 2.5:1 | 最快 | 最快 | 写入密集型场景 |
| GZIP | 4:1 | 慢 | 慢 | 存储密集型,冷数据 |
| LZ4 | 2.5:1 | 快 | 最快 | 需要快速解压的场景 |
| ZSTD | 3:1 | 中 | 快 | 较新的算法,压缩率和速度平衡 |
预分区
/**
* 预分区示例
* 避免 Region 自动分裂导致的数据迁移和性能下降
*/
public class PreSplitExample {
/**
* 使用预定义分区点
*/
public static void createWithSplits() throws Exception {
byte[][] splits = new byte[][] {
Bytes.toBytes("10"),
Bytes.toBytes("20"),
Bytes.toBytes("30"),
Bytes.toBytes("40"),
Bytes.toBytes("50")
};
Admin admin = HBaseConnection.getAdmin();
TableDescriptorBuilder builder = TableDescriptorBuilder
.newBuilder(TableName.valueOf("user"))
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.build()
);
admin.createTable(builder.build(), splits);
}
/**
* 使用十六进制分区算法
* 适用于 RowKey 是十六进制字符串的场景
*/
public static void createWithHexSplits() throws Exception {
Admin admin = HBaseConnection.getAdmin();
// 生成十六进制分区点
int numRegions = 16;
byte[][] splits = new byte[numRegions - 1][];
for (int i = 1; i < numRegions; i++) {
splits[i - 1] = Bytes.toBytes(String.format("%02x", i * 16));
}
TableDescriptorBuilder builder = TableDescriptorBuilder
.newBuilder(TableName.valueOf("user"))
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.build()
);
admin.createTable(builder.build(), splits);
}
}
客户端优化
/**
* HBase 客户端优化配置
*/
public class HBaseClientOptimization {
public static Configuration getOptimizedConfig() {
Configuration config = HBaseConfiguration.create();
// 写缓冲区大小
config.set("hbase.client.write.buffer", "2097152"); // 2MB
// 扫描器缓存(每次 RPC 返回的行数)
config.set("hbase.client.scanner.caching", "100");
// 操作超时时间
config.setInt("hbase.rpc.timeout", 60000); // 60秒
config.setInt("hbase.client.operation.timeout", 120000); // 2分钟
config.setInt("hbase.client.scanner.timeout.period", 120000); // 2分钟
// 重试次数
config.setInt("hbase.client.retries.number", 3);
// 重试间隔
config.setLong("hbase.client.pause", 100); // 100ms
return config;
}
/**
* 批量写入优化
*/
public static void batchPutOptimized(Table table, List<Put> puts) throws Exception {
// 设置写入缓冲区
table.put(puts);
// 或者使用批量提交
Object[] results = new Object[puts.size()];
table.batch(puts, results);
}
/**
* 扫描优化
*/
public static void scanOptimized(Table table, Scan scan) throws Exception {
// 设置扫描缓存
scan.setCaching(100); // 每次 RPC 返回 100 行
scan.setBatch(10); // 每行返回 10 列(大行时使用)
scan.setCacheBlocks(true); // 缓存数据块
scan.setReversed(false); // 正向扫描
// 只查询需要的列
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
// 使用过滤器减少数据传输
scan.setFilter(new PageFilter(1000));
}
}
HBase 与其他系统集成
与 MapReduce 集成
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
/**
* MapReduce 读取 HBase 数据
*/
public class HBaseMapReduceExample {
/**
* Mapper:从 HBase 读取数据
*/
public static class MyMapper extends TableMapper<Text, IntWritable> {
private static final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
// 获取列值
byte[] nameBytes = value.getValue(
Bytes.toBytes("info"), Bytes.toBytes("name"));
String name = Bytes.toString(nameBytes);
text.set(name);
context.write(text, ONE);
}
}
/**
* Reducer:写入 HBase
*/
public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// 创建 Put 对象
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("stats"), Bytes.toBytes("count"),
Bytes.toBytes(sum));
context.write(null, put);
}
}
/**
* 配置 Job
*/
public static Job createJob(Configuration conf) throws Exception {
Job job = Job.getInstance(conf, "HBase MapReduce Example");
// 配置 Mapper 从 HBase 读取
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob(
"source_table", // 输入表
scan, // Scan 配置
MyMapper.class, // Mapper 类
Text.class, // Mapper 输出 Key 类型
IntWritable.class, // Mapper 输出 Value 类型
job
);
// 配置 Reducer 写入 HBase
TableMapReduceUtil.initTableReducerJob(
"target_table", // 输出表
MyReducer.class, // Reducer 类
job
);
return job;
}
}
与 Spark 集成
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/**
* Spark 读取 HBase
*/
object HBaseSparkExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("HBase Spark Example")
.getOrCreate()
// 配置 HBase
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set(TableInputFormat.INPUT_TABLE, "user")
// 读取 HBase 数据
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
spark.sparkContext.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
// 处理数据
val result = hbaseRDD.map { case (_, result) =>
val rowKey = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue(
Bytes.toBytes("info"), Bytes.toBytes("name")))
(rowKey, name)
}
result.foreach(println)
spark.stop()
}
}
与 Phoenix 集成
Phoenix 是 HBase 的 SQL 层,提供 SQL 查询能力:
-- 创建表
CREATE TABLE user (
rowkey VARCHAR PRIMARY KEY,
info.name VARCHAR,
info.age INTEGER,
info.city VARCHAR,
data.login_count INTEGER
);
-- 插入数据
UPSERT INTO user VALUES('user001', 'Alice', 25, 'Beijing', 100);
-- 查询数据
SELECT * FROM user WHERE age > 20;
-- 创建索引
CREATE INDEX idx_age ON user(info.age);
-- 创建覆盖索引
CREATE INDEX idx_age_city ON user(info.age) INCLUDE(info.name, info.city);
-- 聚合查询
SELECT city, COUNT(*) as cnt, AVG(age) as avg_age
FROM user
GROUP BY city;
Phoenix 适用场景:
- 需要 SQL 查询能力
- 二级索引需求
- OLAP 分析场景
注意:Phoenix 适合中小规模数据查询,大规模数据扫描性能不如原生 HBase API。
HBase 运维管理
集群监控
关键监控指标:
| 类别 | 指标 | 说明 | 告警阈值 |
|---|---|---|---|
| RegionServer | 区域数量 | 每个 RS 管理的 Region 数 | > 1000 |
| 请求队列长度 | 待处理请求数 | > 100 | |
| 内存使用率 | MemStore + BlockCache | > 80% | |
| Compaction | Compaction 队列 | 待合并的 Store 数 | > 10 |
| Flush 队列 | 待刷写的 MemStore 数 | > 5 | |
| 读写性能 | 读延迟 | P99 读延迟 | > 100ms |
| 写延迟 | P99 写延迟 | > 50ms | |
| GC 时间 | GC 时间占比 | > 10% |
常用运维命令
# RegionServer 状态
hbase regionserver
# HMaster 状态
hbase master
# 手动触发 Compaction
hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
--compact /hbase/data/default/user
# 手动触发 Major Compaction
major_compact 'user'
# 手动触发 Flush
flush 'user'
flush 'region_name'
# 移动 Region
move 'region_encoded_name', 'target_server'
# 分裂 Region
split 'region_name'
split 'table_name', 'split_key'
# 合并相邻 Region
merge_region 'region1', 'region2'
# 查看集群状态
hbase hbck
hbase hbck -details
# 修复集群不一致
hbase hbck -fix
# 查看表磁盘使用
du 'user'
# WAL 工具
hbase wal
故障排查
问题1:RegionServer 宕机
排查步骤:
1. 检查 RegionServer 日志
tail -f /var/log/hbase/hbase-hbase-regionserver-*.log
2. 检查 JVM GC 情况
jstat -gcutil <pid> 1000
3. 检查系统资源
top, iostat, vmstat
4. 检查网络连接
netstat -an | grep 16020
常见原因:
- GC 时间过长
- 内存不足
- 磁盘空间不足
- 网络问题
问题2:读写延迟高
排查步骤:
1. 检查 Compaction 队列
hbase shell: status 'detailed'
2. 检查 BlockCache 命中率
HBase Web UI -> RegionServer -> BlockCache
3. 检查热点 Region
HBase Web UI -> RegionServer -> Regions
4. 检查 RowKey 设计
是否存在数据倾斜
优化方案:
- 增加 RegionServer 数量
- 调整 Compaction 参数
- 优化 RowKey 设计
- 增加缓存大小
问题3:数据不一致
排查步骤:
1. 运行 hbck 检查
hbase hbck -details
2. 检查 Region 状态
HBase Web UI -> Table -> Regions
修复命令:
# 修复 Region 分配不一致
hbase hbck -fixAssignments
# 修复 Meta 表不一致
hbase hbck -fixMeta
# 修复 HDFS 上不一致的 Region
hbase hbck -fixHdfsHoles
小结
本章详细介绍了 HBase 分布式数据库的核心概念和使用方法:
- 数据模型:深入理解 RowKey、列族、列限定符、时间戳、单元格等概念
- 物理存储:Region、Store、MemStore、HFile、WAL 的组织结构
- 架构设计:HMaster、HRegionServer、ZooKeeper 的协作关系
- 读写流程:写入 WAL → MemStore → HFile 的流程,读取时多数据源合并
- Shell 操作:表管理、数据操作、过滤器、快照
- Java API:连接管理、表操作、数据读写
- RowKey 设计:反转时间戳、加盐、哈希、组合键等设计模式
- 性能优化:布隆过滤器、协处理器、内存配置、预分区
- 系统集成:MapReduce、Spark、Phoenix 集成方案
- 运维管理:监控指标、常用命令、故障排查
HBase 是大数据场景下实时随机读写的首选数据库,特别适合海量数据的键值查询场景。在实际项目中,HBase 常与 Spark、Flink、Phoenix 等组件配合使用,构建完整的大数据平台。