数据分区与复制
数据分区(Sharding)和数据复制(Replication)是分布式存储系统的两大核心技术。分区将数据分散到多个节点,突破单节点的存储和计算瓶颈;复制将数据复制到多个副本,提高数据的可用性和读取性能。两者结合,构建出可扩展、高可用的分布式存储系统。
为什么需要数据分区
单机数据库在面对大规模数据和高并发访问时,会遇到明显的瓶颈。
单机的物理限制
存储容量限制:单台服务器的硬盘容量有限。即使使用多块硬盘组建 RAID,容量扩展也有上限。当数据量达到 TB 甚至 PB 级别时,单机存储不再可行。
计算能力限制:单个 CPU 的核心数有限,处理能力有限。当并发查询数量激增时,CPU 成为瓶颈,查询响应时间急剧增加。
内存容量限制:数据库的性能很大程度上依赖于缓存。单机内存有限,无法缓存全部热点数据,导致频繁的磁盘 I/O,性能下降。
网络带宽限制:单机的网络接口带宽有限,当读写请求量超过带宽上限时,延迟会显著增加。
扩展的经济性:纵向扩展(Scale Up,升级硬件)的成本呈指数增长。一台性能翻倍的服务器,价格可能翻几倍。而横向扩展(Scale Out,增加服务器)的成本增长相对线性。
数据分区策略
数据分区的核心问题是:如何将数据均匀地分散到多个节点,同时支持高效的查询?
哈希分区
哈希分区是最常用的分区策略。对数据的某个字段(通常是主键)计算哈希值,然后对分区数取模,决定数据所属的分区。
// 哈希分区实现
public class HashSharding {
private final int shardCount;
/**
* 计算数据所属的分片
* @param key 数据键(如用户ID)
* @return 分片编号
*/
public int getShard(String key) {
// 使用 MurmurHash 比 Java hashCode 更均匀
int hash = MurmurHash.hash32(key);
// 取模确定分片
return Math.abs(hash) % shardCount;
}
}
优点:
- 数据分布均匀(假设哈希函数足够好)
- 点查询效率高,一次计算即可定位分区
- 实现简单,易于理解
缺点:
- 不支持范围查询(相邻的键可能分布在不同的分区)
- 增减分区时需要大量数据迁移(所有数据都需要重新计算哈希)
范围分区
范围分区按照数据的键值范围划分分区。每个分区负责一段连续的键范围。
// 范围分区实现
public class RangeSharding {
private final List<RangeShard> shards;
/**
* 范围分区映射表
*/
static class RangeShard {
String start; // 范围起点(包含)
String end; // 范围终点(不包含)
int shardId; // 分片编号
}
/**
* 计算数据所属的分片
*/
public int getShard(String key) {
// 二分查找确定范围
for (RangeShard shard : shards) {
if (key.compareTo(shard.start) >= 0 &&
key.compareTo(shard.end) < 0) {
return shard.shardId;
}
}
// 默认返回最后一个分区
return shards.get(shards.size() - 1).shardId;
}
}
优点:
- 支持范围查询(相邻的键在同一个分区)
- 增减分区相对简单(只需调整范围边界)
- 适合按时间顺序存储的数据(如日志、时间序列)
缺点:
- 可能出现数据倾斜(某些范围的数据量远大于其他范围)
- 热点问题(某些范围被频繁访问,如最新的数据)
3. 一致性哈希
// 一致性哈希实现
class ConsistentHash {
// 虚拟节点数量
private static final int VIRTUAL_NODES = 150;
// 哈希环
private final SortedMap<Integer, String> circle = new TreeMap<>();
public ConsistentHash(List<String> nodes) {
for (String node : nodes) {
addNode(node);
}
}
// 添加节点
public void addNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.put(hash, node);
}
}
// 移除节点
public void removeNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.remove(hash);
}
}
// 获取数据对应的节点
public String getNode(String key) {
if (circle.isEmpty()) {
throw new IllegalStateException("没有可用节点");
}
int hash = hash(key);
// 找到第一个大于等于hash的节点
SortedMap<Integer, String> tail = circle.tailMap(hash);
if (tail.isEmpty()) {
// 环回到开头
return circle.get(circle.firstKey());
}
return tail.get(tail.firstKey());
}
private int hash(String key) {
return Math.abs(key.hashCode());
}
}
分区架构
分区带来的问题
跨分区查询
// 跨分片查询处理
class CrossShardQuery {
// 聚合查询:需要从多个分片获取结果后聚合
public int countUsersByAge(int age) {
int totalCount = 0;
// 并行查询所有分片
List<Future<Integer>> futures = new ArrayList<>();
for (Shard shard : allShards) {
futures.add(executor.submit(() -> shard.countByAge(age)));
}
// 聚合结果
for (Future<Integer> future : futures) {
totalCount += future.get();
}
return totalCount;
}
// 排序查询:从多分片获取数据后排序
public List<User> getTopUsers(int limit) {
List<User> allUsers = new ArrayList<>();
// 并行获取各分片数据
for (Shard shard : allShards) {
allUsers.addAll(shard.getAllUsers());
}
// 内存排序取Top N
return allUsers.stream()
.sorted(Comparator.comparing(User::getScore).reversed())
.limit(limit)
.collect(Collectors.toList());
}
}
分布式ID
// 分布式ID生成方案
class DistributedIdGenerator {
// 方案1:UUID
public String generateUUID() {
return UUID.randomUUID().toString();
}
// 方案2:雪花算法
static class SnowflakeId {
// 64位: 1位(符号) + 41位(时间戳) + 10位(机器ID) + 12位(序列号)
private long sequence = 0;
private long lastTimestamp = -1;
private final long workerId;
public SnowflakeId(long workerId) {
this.workerId = workerId;
}
public synchronized long nextId() {
long timestamp = System.currentTimeMillis();
// 时钟回拨处理
if (timestamp < lastTimestamp) {
timestamp = lastTimestamp;
}
if (timestamp == lastTimestamp) {
// 同一毫秒内,序列号+1
sequence = (sequence + 1) & ((1 << 12) - 1);
if (sequence == 0) {
// 序列号用完,等待下一毫秒
timestamp = waitNextMillis(timestamp);
}
} else {
sequence = 0;
}
lastTimestamp = timestamp;
// 组装ID
return ((timestamp - 1609459200000L) << 22) |
(workerId << 12) |
sequence;
}
}
// 方案3:数据库号段
static class DatabaseSegmentId {
private static final long BATCH_SIZE = 1000;
private long currentId = 0;
private long maxId = 0;
public synchronized long nextId() {
if (currentId >= maxId) {
// 从数据库获取新的号段
currentId = fetchFromDatabase();
maxId = currentId + BATCH_SIZE;
}
return currentId++;
}
}
}
数据复制
复制模式
1. 主从复制
// 主从复制实现
class MasterSlaveReplication {
// 主节点写入
public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);
// 2. 同步/异步复制到从节点
replicateToSlaves(key, value);
}
// 从节点读取
public String read(String key, ReadConsistency consistency) {
if (consistency == ReadConsistency.STRONG) {
// 强一致性:读取主节点或多数派
return readFromQuorum(key);
} else {
// 弱一致性:读取任意从节点
return readFromAnySlave(key);
}
}
// 异步复制
private void replicateToSlaves(String key, String value) {
for (SlaveNode slave : slaves) {
// 异步复制,不等待完成
slave.asyncReplicate(key, value);
}
}
}
2. 复制策略
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 同步复制 | 数据不丢失 | 延迟高 | 金融交易 |
| 异步复制 | 延迟低 | 可能丢数据 | 普通业务 |
| 半同步复制 | 平衡 | - | 大多数场景 |
// 半同步复制
class SemiSyncReplication {
public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);
// 2. 同步复制到至少N个节点
int syncCount = 0;
for (ReplicaNode replica : replicas) {
if (replica.syncWrite(key, value)) {
syncCount++;
}
// 多数派确认后返回
if (syncCount >= requiredQuorum) {
break;
}
}
// 3. 剩余节点异步复制
for (ReplicaNode replica : replicas) {
if (!replica.isSynced()) {
replica.asyncReplicate(key, value);
}
}
}
}
3. 复制延迟处理
// 复制延迟处理
class ReplicationLagHandler {
// 读请求重定向
public String readWithRetry(String key) {
try {
// 优先读从节点
return readFromSlave(key);
} catch (StaleDataException e) {
// 数据过期,重试读主节点
return readFromMaster(key);
}
}
// 写入后立即读取
public void writeAndRead(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);
// 2. 同步复制到足够节点
replicateToQuorum(key, value);
// 3. 确保复制完成后再读取
waitForReplication(key);
// 4. 读取
return readFromMaster(key);
}
}
数据一致性模型
// 各种一致性模型实现
// 1. 强一致性:所有节点同步
class StrongConsistency {
public void write(String key, String value) {
// 同步到所有节点
for (Node node : allNodes) {
node.syncWrite(key, value);
}
}
}
// 2. 顺序一致性:保持操作顺序
class SequentialConsistency {
// 使用向量时钟保证顺序
VectorClock clock = new VectorClock();
public void write(String key, String value) {
// 为操作分配全局顺序
long sequence = clock.incrementAndGet();
node.write(key, value, sequence);
}
}
// 3. 因果一致性:保证因果关系
class CausalConsistency {
// 记录因果关系
Map<String, VectorClock> causality = new HashMap<>();
public void write(String key, String value, VectorClock dependsOn) {
// 检查依赖是否满足
if (dependsOn != null && !isCausalitySatisfied(dependsOn)) {
// 等待依赖操作完成
waitFor(dependsOn);
}
node.write(key, value);
}
}
// 4. 最终一致性:允许短暂不一致
class EventualConsistency {
public void write(String key, String value) {
// 立即返回,异步复制
node.writeLocal(key, value);
asyncReplicateToAll(key, value);
}
}
数据分区与复制结合
典型架构
实现示例
// 分片+副本系统
class ShardedReplicatedDB {
// 分片路由
private ShardingRouter router;
// 各分片的主从集群
private Map<Integer, MasterSlaveCluster> clusters;
// 写入操作
public void write(String key, String value) {
// 1. 计算分片
int shardId = router.getShard(key);
// 2. 找到对应集群的主节点
MasterSlaveCluster cluster = clusters.get(shardId);
MasterNode master = cluster.getMaster();
// 3. 写入主节点(主节点会自动复制到从节点)
master.write(key, value);
}
// 读取操作
public String read(String key, ReadConsistency consistency) {
int shardId = router.getShard(key);
MasterSlaveCluster cluster = clusters.get(shardId);
return consistency == ReadConsistency.STRONG
? cluster.readFromQuorum(key)
: cluster.readFromAnySlave(key);
}
}
小结
本章我们深入学习了数据分区与复制的核心概念:
-
数据分区(Sharding)
- 哈希分区:数据分布均匀,无法范围查询
- 范围分区:支持范围查询,可能数据倾斜
- 一致性哈希:减少节点变动时的数据迁移
-
数据复制
- 主从复制:读写分离,提高读取能力
- 同步/异步/半同步:权衡一致性和性能
- 复制延迟处理:读写分离、重试机制
-
一致性模型
- 强一致性:所有节点同步
- 顺序一致性:保持操作顺序
- 因果一致性:保证因果关系
- 最终一致性:允许短暂不一致
-
实际应用
- 分布式ID生成:UUID、雪花算法、号段模式
- 跨分区查询:并行查询、结果聚合
- 分区与复制结合:高可用、高性能
实战案例:使用 ShardingSphere 分库分表
案例背景
假设我们有一个订单系统,单表数据量已经超过 5000 万,查询性能严重下降。我们需要对订单表进行水平分片,同时保证数据的高可用。
分片策略设计
业务分析:
- 订单查询主要有两种场景:按订单ID查询、按用户ID查询
- 订单ID 使用雪花算法生成,包含时间信息
- 用户ID 是业务主键,适合作为分片键
分片方案:
- 采用用户ID作为分片键,使用哈希分片
- 分为 4 个库,每个库 4 张表,共 16 张表
- 每个分库配置一主两从
ShardingSphere 配置
Maven 依赖:
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>5.4.1</version>
</dependency>
Spring Boot 配置(application.yml):
spring:
shardingsphere:
datasource:
names: ds0,ds1,ds2,ds3
# 分库0(一主两从)
ds0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.10:3306/order_db0
username: root
password: password
ds0slave0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.11:3306/order_db0
username: root
password: password
ds0slave1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.12:3306/order_db0
username: root
password: password
# 其他分库配置类似...
rules:
# 读写分离配置
readwrite-splitting:
data-sources:
ds0:
write-data-source-name: ds0
read-data-source-names: ds0slave0,ds0slave1
load-balancer-name: round_robin
ds1:
write-data-source-name: ds1
read-data-source-names: ds1slave0,ds1slave1
load-balancer-name: round_robin
load-balancers:
round_robin:
type: ROUND_ROBIN
# 分片配置
sharding:
tables:
t_order:
# 实际数据节点:4库 x 4表 = 16张表
actual-data-nodes: ds$->{0..3}.t_order_$->{0..3}
# 分库策略:按用户ID哈希
database-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: order-db-inline
# 分表策略:按订单ID哈希
table-strategy:
standard:
sharding-column: order_id
sharding-algorithm-name: order-table-inline
sharding-algorithms:
order-db-inline:
type: INLINE
props:
algorithm-expression: ds$->{user_id % 4}
order-table-inline:
type: INLINE
props:
algorithm-expression: t_order_$->{order_id % 4}
props:
sql-show: true
实体类与 Mapper
// 订单实体
@Data
@TableName("t_order")
public class Order {
@TableId(type = IdType.ASSIGN_ID) // 使用雪花算法生成ID
private Long orderId;
private Long userId;
private String productCode;
private Integer quantity;
private BigDecimal amount;
private Integer status;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
// 订单 Mapper
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
// 自定义查询:按用户ID查询订单
@Select("SELECT * FROM t_order WHERE user_id = #{userId} ORDER BY create_time DESC")
List<Order> selectByUserId(@Param("userId") Long userId);
// 自定义查询:按订单ID查询(需要路由到正确的分片)
@Select("SELECT * FROM t_order WHERE order_id = #{orderId}")
Order selectByOrderId(@Param("orderId") Long orderId);
}
业务服务层
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private SnowflakeIdGenerator idGenerator;
/**
* 创建订单
* 分片键 user_id 和 order_id 都需要指定
*/
@Transactional
public Order createOrder(OrderDTO dto) {
Order order = new Order();
// 生成订单ID(雪花算法,包含机器ID和时间戳)
order.setOrderId(idGenerator.nextId());
order.setUserId(dto.getUserId());
order.setProductCode(dto.getProductCode());
order.setQuantity(dto.getQuantity());
order.setAmount(dto.getAmount());
order.setStatus(OrderStatus.CREATED.getCode());
order.setCreateTime(LocalDateTime.now());
orderMapper.insert(order);
return order;
}
/**
* 查询用户订单列表
* 自动路由到 user_id % 4 对应的分库
*/
public List<Order> getUserOrders(Long userId) {
return orderMapper.selectByUserId(userId);
}
/**
* 查询单个订单
* 自动路由到 order_id % 4 对应的分表
*/
public Order getOrder(Long orderId) {
return orderMapper.selectByOrderId(orderId);
}
/**
* 分页查询 - 跨分片查询
* 需要从所有分片获取数据后聚合
*/
public Page<Order> queryOrders(OrderQueryDTO query, int pageNum, int pageSize) {
// ShardingSphere 会自动处理跨分片查询
// 但性能较差,建议使用 Elasticsearch 等搜索引擎
Page<Order> page = new Page<>(pageNum, pageSize);
LambdaQueryWrapper<Order> wrapper = new LambdaQueryWrapper<>();
if (query.getStatus() != null) {
wrapper.eq(Order::getStatus, query.getStatus());
}
if (query.getStartTime() != null) {
wrapper.ge(Order::getCreateTime, query.getStartTime());
}
wrapper.orderByDesc(Order::getCreateTime);
return orderMapper.selectPage(page, wrapper);
}
}
数据迁移方案
从单库迁移到分库分表是一个复杂的过程,需要谨慎处理:
/**
* 数据迁移服务
* 采用双写方案:同时写入旧库和新库
*/
@Service
public class DataMigrationService {
@Autowired
private LegacyOrderMapper legacyMapper; // 旧库
@Autowired
private OrderMapper newMapper; // 新分片库
/**
* 阶段1:历史数据迁移
* 按批次迁移,避免大事务
*/
public void migrateHistoryData() {
long lastId = 0;
int batchSize = 1000;
while (true) {
// 从旧库读取一批数据
List<Order> orders = legacyMapper.selectBatch(lastId, batchSize);
if (orders.isEmpty()) {
break;
}
// 写入新分片库
for (Order order : orders) {
// 需要重新生成分片键或保持原ID
newMapper.insert(order);
}
lastId = orders.get(orders.size() - 1).getOrderId();
log.info("已迁移 {} 条数据,最后ID: {}", orders.size(), lastId);
}
}
/**
* 阶段2:增量数据同步(双写)
*/
@Transactional
public void createOrderWithDualWrite(Order order) {
// 同时写入旧库和新库
legacyMapper.insert(order);
newMapper.insert(order);
}
/**
* 阶段3:数据校验
*/
public void verifyData() {
long lastId = 0;
int batchSize = 10000;
int errorCount = 0;
while (true) {
List<Order> legacyOrders = legacyMapper.selectBatch(lastId, batchSize);
if (legacyOrders.isEmpty()) break;
for (Order legacy : legacyOrders) {
Order newOrder = newMapper.selectByOrderId(legacy.getOrderId());
if (newOrder == null || !legacy.equals(newOrder)) {
log.error("数据不一致: orderId={}", legacy.getOrderId());
errorCount++;
}
}
lastId = legacyOrders.get(legacyOrders.size() - 1).getOrderId();
}
log.info("数据校验完成,不一致数量: {}", errorCount);
}
}
迁移流程:
性能优化建议
- 避免跨分片查询:将跨分片的查询需求转移到 Elasticsearch
- 合理设置分片键:选择查询频率高的字段作为分片键
- 热点数据处理:对热点数据使用广播表或单独处理
- 连接池优化:每个分库独立配置连接池
- 监控告警:监控各分库的负载均衡情况
// 广播表配置:小表全量复制到每个分库
// 适用于配置表、字典表等
spring:
shardingsphere:
rules:
sharding:
broadcast-tables:
- t_config
- t_dict