跳到主要内容

HBase 分布式数据库

Apache HBase 是一个分布式、可扩展、面向列的 NoSQL 数据库,基于 Google BigTable 论文实现,运行在 HDFS 之上,提供海量数据的实时随机读写能力。

HBase 概述

什么是 HBase?

HBase(Hadoop Database)是 Apache Hadoop 项目的子项目,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统。它适合存储 PB 级别的海量数据,并能在毫秒级别完成随机读写操作。

HBase 的设计目标是解决传统关系型数据库在处理海量数据时的瓶颈问题。传统数据库在面对数十亿行、数百万列的大表时,往往会遇到性能瓶颈、扩展困难等问题。HBase 通过分布式架构和列式存储,能够轻松处理这类大规模数据场景。

HBase 的核心特点:

  1. 海量存储:支持数十亿行、数百万列的大表,单表可达 PB 级别
  2. 实时读写:毫秒级的随机读写响应,适合实时查询场景
  3. 稀疏存储:空值(null)不占用存储空间,节省存储资源
  4. 高可靠性:数据多副本存储(基于 HDFS),自动故障转移
  5. 可扩展:支持在线扩容,线性扩展性能
  6. 无模式:列可以动态添加,同一表中不同行可以有不同的列

HBase vs 关系型数据库

理解 HBase 与传统关系型数据库的区别,有助于正确选择技术方案:

维度HBase关系型数据库
数据模型列族存储,稀疏矩阵行存储,固定模式
Schema灵活,列可动态增加固定,需预定义所有列
事务单行事务(行级原子性)ACID 事务(跨行、跨表)
索引只有行键(RowKey)索引多种索引(B+树、哈希等)
JOIN不支持,需应用层实现支持,优化器自动优化
SQL不原生支持(需 Phoenix)原生 SQL 支持
扩展性水平扩展,线性增长垂直扩展为主,有限水平扩展
数据量TB 到 PB 级别GB 到 TB 级别
适用场景海量数据随机读写、时序数据复杂事务处理、结构化数据

选择建议

  • 选择 HBase:海量数据(TB/PB 级)、需要随机读写、数据结构灵活、写入密集型场景
  • 选择关系型数据库:需要复杂事务、复杂查询(JOIN、子查询)、数据量可控、查询模式多样

HBase 应用场景

场景说明典型应用
用户画像存储用户属性和行为标签,支持实时查询电商、社交平台的用户画像系统
消息存储存储即时通讯消息、社交动态,支持时间范围查询微信、微博等消息系统
时序数据存储 IoT 设备数据、监控指标,支持高效写入工业物联网、监控告警系统
日志存储存储应用日志、访问日志,支持快速检索日志分析平台
推荐系统存储用户行为特征,支持实时特征提取电商推荐、广告投放系统
图谱数据存储社交关系、知识图谱的边数据社交网络、知识图谱

HBase 与 BigTable 的关系

HBase 是 Google BigTable 论文的开源实现。理解 BigTable 的设计理念有助于更好地使用 HBase:

BigTable 论文的核心思想

  1. 分布式多维排序映射:数据按(行键、列族、列限定符、时间戳)四元组组织
  2. LSM-Tree 存储模型:写入先入内存,再批量刷写到磁盘,优化写入性能
  3. 区域划分:表按行键范围划分为多个区域(Region),分布式存储
  4. 列族存储:同一列族的数据存储在一起,提高查询效率

HBase 版本演进

版本发布时间重要特性
HBase 0.x2008-2012基础分布式存储、HDFS 集成
HBase 1.x2015Phoenix 集成增强、协处理器改进
HBase 2.x2018Procedure Store、Region 分配优化
HBase 2.42022多集群客户端支持、META 副本增强
HBase 3.02024+Dual File Compaction、可靠性改进

HBase 3.0 重要更新

  • 新的 Dual File Compaction 算法
  • Master 过程执行可靠性改进
  • 更好的与 Hadoop 3.x 集成
  • 性能和稳定性优化

HBase 数据模型

理解 HBase 的数据模型是使用 HBase 的基础。HBase 的数据模型与关系型数据库有本质区别。

逻辑视图

HBase 的数据模型可以理解为多维排序的映射(Map)

(RowKey, Column Family, Column Qualifier, Timestamp) → Value

关键概念详解

RowKey(行键)

RowKey 是数据的主键,具有以下特点:

  • 唯一标识:每个 RowKey 唯一标识一行数据
  • 字典序排序:数据按 RowKey 的字典序存储,影响查询性能
  • 不可修改:RowKey 一旦设置不能修改,只能删除重建
  • 最大长度:理论无限制,但建议 16-50 字节
  • 设计关键:RowKey 的设计直接影响查询性能和数据分布
# RowKey 示例
user001 # 用户ID
20240101_user001 # 日期 + 用户ID(适合时间范围查询)
md5_user001 # 哈希后的用户ID(分散热点)

Column Family(列族)

列族是列的逻辑分组,具有以下特点:

  • 预定义:建表时必须指定列族,不能动态添加
  • 存储分离:不同列族的数据存储在不同目录中
  • 配置独立:每个列族可以独立配置压缩、版本数、TTL 等
  • 数量限制:建议不超过 2-3 个列族,过多影响性能
  • 访问优化:同一列族的数据一起存储,查询时只扫描需要的列族
# 列族示例
create 'user', 'info', 'data' # info 存储基本信息,data 存储行为数据

Column Qualifier(列限定符)

列限定符是列族内的具体列,具有以下特点:

  • 动态添加:可以随时添加新列,无需预定义
  • 类型灵活:以字节数组存储,可以存储任意类型数据
  • 命名规范:通常使用有意义的字符串命名
# 列限定符示例
info:name # 列族 info 下的 name 列
info:age # 列族 info 下的 age 列
data:login # 列族 data 下的 login 列

Timestamp(时间戳)

时间戳标识数据的版本,具有以下特点:

  • 版本控制:同一单元格可以存储多个版本
  • 默认值:写入时自动生成(毫秒级时间戳),也可手动指定
  • 排序规则:同一单元格的版本按时间戳倒序排列(最新的在前)
  • 版本数限制:可配置保留的版本数量,默认为 1

Cell(单元格)

单元格是数据存储的基本单位,由四元组唯一确定:

Cell = (RowKey, Column Family, Column Qualifier, Timestamp) → Value

数据示例

假设有一个用户表 user,包含列族 infodata

RowKeyColumn FamilyColumn QualifierTimestampValue
user001infoname1704067200000Alice
user001infoage170406720000025
user001infocity1704067200000Beijing
user001infocity1703980800000Shanghai
user001datalogin_count1704067200000100
user001datalast_login17040672000002024-01-01
user002infoname1704067200000Bob

解读

  • user001 行有多个版本的城市信息(从 Shanghai 迁移到 Beijing)
  • user001 行有两个列族的数据:infodata
  • 不同行可以有不同的列(稀疏存储)

物理存储模型

理解 HBase 的物理存储模型有助于性能调优。

Region(区域)

Region 是 HBase 数据分片的基本单位:

  • 范围划分:表按 RowKey 范围划分为多个 Region
  • 分布式存储:每个 Region 由一个 RegionServer 管理
  • 自动分裂:Region 大小达到阈值时自动分裂
  • 负载均衡:HMaster 可以在 RegionServer 之间迁移 Region
表按 RowKey 范围划分:

Region 1: [ , row100) → RegionServer 1
Region 2: [row100, row200) → RegionServer 2
Region 3: [row200, row300) → RegionServer 3
Region 4: [row300, ) → RegionServer 1

Region 大小配置

<property>
<name>hbase.hregion.max.filesize</name>
<value>10737418240</value> <!-- 10GB,默认值 -->
</property>

Store(存储)

每个 Region 由多个 Store 组成,每个 Store 对应一个列族:

Region
├── Store (列族 info)
│ ├── MemStore(内存写缓存)
│ └── HFile(磁盘数据文件)
│ ├── 000001.hfile
│ ├── 000002.hfile
│ └── ...
├── Store (列族 data)
│ ├── MemStore
│ └── HFile
│ └── ...

MemStore

  • 写缓存:数据先写入 MemStore,在内存中排序
  • 刷写机制:MemStore 满时刷写到磁盘生成 HFile
  • 刷写触发条件
    • MemStore 大小达到阈值(默认 128MB)
    • RegionServer 全局 MemStore 内存达到阈值
    • 手动触发刷写

HFile

  • 磁盘存储:MemStore 刷写后生成的磁盘文件
  • 有序存储:数据按 RowKey、列族、列限定符排序
  • 不可修改:HFile 一旦生成不可修改,只能通过合并(Compaction)清理
  • 多版本:一个 HFile 可以包含同一单元格的多个版本

HFile 结构

HFile 是 HBase 的核心存储格式,基于 LSM-Tree 设计:

HFile 结构:

┌─────────────────────────────────────────┐
│ File Trailer │ 文件尾(索引信息)
├─────────────────────────────────────────┤
│ Data Block Index │ 数据块索引
├─────────────────────────────────────────┤
│ Meta Block Index │ 元数据块索引
├─────────────────────────────────────────┤
│ Load-on-open Section │ 启动时加载区
├─────────────────────────────────────────┤
│ Data Blocks │ 数据块(实际数据)
│ ┌─────────────────────────────────────┐│
│ │ Key1 → Value1 ││
│ │ Key2 → Value2 ││
│ │ ... ││
│ └─────────────────────────────────────┘│
├─────────────────────────────────────────┤
│ Meta Blocks │ 元数据块(布隆过滤器等)
├─────────────────────────────────────────┤
│ File Info │ 文件信息
└─────────────────────────────────────────┘

Data Block(数据块)

  • 数据存储的基本单位
  • 默认大小 64KB
  • 包含多个 KeyValue 对
  • 支持压缩(SNAPPY、LZO、GZIP 等)

Data Block Index(数据块索引)

  • 快速定位 Data Block
  • 存储每个 Data Block 的起始 RowKey 和偏移量
  • 支持二分查找

WAL(Write-Ahead Log)

WAL 是 HBase 保证数据可靠性的关键机制:

  • 预写日志:数据写入 MemStore 之前先写入 WAL
  • 故障恢复:RegionServer 故障时,通过 WAL 恢复未持久化的数据
  • 顺序写入:WAL 采用顺序写入,性能较高
写入流程:

Client → WAL(顺序写入)→ MemStore(内存排序)→ 返回成功
↓(MemStore 满)
HFile(持久化)

WAL 配置

<!-- WAL 类型 -->
<property>
<name>hbase.wal.provider</name>
<value>asyncfs</value> <!-- 默认,支持异步写入 -->
</property>

<!-- WAL 复制策略 -->
<property>
<name>hbase.regionserver.hlog.replication</name>
<value>1</value> <!-- WAL 副本数 -->
</property>

HBase 架构

系统架构

HBase 采用主从架构(Master-Slave),主要组件包括:

┌─────────────────────────────────────────────────────────────────────┐
│ Client │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Application │ │ HBase Shell │ │ Java API │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
└─────────┼────────────────┼────────────────┼──────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ ZooKeeper │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ - 集群状态维护 │ │
│ │ - Master 选举 │ │
│ │ - Meta 表位置 │ │
│ │ - RegionServer 状态监控 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ HMaster │ │ HRegionServer │ │ HRegionServer │
│ (主节点) │ │ (工作节点1) │ │ (工作节点2) │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ 表管理 │ │ │ │ Region 1 │ │ │ │ Region 3 │ │
│ │ Region分配 │ │ │ │ Region 2 │ │ │ │ Region 4 │ │
│ │ 负载均衡 │ │ │ │ WAL │ │ │ │ WAL │ │
│ │ 故障恢复 │ │ │ │ MemStore │ │ │ │ MemStore │ │
│ └─────────────┘ │ │ │ BlockCache │ │ │ │ BlockCache │ │
└─────────────────┘ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘
│ │ │
└────────────────┼────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│ HDFS │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ - WAL 文件 │ │
│ │ - HFile 文件 │ │
│ │ - 数据多副本存储 │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

HMaster(主节点)

HMaster 是 HBase 的主节点,负责集群的管理和协调:

核心职责

  1. 表管理:创建、删除、修改表结构
  2. Region 分配:将 Region 分配给 RegionServer
  3. 负载均衡:调整 Region 在 RegionServer 之间的分布
  4. 故障恢复:处理 RegionServer 故障,重新分配 Region
  5. 元数据管理:管理 meta 表(Region 位置信息)

高可用配置

HMaster 支持多实例部署,实现高可用:

<!-- backup-masters 文件配置 -->
<!-- 在 conf/backup-masters 中添加备份 Master -->
master2.example.com
master3.example.com

注意:HMaster 不是单点故障,即使 HMaster 宕机,数据读写仍可正常进行(只是无法进行表管理等操作)。

HRegionServer(区域服务器)

HRegionServer 是 HBase 的工作节点,负责实际的数据存储和处理:

核心职责

  1. 数据处理:处理客户端的读写请求
  2. Region 管理:管理分配给自己的 Region
  3. 数据持久化:将 MemStore 刷写到 HFile
  4. 合并压缩:合并小的 HFile,清理过期数据
  5. WAL 管理:维护 WAL 文件

内部组件

  • WAL:预写日志,保证数据可靠性
  • MemStore:内存写缓存,每个列族一个
  • BlockCache:读缓存,缓存频繁访问的数据块
  • HFile:磁盘数据文件

ZooKeeper 的作用

ZooKeeper 在 HBase 架构中扮演重要角色:

核心功能

  1. 集群管理:维护集群状态信息
  2. Master 选举:多个 HMaster 竞争成为 Active Master
  3. 元数据位置:存储 hbase:meta 表的位置
  4. 故障检测:监控 RegionServer 状态
  5. 分布式锁:表操作、Region 分配等需要分布式锁

ZooKeeper 节点结构

/hbase
├── /master # 当前 Active Master
├── /meta-region-server # meta 表位置
├── /rs # RegionServer 列表
│ ├── /rs/server1
│ ├── /rs/server2
│ └── /rs/server3
├── /table # 表状态
├── /table-lock # 表锁
└── /region-in-transition # Region 迁移状态

Meta 表

hbase:meta 表是 HBase 的系统表,存储所有 Region 的位置信息:

Meta 表结构

RowKeyColumn Family:info说明
表名,起始RowKey,时间戳regioninfoRegion 信息(表名、起始键、结束键)
serverRegionServer 地址
serverstartcodeRegionServer 启动时间

查询流程

  1. Client 从 ZooKeeper 获取 meta 表位置
  2. Client 查询 meta 表获取目标 Region 的 RegionServer
  3. Client 直接与目标 RegionServer 通信

数据读写流程

写入流程详解

HBase 的写入流程设计为高吞吐量优化:

写入流程:

Client

│ 1. 获取 Region 位置

ZooKeeper + Meta Table

│ 2. 返回 RegionServer 地址

Client ─────────────────────────────────────────────┐
│ │
│ 3. 发送写入请求 │
▼ │
RegionServer │
│ │
│ 4. 写入 WAL(持久化保证) │
▼ │
WAL (HDFS) │
│ │
│ 5. 写入 MemStore(内存排序) │
▼ │
MemStore │
│ │
│ 6. 返回写入成功 │
◄────────────────────────────────────────────────┘

详细步骤

  1. 定位 Region:Client 根据 RowKey 定位目标 Region 和 RegionServer
  2. 写入 WAL:数据先写入 WAL,保证持久性
  3. 写入 MemStore:数据写入 MemStore,在内存中按 RowKey 排序
  4. 返回成功:写入完成,返回客户端成功
  5. 异步刷写:MemStore 达到阈值后,异步刷写到 HFile

为什么写入这么快?

  1. 顺序写入:WAL 采用顺序写入,避免随机 I/O
  2. 内存写入:数据先写入内存,不直接操作磁盘
  3. 异步刷写:刷写操作异步进行,不阻塞写入

刷写触发条件

<!-- MemStore 大小阈值 -->
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value> <!-- 128MB -->
</property>

<!-- RegionServer 全局 MemStore 内存占比 -->
<property>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>

<!-- 阻塞写入的内存占比 -->
<property>
<name>hbase.regionserver.global.memstore.size.lower.limit</name>
<value>0.95</value> <!-- 达到 95% 时阻塞写入 -->
</property>

读取流程详解

HBase 的读取流程需要合并多个数据源:

读取流程:

Client

│ 1. 获取 Region 位置

ZooKeeper + Meta Table

│ 2. 返回 RegionServer 地址

Client ─────────────────────────────────────────────┐
│ │
│ 3. 发送读取请求 │
▼ │
RegionServer │
│ │
│ 4. 查询 BlockCache(读缓存) │
▼ │
BlockCache ─────────────────────────────────────────┤
│ 未命中 │
│ 5. 查询 MemStore(写缓存) │
▼ │
MemStore ───────────────────────────────────────────┤
│ 未命中 │
│ 6. 查询 HFile(磁盘文件) │
▼ │
HFile (with Bloom Filter) │
│ │
│ 7. 合并结果,返回客户端 │
◄────────────────────────────────────────────────┘

详细步骤

  1. 定位 Region:Client 根据 RowKey 定位目标 Region
  2. 查询 BlockCache:先检查读缓存
  3. 查询 MemStore:检查写缓存中是否有最新数据
  4. 查询 HFile:从磁盘文件读取数据(使用布隆过滤器加速)
  5. 合并结果:合并多个数据源的结果,返回最新版本

读取优化:BlockCache

BlockCache 是 HBase 的读缓存,缓存频繁访问的数据块:

<!-- BlockCache 占堆内存比例 -->
<property>
<name>hfile.block.cache.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>

<!-- BucketCache(堆外内存缓存) -->
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
<property>
<name>hbase.bucketcache.size</name>
<value>8192</value> <!-- 8GB -->
</property>

MemStore 刷写机制

MemStore 刷写是 HBase 的重要机制:

刷写触发条件

触发条件说明行为
MemStore 大小达到阈值默认 128MB触发该 Region 的 MemStore 刷写
RegionServer 全局内存达到阈值默认堆内存 40%触发所有 Region 的 MemStore 刷写
RegionServer 全局内存达到高水位默认堆内存 40% * 95%阻塞写入,强制刷写
手动触发执行 flush 命令立即刷写指定 Region
定时刷写默认 1 小时定期刷写避免数据丢失

刷写流程

  1. 准备阶段:锁住 MemStore,创建快照
  2. 刷写阶段:将快照写入 HDFS 生成 HFile
  3. 完成阶段:更新元数据,释放内存

Compaction(合并)

Compaction 是 HBase 的后台优化机制,用于合并 HFile 和清理过期数据:

Minor Compaction(小合并)

  • 合并相邻的小 HFile
  • 清理过期版本数据
  • 不阻塞读写
  • 触发条件:HFile 数量达到阈值
Minor Compaction:

[HFile1] [HFile2] [HFile3] [HFile4]
↓ Minor Compaction
[HFile_merged]

Major Compaction(大合并)

  • 合并一个 Store 的所有 HFile
  • 清理过期和标记删除的数据
  • 可能阻塞读写
  • 建议在业务低峰期执行
Major Compaction:

[HFile1] [HFile2] [HFile3] [HFile4] [删除标记]
↓ Major Compaction
[HFile_merged(无删除标记)]

Compaction 配置

<!-- 触发 Minor Compaction 的 HFile 数量 -->
<property>
<name>hbase.hstore.compactionThreshold</name>
<value>3</value>
</property>

<!-- Major Compaction 周期(0 表示禁用自动执行) -->
<property>
<name>hbase.hregion.majorcompaction</name>
<value>604800000</value> <!-- 7天,建议设为0手动触发 -->
</property>

<!-- Compaction 线程数 -->
<property>
<name>hbase.regionserver.thread.compaction.throttle</name>
<value>2</value>
</property>

Region Split(Region 分裂)

当 Region 过大时,HBase 会自动分裂 Region:

分裂流程

  1. 触发判断:Region 大小超过 hbase.hregion.max.filesize(默认 10GB)
  2. 计算分裂点:找到 Region 中间位置的 RowKey
  3. 创建子 Region:创建两个新的子 Region
  4. 数据迁移:原 Region 的数据按分裂点分配给子 Region
  5. 更新 Meta:更新 meta 表中的 Region 信息
  6. 通知 HMaster:HMaster 将新 Region 分配给 RegionServer
Region Split:

原 Region: [row001 - row999]
↓ Split
子 Region 1: [row001 - row500]
子 Region 2: [row501 - row999]

预分区

为避免频繁分裂和数据热点,建议建表时预分区:

# 指定分区点
create 'user', 'info', SPLITS => ['10', '20', '30', '40']

# 使用分区文件
create 'user', 'info', SPLITS_FILE => '/path/to/splits.txt'

# 使用 HexStringSplit 算法
create 'user', 'info', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

HBase Shell 操作

表操作

# 进入 HBase Shell
hbase shell

# 创建表
create 'user', 'info', 'data'

# 创建表(指定列族属性)
create 'user',
{NAME => 'info', VERSIONS => 3, TTL => 86400},
{NAME => 'data', VERSIONS => 5, COMPRESSION => 'SNAPPY'}

# 创建表(指定分区)
create 'user', 'info', SPLITS => ['user100', 'user200', 'user300']

# 查看所有表
list

# 查看表结构
describe 'user'

# 检查表是否存在
exists 'user'

# 检查表是否启用
is_enabled 'user'
is_disabled 'user'

# 禁用表
disable 'user'

# 启用表
enable 'user'

# 删除表(需先禁用)
disable 'user'
drop 'user'

# 修改表结构
disable 'user'
# 修改列族版本数
alter 'user', {NAME => 'info', VERSIONS => 5}
# 添加新列族
alter 'user', {NAME => 'extra', VERSIONS => 1}
# 删除列族
alter 'user', {NAME => 'extra', METHOD => 'delete'}
enable 'user'

数据操作

# 插入数据
put 'user', 'user001', 'info:name', 'Alice'
put 'user', 'user001', 'info:age', '25'
put 'user', 'user001', 'info:city', 'Beijing'
put 'user', 'user001', 'data:login_count', '100'

# 插入数据(指定时间戳)
put 'user', 'user001', 'info:city', 'Shanghai', 1703980800000

# 获取单行数据
get 'user', 'user001'

# 获取指定列族
get 'user', 'user001', 'info'

# 获取指定列
get 'user', 'user001', 'info:name', 'info:age'

# 获取多个版本
get 'user', 'user001', {COLUMN => 'info:name', VERSIONS => 3}

# 获取指定时间范围
get 'user', 'user001', {COLUMN => 'info:city', TIMERANGE => [1703900000000, 1704100000000]}

# 扫描全表
scan 'user'

# 扫描指定范围
scan 'user', {STARTROW => 'user001', STOPROW => 'user003'}

# 扫描指定列族
scan 'user', {COLUMNS => 'info'}

# 扫描指定列
scan 'user', {COLUMNS => ['info:name', 'info:age']}

# 限制返回行数
scan 'user', {LIMIT => 10}

# 反向扫描
scan 'user', {REVERSED => true}

# 删除指定列
delete 'user', 'user001', 'info:age'

# 删除指定版本
delete 'user', 'user001', 'info:city', 1703980800000

# 删除整行
deleteall 'user', 'user001'

# 统计行数
count 'user'
count 'user', {INTERVAL => 1000} # 每1000行显示一次进度

# 追加数据
append 'user', 'user001', 'info:name', '_suffix'

# 原子递增
incr 'user', 'user001', 'data:login_count', 1

# 清空表
truncate 'user'
truncate_preserve 'table' # 保留分区

过滤器

HBase 提供丰富的过滤器用于数据查询:

# 值过滤器
scan 'user', {FILTER => "ValueFilter(=, 'binary:Alice')"}
scan 'user', {FILTER => "ValueFilter(=, 'substring:Ali')"}

# 行键过滤器
scan 'user', {FILTER => "RowFilter(=, 'binary:user001')"}
scan 'user', {FILTER => "RowFilter(=, 'substring:user')"}
scan 'user', {FILTER => "RowFilter(<=, 'binary:user005')"}

# 列名过滤器
scan 'user', {FILTER => "QualifierFilter(=, 'substring:name')"}

# 列族过滤器
scan 'user', {FILTER => "FamilyFilter(=, 'binary:info')"}

# 单列值过滤器
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20')"}

# 单列值排除过滤器
scan 'user', {FILTER => "SingleColumnValueExcludeFilter('info', 'age', >, 'binary:20')"}

# 分页过滤器
scan 'user', {FILTER => "PageFilter(10)"}

# 时间戳过滤器
scan 'user', {FILTER => "TimestampsFilter(1703980800000, 1704067200000)"}

# 组合过滤器(AND)
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'age', >, 'binary:20') AND SingleColumnValueFilter('info', 'city', =, 'binary:Beijing')"}

# 组合过滤器(OR)
scan 'user', {FILTER => "SingleColumnValueFilter('info', 'city', =, 'binary:Beijing') OR SingleColumnValueFilter('info', 'city', =, 'binary:Shanghai')"}

# 过滤器列表
scan 'user', {FILTER => "FilterList(MUST_PASS_ALL,
SingleColumnValueFilter('info', 'age', >, 'binary:20'),
SingleColumnValueFilter('info', 'city', =, 'binary:Beijing'))"}

快照操作

# 创建快照
snapshot 'user', 'user_snapshot_20240101'

# 列出快照
list_snapshots
list_snapshots 'user.*' # 正则匹配

# 查看快照详情
describe_snapshot 'user_snapshot_20240101'

# 从快照恢复
disable 'user'
restore_snapshot 'user_snapshot_20240101'
enable 'user'

# 克隆快照为新表
clone_snapshot 'user_snapshot_20240101', 'user_clone'

# 删除快照
delete_snapshot 'user_snapshot_20240101'

Java API 操作

连接 HBase

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class HBaseConnection {

private static Connection connection;

/**
* 获取 HBase 连接(单例模式)
* HBase 的 Connection 是重量级对象,应该复用
*/
public static synchronized Connection getConnection() throws Exception {
if (connection == null || connection.isClosed()) {
Configuration config = HBaseConfiguration.create();
// 设置 ZooKeeper 地址
config.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
config.set("hbase.zookeeper.property.clientPort", "2181");
// 可选:设置超时时间
config.setInt("hbase.rpc.timeout", 60000);
config.setInt("hbase.client.operation.timeout", 60000);
config.setInt("hbase.client.scanner.timeout.period", 60000);

connection = ConnectionFactory.createConnection(config);
}
return connection;
}

/**
* 获取 Admin 对象
* Admin 用于执行管理操作(创建表、删除表等)
*/
public static Admin getAdmin() throws Exception {
return getConnection().getAdmin();
}

/**
* 获取 Table 对象
* Table 用于执行数据操作(增删改查)
*/
public static Table getTable(String tableName) throws Exception {
return getConnection().getTable(TableName.valueOf(tableName));
}

/**
* 关闭连接
*/
public static void close() {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

表管理

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseTableAdmin {

/**
* 创建表
*/
public static void createTable(String tableName, String[] columnFamilies) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);

if (admin.tableExists(table)) {
System.out.println("Table " + tableName + " already exists");
return;
}

// 构建表描述符
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);

// 添加列族
for (String cf : columnFamilies) {
ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(cf))
.setMaxVersions(3) // 版本数
.setMinVersions(1) // 最小版本数
.setTTL(86400) // TTL(秒)
.setBlocksize(65536) // 块大小
.setBlockCacheEnabled(true) // 启用块缓存
.setBloomFilterType(BloomType.ROW) // 布隆过滤器类型
.setCompressionType(Compression.Algorithm.SNAPPY); // 压缩算法

tableBuilder.setColumnFamily(cfBuilder.build());
}

// 创建表
admin.createTable(tableBuilder.build());
System.out.println("Table " + tableName + " created successfully");
}
}

/**
* 创建预分区表
*/
public static void createTableWithSplits(String tableName, String[] columnFamilies,
byte[][] splitKeys) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);

if (admin.tableExists(table)) {
System.out.println("Table " + tableName + " already exists");
return;
}

TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(table);
for (String cf : columnFamilies) {
tableBuilder.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf))
.setMaxVersions(3)
.build()
);
}

// 使用预分区创建表
admin.createTable(tableBuilder.build(), splitKeys);
System.out.println("Table " + tableName + " created with " + splitKeys.length + " regions");
}
}

/**
* 删除表
*/
public static void deleteTable(String tableName) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);

if (admin.tableExists(table)) {
// 先禁用表
admin.disableTable(table);
// 再删除表
admin.deleteTable(table);
System.out.println("Table " + tableName + " deleted successfully");
}
}
}

/**
* 修改表结构
*/
public static void modifyTable(String tableName) throws Exception {
try (Admin admin = HBaseConnection.getAdmin()) {
TableName table = TableName.valueOf(tableName);

// 获取当前表描述
TableDescriptor descriptor = admin.getDescriptor(table);
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(descriptor);

// 修改列族属性
ColumnFamilyDescriptor cfDescriptor = descriptor.getColumnFamily(Bytes.toBytes("info"));
ColumnFamilyDescriptor newCfDescriptor = ColumnFamilyDescriptorBuilder
.newBuilder(cfDescriptor)
.setMaxVersions(5) // 修改版本数
.build();

builder.modifyColumnFamily(newCfDescriptor);

// 禁用表后修改
admin.disableTable(table);
admin.modifyTable(builder.build());
admin.enableTable(table);

System.out.println("Table " + tableName + " modified successfully");
}
}
}

数据写入

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.List;

public class HBaseDataWriter {

/**
* 单条写入
*/
public static void put(String tableName, String rowKey, String columnFamily,
String column, String value) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
Bytes.toBytes(value)
);
table.put(put);
}
}

/**
* 单条写入(带时间戳)
*/
public static void putWithTimestamp(String tableName, String rowKey,
String columnFamily, String column, String value, long timestamp) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
timestamp,
Bytes.toBytes(value)
);
table.put(put);
}
}

/**
* 批量写入
*/
public static void batchPut(String tableName, List<Put> puts) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
table.put(puts);
}
}

/**
* 原子操作:检查并写入(CAS)
* 用于实现乐观锁
*/
public static boolean checkAndPut(String tableName, String rowKey,
String columnFamily, String column, String expectedValue,
Put put) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
return table.checkAndPut(
Bytes.toBytes(rowKey),
Bytes.toBytes(columnFamily),
Bytes.toBytes(column),
expectedValue == null ? null : Bytes.toBytes(expectedValue),
put
);
}
}

/**
* 示例:使用 checkAndPut 实现乐观锁更新
*/
public static boolean updateWithVersion(String tableName, String rowKey,
String columnFamily, String column, String newValue,
long expectedVersion) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
// 先获取当前版本号
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("version"));
Result result = table.get(get);

byte[] versionBytes = result.getValue(
Bytes.toBytes(columnFamily), Bytes.toBytes("version"));
long currentVersion = versionBytes == null ? 0 : Bytes.toLong(versionBytes);

// 检查版本号
if (currentVersion != expectedVersion) {
return false; // 版本冲突
}

// 原子更新
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column),
Bytes.toBytes(newValue));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("version"),
Bytes.toBytes(currentVersion + 1));

return table.checkAndPut(
Bytes.toBytes(rowKey),
Bytes.toBytes(columnFamily),
Bytes.toBytes("version"),
Bytes.toBytes(expectedVersion),
put
);
}
}
}

数据读取

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;

import java.util.List;

public class HBaseDataReader {

/**
* 单行读取
*/
public static Result get(String tableName, String rowKey) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Get get = new Get(Bytes.toBytes(rowKey));
return table.get(get);
}
}

/**
* 获取指定列族
*/
public static Result get(String tableName, String rowKey, String columnFamily) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Get get = new Get(Bytes.toBytes(rowKey));
get.addFamily(Bytes.toBytes(columnFamily));
return table.get(get);
}
}

/**
* 获取多个版本
*/
public static Result getWithVersions(String tableName, String rowKey,
String columnFamily, String column, int versions) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
get.readVersions(versions);
return table.get(get);
}
}

/**
* 解析 Result
*/
public static void printResult(Result result) {
if (result == null || result.isEmpty()) {
System.out.println("No data found");
return;
}

// 遍历所有 Cell
for (Cell cell : result.rawCells()) {
String rowKey = Bytes.toString(CellUtil.cloneRow(cell));
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long timestamp = cell.getTimestamp();

System.out.printf("RowKey: %s, CF: %s, Col: %s, Value: %s, TS: %d%n",
rowKey, family, qualifier, value, timestamp);
}
}

/**
* 扫描查询
*/
public static void scan(String tableName, String startRow, String stopRow) throws Exception {
try (Table table = HBaseConnection.getTable(tableName);
ResultScanner scanner = table.getScanner(
new Scan()
.withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow))
)) {

for (Result result : scanner) {
printResult(result);
}
}
}

/**
* 带过滤器的扫描
*/
public static void scanWithFilter(String tableName, String startRow, String stopRow,
Filter filter) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Scan scan = new Scan()
.withStartRow(Bytes.toBytes(startRow))
.withStopRow(Bytes.toBytes(stopRow))
.setFilter(filter)
.setCaching(100) // 每次RPC返回的行数
.setBatch(10); // 每行返回的列数(大行时使用)

try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
printResult(result);
}
}
}
}

/**
* 分页扫描
*/
public static List<Result> scanByPage(String tableName, String startRow,
int pageSize) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Scan scan = new Scan()
.withStartRow(Bytes.toBytes(startRow))
.setLimit(pageSize)
.setCaching(pageSize);

List<Result> results = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
results.add(result);
}
}
return results;
}
}
}

数据删除

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDataDeleter {

/**
* 删除指定列
*/
public static void deleteColumn(String tableName, String rowKey,
String columnFamily, String column) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
table.delete(delete);
}
}

/**
* 删除指定版本
*/
public static void deleteVersion(String tableName, String rowKey,
String columnFamily, String column, long timestamp) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), timestamp);
table.delete(delete);
}
}

/**
* 删除整行
*/
public static void deleteRow(String tableName, String rowKey) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
}
}

/**
* 删除指定列族
*/
public static void deleteFamily(String tableName, String rowKey,
String columnFamily) throws Exception {
try (Table table = HBaseConnection.getTable(tableName)) {
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addFamily(Bytes.toBytes(columnFamily));
table.delete(delete);
}
}
}

RowKey 设计

RowKey 设计是 HBase 性能优化的关键,好的 RowKey 设计可以提高查询效率并避免热点问题。

设计原则

1. 唯一性原则

RowKey 必须唯一标识一行数据:

// 用户ID
String rowKey = userId;

// 复合键
String rowKey = userId + "_" + timestamp;

// 哈希键
String rowKey = MD5Hash.digest(userId);

2. 长度原则

RowKey 长度建议控制在 16-50 字节:

  • 太短:影响唯一性
  • 太长:增加存储开销,降低索引效率
// 不推荐:过长的 RowKey
String rowKey = "user_" + userId + "_" + timestamp + "_" + UUID.randomUUID();

// 推荐:合理长度
String rowKey = userId + "_" + (Long.MAX_VALUE - timestamp);

3. 散列原则

避免数据倾斜,让数据均匀分布:

// 问题:时间递增导致写入热点
String rowKey = timestamp + "_" + userId;

// 解决方案1:反转时间戳
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
String rowKey = userId + "_" + reverseTimestamp;

// 解决方案2:加盐
int salt = Math.abs(rowKey.hashCode()) % buckets;
String saltedRowKey = salt + "_" + rowKey;

// 解决方案3:哈希前缀
String hashPrefix = MD5Hash.digest(rowKey).toString().substring(0, 4);
String hashedRowKey = hashPrefix + "_" + rowKey;

4. 排序原则

利用字典序排序优化范围查询:

// 场景:按时间范围查询用户行为
// RowKey: userId_reverseTimestamp
// 查询用户 user001 最近一天的行为
long endTime = System.currentTimeMillis();
long startTime = endTime - 86400000; // 1天前

String startRow = "user001_" + (Long.MAX_VALUE - endTime);
String stopRow = "user001_" + (Long.MAX_VALUE - startTime);

// 这样可以高效地进行范围扫描

常见设计模式

反转时间戳模式

适用于按时间范围查询的场景:

/**
* 反转时间戳 RowKey 设计
* 优点:相同用户的数据按时间倒序排列,便于查询最近数据
*
* RowKey 格式: userId_reverseTimestamp
* 例如: user001_9223372036854775807
*/
public class ReverseTimestampRowKey {

/**
* 生成 RowKey
*/
public static String generateRowKey(String userId) {
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
return userId + "_" + reverseTimestamp;
}

/**
* 生成时间范围查询的起止 RowKey
*/
public static String[] generateTimeRange(String userId,
long startTime, long endTime) {
// 注意:反转后,结束时间对应的 RowKey 更小
String startRow = userId + "_" + (Long.MAX_VALUE - endTime);
String stopRow = userId + "_" + (Long.MAX_VALUE - startTime);
return new String[]{startRow, stopRow};
}
}

加盐模式

分散写入热点:

/**
* 加盐 RowKey 设计
* 优点:数据均匀分布,避免热点
* 缺点:范围查询需要扫描多个 Region
*
* RowKey 格式: salt_originalKey
* 例如: 3_user001
*/
public class SaltedRowKey {

private final int buckets;

public SaltedRowKey(int buckets) {
this.buckets = buckets;
}

/**
* 生成加盐 RowKey
*/
public String generateRowKey(String originalKey) {
int salt = Math.abs(originalKey.hashCode()) % buckets;
return salt + "_" + originalKey;
}

/**
* 获取所有可能的盐值前缀(用于范围查询)
*/
public String[] getAllSaltPrefixes() {
String[] prefixes = new String[buckets];
for (int i = 0; i < buckets; i++) {
prefixes[i] = i + "_";
}
return prefixes;
}

/**
* 范围查询时需要并行查询所有桶
*/
public List<Result> scanWithSalt(Table table, String originalKeyPrefix)
throws Exception {
List<Result> results = new ArrayList<>();

for (int i = 0; i < buckets; i++) {
String saltPrefix = i + "_";
Scan scan = new Scan()
.withStartRow(Bytes.toBytes(saltPrefix + originalKeyPrefix))
.withStopRow(Bytes.toBytes(saltPrefix + originalKeyPrefix + "~"));

try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result : scanner) {
results.add(result);
}
}
}

return results;
}
}

哈希模式

使用哈希值作为前缀:

/**
* 哈希前缀 RowKey 设计
* 优点:数据均匀分布,相同原始 Key 的数据在同一 Region
* 缺点:需要知道原始 Key 才能查询
*
* RowKey 格式: hashPrefix_originalKey
* 例如: a1b2_user001
*/
public class HashPrefixRowKey {

private final int prefixLength;

public HashPrefixRowKey(int prefixLength) {
this.prefixLength = prefixLength;
}

/**
* 生成哈希前缀 RowKey
*/
public String generateRowKey(String originalKey) {
String hash = MD5Hash.digest(originalKey).toString()
.substring(0, prefixLength);
return hash + "_" + originalKey;
}
}

组合键模式

多个字段组合成 RowKey:

/**
* 组合键 RowKey 设计
* 优点:支持多维度查询
* 设计关键:查询频率高的字段放在前面
*
* 场景:订单数据
* 查询模式1:按用户查询订单
* 查询模式2:按商户查询订单
*/
public class CompositeRowKey {

/**
* 按用户查询为主的设计
* RowKey: userId_orderId
*/
public static String generateUserOrderKey(String userId, String orderId) {
return userId + "_" + orderId;
}

/**
* 按商户查询为主的设计
* RowKey: merchantId_orderId
*/
public static String generateMerchantOrderKey(String merchantId, String orderId) {
return merchantId + "_" + orderId;
}

/**
* 双向索引设计:建立两个表
* user_order 表:userId_orderId -> 订单详情
* merchant_order 表:merchantId_orderId -> 订单详情
*/
}

设计案例分析

场景1:时序数据存储

/**
* 场景:存储设备传感器数据
* 需求:按设备查询,按时间范围过滤
*
* 设计方案:deviceId_reverseTimestamp
*
* 优点:
* 1. 同一设备的数据在同一 Region,便于按设备查询
* 2. 反转时间戳使数据按时间倒序,便于查询最近数据
* 3. 避免时间递增导致的写入热点
*/
public class TimeSeriesRowKey {

public static String generate(String deviceId) {
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
return deviceId + "_" + reverseTimestamp;
}

/**
* 查询某设备最近N小时的数据
*/
public static Scan createScan(String deviceId, int hours) {
long endTime = System.currentTimeMillis();
long startTime = endTime - hours * 3600000L;

return new Scan()
.withStartRow(Bytes.toBytes(deviceId + "_" + (Long.MAX_VALUE - endTime)))
.withStopRow(Bytes.toBytes(deviceId + "_" + (Long.MAX_VALUE - startTime)));
}
}

场景2:社交动态存储

/**
* 场景:存储用户社交动态(类似微博、朋友圈)
* 需求:按用户查看自己的动态,按时间排序
*
* 设计方案:userId_reverseTimestamp
*
* 问题:大V用户动态多,可能产生热点
* 优化:动态分表或使用时间分区
*/
public class SocialFeedRowKey {

public static String generate(String userId) {
long reverseTimestamp = Long.MAX_VALUE - System.currentTimeMillis();
return userId + "_" + reverseTimestamp;
}

/**
* 查询用户的动态列表
*/
public static Scan createScan(String userId, int limit) {
return new Scan()
.withStartRow(Bytes.toBytes(userId + "_"))
.withStopRow(Bytes.toBytes(userId + "_~"))
.setLimit(limit)
.setReversed(true); // 反向扫描,获取最新动态
}
}

Bloom Filter(布隆过滤器)

布隆过滤器是 HBase 优化读取性能的重要机制。

原理

布隆过滤器是一种空间效率很高的概率型数据结构,用于判断一个元素是否在集合中:

  • 可能存在:元素可能在集合中
  • 一定不存在:元素一定不在集合中
布隆过滤器原理:

┌─────────────────────────────────┐
│ Bit Array (m bits) │
│ 0 1 0 1 0 0 1 0 1 0 0 1 0 1 0 0 │
└─────────────────────────────────┘
↑ ↑ ↑
│ │ │
Key "Alice" ───┘ │ │
h1(Alice) = 2 │ │
h2(Alice) = 4 ────┘ │
h3(Alice) = 11 ───────────────┘

查询 "Bob":
h1(Bob) = 3 → bit[3] = 1 ✓
h2(Bob) = 7 → bit[7] = 0 ✗ → Bob 一定不存在

HBase 中的布隆过滤器

HBase 在每个 HFile 中维护布隆过滤器,加速 RowKey 查找:

Row 类型(默认)

  • 对每个 RowKey 创建布隆过滤器
  • 查询时先检查布隆过滤器
  • 如果布隆过滤器判断 RowKey 不存在,跳过该 HFile

RowCol 类型

  • 对每个 RowKey + Column 创建布隆过滤器
  • 更精确的过滤,但占用更多内存

NONE

  • 不使用布隆过滤器

配置与使用

// 创建表时指定布隆过滤器类型
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setBloomFilterType(BloomType.ROW) // 行级布隆过滤器
// .setBloomFilterType(BloomType.ROWCOL) // 行列级布隆过滤器
// .setBloomFilterType(BloomType.NONE) // 不使用
.build();
# Shell 中创建表时指定
create 'user', {NAME => 'info', BLOOMFILTER => 'ROW'}
create 'user', {NAME => 'info', BLOOMFILTER => 'ROWCOL'}

选择建议

场景推荐类型原因
随机读取,按 RowKey 查询ROW减少不必要的 HFile 扫描
随机读取,指定列查询ROWCOL更精确的列级过滤
顺序扫描NONE布隆过滤器对扫描无帮助
写入密集型ROW 或 NONEROWCOL 增加写入开销

Coprocessor(协处理器)

协处理器允许在 RegionServer 上执行用户代码,减少数据传输。

类型

Observer(观察者)

  • 类似于数据库触发器
  • 在特定事件前后执行回调
  • 类型:
    • RegionObserver:Region 级别事件(Put、Get、Delete 等)
    • RegionServerObserver:RegionServer 级别事件
    • MasterObserver:Master 级别事件(创建表、删除表等)
    • WALObserver:WAL 级别事件

Endpoint(端点)

  • 类似于存储过程
  • 在 RegionServer 上执行计算,返回结果
  • 用于实现自定义聚合函数

RegionObserver 示例

import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;

import java.io.IOException;
import java.util.Optional;

/**
* 自定义 RegionObserver:自动维护 updated_time 字段
*/
public class UpdatedTimeCoprocessor implements RegionCoprocessor, RegionObserver {

private static final byte[] CF = Bytes.toBytes("info");
private static final byte[] UPDATED_TIME_COL = Bytes.toBytes("updated_time");

@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}

/**
* 在 Put 操作前自动设置 updated_time
*/
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
Put put, WALEdit edit) throws IOException {

// 如果 Put 不包含 updated_time,自动添加
if (!put.has(CF, UPDATED_TIME_COL)) {
put.addColumn(CF, UPDATED_TIME_COL,
Bytes.toBytes(System.currentTimeMillis()));
}
}

/**
* 在 Get 操作后可以修改返回结果
*/
@Override
public void postGet(ObserverContext<RegionCoprocessorEnvironment> c,
Get get, Result result) throws IOException {
// 可以在这里添加额外的处理逻辑
}
}

部署协处理器

方式一:通过配置文件加载(全局)

<!-- hbase-site.xml -->
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.example.UpdatedTimeCoprocessor</value>
</property>

方式二:通过表属性加载(表级)

# 禁用表
disable 'user'

# 添加协处理器
alter 'user', 'coprocessor' => 'hdfs:///user/hadoop/coprocessor.jar|com.example.UpdatedTimeCoprocessor|1001|'

# 启用表
enable 'user'

# 查看协处理器
describe 'user'

# 删除协处理器
alter 'user', METHOD => 'table_att_unset', NAME => 'coprocessor$1'

Endpoint 示例:自定义聚合

import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionService;
import org.apache.hadoop.hbase.protobuf.generated.RowCountProtos;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

/**
* 自定义 Endpoint:计算 Region 行数
*/
public class RowCountEndpoint extends RowCountProtos.RowCountService
implements RegionCoprocessor, RegionService {

private RegionCoprocessorEnvironment env;

@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.env = (RegionCoprocessorEnvironment) env;
}

@Override
public void getRowCount(RpcController controller,
RowCountProtos.RowCountRequest request,
RpcCallback<RowCountProtos.RowCountResponse> done) {

// 计算当前 Region 的行数
long count = 0;
try {
Region region = env.getRegion();
InternalScanner scanner = region.getScanner(new Scan());
List<Cell> results = new ArrayList<>();
boolean hasMore;
do {
hasMore = scanner.next(results);
count += results.size();
results.clear();
} while (hasMore);
scanner.close();
} catch (IOException e) {
// 处理异常
}

// 返回结果
RowCountProtos.RowCountResponse response =
RowCountProtos.RowCountResponse.newBuilder()
.setCount(count)
.build();
done.run(response);
}

@Override
public Service getService() {
return this;
}
}

HBase 性能调优

内存配置

<!-- RegionServer 堆内存 -->
<!-- 建议:16-32GB,根据数据量和负载调整 -->
export HBASE_REGIONSERVER_OPTS="-Xms16g -Xmx16g"

<!-- MemStore 内存占比 -->
<property>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>

<!-- BlockCache 内存占比 -->
<property>
<name>hfile.block.cache.size</name>
<value>0.4</value> <!-- 堆内存的 40% -->
</property>

<!-- 注意:MemStore + BlockCache 不要超过堆内存的 80% -->

列族优化

<!-- 列族配置 -->
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value> <!-- 128MB,MemStore 刷写阈值 -->
</property>

<property>
<name>hbase.hstore.blockingStoreFiles</name>
<value>10</value> <!-- 触发阻塞的 HFile 数量 -->
</property>

<property>
<name>hbase.hstore.compactionThreshold</name>
<value>3</value> <!-- 触发 Minor Compaction 的 HFile 数量 -->
</property>

压缩配置

// 创建表时指定压缩
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.setCompressionType(Compression.Algorithm.SNAPPY) // 推荐
// .setCompressionType(Compression.Algorithm.LZO) // 更快
// .setCompressionType(Compression.Algorithm.GZ) // 压缩率最高
.build();

压缩算法对比

算法压缩率压缩速度解压速度适用场景
NONE1:1--数据已压缩或对延迟敏感
SNAPPY2:1推荐,平衡压缩率和性能
LZO2.5:1最快最快写入密集型场景
GZIP4:1存储密集型,冷数据
LZ42.5:1最快需要快速解压的场景
ZSTD3:1较新的算法,压缩率和速度平衡

预分区

/**
* 预分区示例
* 避免 Region 自动分裂导致的数据迁移和性能下降
*/
public class PreSplitExample {

/**
* 使用预定义分区点
*/
public static void createWithSplits() throws Exception {
byte[][] splits = new byte[][] {
Bytes.toBytes("10"),
Bytes.toBytes("20"),
Bytes.toBytes("30"),
Bytes.toBytes("40"),
Bytes.toBytes("50")
};

Admin admin = HBaseConnection.getAdmin();
TableDescriptorBuilder builder = TableDescriptorBuilder
.newBuilder(TableName.valueOf("user"))
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.build()
);

admin.createTable(builder.build(), splits);
}

/**
* 使用十六进制分区算法
* 适用于 RowKey 是十六进制字符串的场景
*/
public static void createWithHexSplits() throws Exception {
Admin admin = HBaseConnection.getAdmin();

// 生成十六进制分区点
int numRegions = 16;
byte[][] splits = new byte[numRegions - 1][];
for (int i = 1; i < numRegions; i++) {
splits[i - 1] = Bytes.toBytes(String.format("%02x", i * 16));
}

TableDescriptorBuilder builder = TableDescriptorBuilder
.newBuilder(TableName.valueOf("user"))
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
.build()
);

admin.createTable(builder.build(), splits);
}
}

客户端优化

/**
* HBase 客户端优化配置
*/
public class HBaseClientOptimization {

public static Configuration getOptimizedConfig() {
Configuration config = HBaseConfiguration.create();

// 写缓冲区大小
config.set("hbase.client.write.buffer", "2097152"); // 2MB

// 扫描器缓存(每次 RPC 返回的行数)
config.set("hbase.client.scanner.caching", "100");

// 操作超时时间
config.setInt("hbase.rpc.timeout", 60000); // 60秒
config.setInt("hbase.client.operation.timeout", 120000); // 2分钟
config.setInt("hbase.client.scanner.timeout.period", 120000); // 2分钟

// 重试次数
config.setInt("hbase.client.retries.number", 3);

// 重试间隔
config.setLong("hbase.client.pause", 100); // 100ms

return config;
}

/**
* 批量写入优化
*/
public static void batchPutOptimized(Table table, List<Put> puts) throws Exception {
// 设置写入缓冲区
table.put(puts);

// 或者使用批量提交
Object[] results = new Object[puts.size()];
table.batch(puts, results);
}

/**
* 扫描优化
*/
public static void scanOptimized(Table table, Scan scan) throws Exception {
// 设置扫描缓存
scan.setCaching(100); // 每次 RPC 返回 100 行
scan.setBatch(10); // 每行返回 10 列(大行时使用)
scan.setCacheBlocks(true); // 缓存数据块
scan.setReversed(false); // 正向扫描

// 只查询需要的列
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));

// 使用过滤器减少数据传输
scan.setFilter(new PageFilter(1000));
}
}

HBase 与其他系统集成

与 MapReduce 集成

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

/**
* MapReduce 读取 HBase 数据
*/
public class HBaseMapReduceExample {

/**
* Mapper:从 HBase 读取数据
*/
public static class MyMapper extends TableMapper<Text, IntWritable> {

private static final IntWritable ONE = new IntWritable(1);
private Text text = new Text();

@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
// 获取列值
byte[] nameBytes = value.getValue(
Bytes.toBytes("info"), Bytes.toBytes("name"));
String name = Bytes.toString(nameBytes);

text.set(name);
context.write(text, ONE);
}
}

/**
* Reducer:写入 HBase
*/
public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}

// 创建 Put 对象
Put put = new Put(Bytes.toBytes(key.toString()));
put.addColumn(Bytes.toBytes("stats"), Bytes.toBytes("count"),
Bytes.toBytes(sum));

context.write(null, put);
}
}

/**
* 配置 Job
*/
public static Job createJob(Configuration conf) throws Exception {
Job job = Job.getInstance(conf, "HBase MapReduce Example");

// 配置 Mapper 从 HBase 读取
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);

TableMapReduceUtil.initTableMapperJob(
"source_table", // 输入表
scan, // Scan 配置
MyMapper.class, // Mapper 类
Text.class, // Mapper 输出 Key 类型
IntWritable.class, // Mapper 输出 Value 类型
job
);

// 配置 Reducer 写入 HBase
TableMapReduceUtil.initTableReducerJob(
"target_table", // 输出表
MyReducer.class, // Reducer 类
job
);

return job;
}
}

与 Spark 集成

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
* Spark 读取 HBase
*/
object HBaseSparkExample {

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("HBase Spark Example")
.getOrCreate()

// 配置 HBase
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
conf.set(TableInputFormat.INPUT_TABLE, "user")

// 读取 HBase 数据
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
spark.sparkContext.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)

// 处理数据
val result = hbaseRDD.map { case (_, result) =>
val rowKey = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue(
Bytes.toBytes("info"), Bytes.toBytes("name")))
(rowKey, name)
}

result.foreach(println)

spark.stop()
}
}

与 Phoenix 集成

Phoenix 是 HBase 的 SQL 层,提供 SQL 查询能力:

-- 创建表
CREATE TABLE user (
rowkey VARCHAR PRIMARY KEY,
info.name VARCHAR,
info.age INTEGER,
info.city VARCHAR,
data.login_count INTEGER
);

-- 插入数据
UPSERT INTO user VALUES('user001', 'Alice', 25, 'Beijing', 100);

-- 查询数据
SELECT * FROM user WHERE age > 20;

-- 创建索引
CREATE INDEX idx_age ON user(info.age);

-- 创建覆盖索引
CREATE INDEX idx_age_city ON user(info.age) INCLUDE(info.name, info.city);

-- 聚合查询
SELECT city, COUNT(*) as cnt, AVG(age) as avg_age
FROM user
GROUP BY city;

Phoenix 适用场景

  • 需要 SQL 查询能力
  • 二级索引需求
  • OLAP 分析场景

注意:Phoenix 适合中小规模数据查询,大规模数据扫描性能不如原生 HBase API。

HBase 运维管理

集群监控

关键监控指标

类别指标说明告警阈值
RegionServer区域数量每个 RS 管理的 Region 数> 1000
请求队列长度待处理请求数> 100
内存使用率MemStore + BlockCache> 80%
CompactionCompaction 队列待合并的 Store 数> 10
Flush 队列待刷写的 MemStore 数> 5
读写性能读延迟P99 读延迟> 100ms
写延迟P99 写延迟> 50ms
GC 时间GC 时间占比> 10%

常用运维命令

# RegionServer 状态
hbase regionserver

# HMaster 状态
hbase master

# 手动触发 Compaction
hbase org.apache.hadoop.hbase.regionserver.CompactionTool \
--compact /hbase/data/default/user

# 手动触发 Major Compaction
major_compact 'user'

# 手动触发 Flush
flush 'user'
flush 'region_name'

# 移动 Region
move 'region_encoded_name', 'target_server'

# 分裂 Region
split 'region_name'
split 'table_name', 'split_key'

# 合并相邻 Region
merge_region 'region1', 'region2'

# 查看集群状态
hbase hbck
hbase hbck -details

# 修复集群不一致
hbase hbck -fix

# 查看表磁盘使用
du 'user'

# WAL 工具
hbase wal

故障排查

问题1:RegionServer 宕机

排查步骤:
1. 检查 RegionServer 日志
tail -f /var/log/hbase/hbase-hbase-regionserver-*.log

2. 检查 JVM GC 情况
jstat -gcutil <pid> 1000

3. 检查系统资源
top, iostat, vmstat

4. 检查网络连接
netstat -an | grep 16020

常见原因:
- GC 时间过长
- 内存不足
- 磁盘空间不足
- 网络问题

问题2:读写延迟高

排查步骤:
1. 检查 Compaction 队列
hbase shell: status 'detailed'

2. 检查 BlockCache 命中率
HBase Web UI -> RegionServer -> BlockCache

3. 检查热点 Region
HBase Web UI -> RegionServer -> Regions

4. 检查 RowKey 设计
是否存在数据倾斜

优化方案:
- 增加 RegionServer 数量
- 调整 Compaction 参数
- 优化 RowKey 设计
- 增加缓存大小

问题3:数据不一致

排查步骤:
1. 运行 hbck 检查
hbase hbck -details

2. 检查 Region 状态
HBase Web UI -> Table -> Regions

修复命令:
# 修复 Region 分配不一致
hbase hbck -fixAssignments

# 修复 Meta 表不一致
hbase hbck -fixMeta

# 修复 HDFS 上不一致的 Region
hbase hbck -fixHdfsHoles

小结

本章详细介绍了 HBase 分布式数据库的核心概念和使用方法:

  1. 数据模型:深入理解 RowKey、列族、列限定符、时间戳、单元格等概念
  2. 物理存储:Region、Store、MemStore、HFile、WAL 的组织结构
  3. 架构设计:HMaster、HRegionServer、ZooKeeper 的协作关系
  4. 读写流程:写入 WAL → MemStore → HFile 的流程,读取时多数据源合并
  5. Shell 操作:表管理、数据操作、过滤器、快照
  6. Java API:连接管理、表操作、数据读写
  7. RowKey 设计:反转时间戳、加盐、哈希、组合键等设计模式
  8. 性能优化:布隆过滤器、协处理器、内存配置、预分区
  9. 系统集成:MapReduce、Spark、Phoenix 集成方案
  10. 运维管理:监控指标、常用命令、故障排查

HBase 是大数据场景下实时随机读写的首选数据库,特别适合海量数据的键值查询场景。在实际项目中,HBase 常与 Spark、Flink、Phoenix 等组件配合使用,构建完整的大数据平台。

参考资源