跳到主要内容

数据分区与复制

数据分区(Sharding)和数据复制(Replication)是分布式存储系统的两大核心技术。分区将数据分散到多个节点,突破单节点的存储和计算瓶颈;复制将数据复制到多个副本,提高数据的可用性和读取性能。两者结合,构建出可扩展、高可用的分布式存储系统。

为什么需要数据分区

单机数据库在面对大规模数据和高并发访问时,会遇到明显的瓶颈。

单机的物理限制

存储容量限制:单台服务器的硬盘容量有限。即使使用多块硬盘组建 RAID,容量扩展也有上限。当数据量达到 TB 甚至 PB 级别时,单机存储不再可行。

计算能力限制:单个 CPU 的核心数有限,处理能力有限。当并发查询数量激增时,CPU 成为瓶颈,查询响应时间急剧增加。

内存容量限制:数据库的性能很大程度上依赖于缓存。单机内存有限,无法缓存全部热点数据,导致频繁的磁盘 I/O,性能下降。

网络带宽限制:单机的网络接口带宽有限,当读写请求量超过带宽上限时,延迟会显著增加。

扩展的经济性:纵向扩展(Scale Up,升级硬件)的成本呈指数增长。一台性能翻倍的服务器,价格可能翻几倍。而横向扩展(Scale Out,增加服务器)的成本增长相对线性。

数据分区策略

数据分区的核心问题是:如何将数据均匀地分散到多个节点,同时支持高效的查询?

哈希分区

哈希分区是最常用的分区策略。对数据的某个字段(通常是主键)计算哈希值,然后对分区数取模,决定数据所属的分区。

// 哈希分区实现
public class HashSharding {

private final int shardCount;

/**
* 计算数据所属的分片
* @param key 数据键(如用户ID)
* @return 分片编号
*/
public int getShard(String key) {
// 使用 MurmurHash 比 Java hashCode 更均匀
int hash = MurmurHash.hash32(key);
// 取模确定分片
return Math.abs(hash) % shardCount;
}
}

优点

  • 数据分布均匀(假设哈希函数足够好)
  • 点查询效率高,一次计算即可定位分区
  • 实现简单,易于理解

缺点

  • 不支持范围查询(相邻的键可能分布在不同的分区)
  • 增减分区时需要大量数据迁移(所有数据都需要重新计算哈希)

范围分区

范围分区按照数据的键值范围划分分区。每个分区负责一段连续的键范围。

// 范围分区实现
public class RangeSharding {

private final List<RangeShard> shards;

/**
* 范围分区映射表
*/
static class RangeShard {
String start; // 范围起点(包含)
String end; // 范围终点(不包含)
int shardId; // 分片编号
}

/**
* 计算数据所属的分片
*/
public int getShard(String key) {
// 二分查找确定范围
for (RangeShard shard : shards) {
if (key.compareTo(shard.start) >= 0 &&
key.compareTo(shard.end) < 0) {
return shard.shardId;
}
}
// 默认返回最后一个分区
return shards.get(shards.size() - 1).shardId;
}
}

优点

  • 支持范围查询(相邻的键在同一个分区)
  • 增减分区相对简单(只需调整范围边界)
  • 适合按时间顺序存储的数据(如日志、时间序列)

缺点

  • 可能出现数据倾斜(某些范围的数据量远大于其他范围)
  • 热点问题(某些范围被频繁访问,如最新的数据)

3. 一致性哈希

// 一致性哈希实现
class ConsistentHash {

// 虚拟节点数量
private static final int VIRTUAL_NODES = 150;

// 哈希环
private final SortedMap<Integer, String> circle = new TreeMap<>();

public ConsistentHash(List<String> nodes) {
for (String node : nodes) {
addNode(node);
}
}

// 添加节点
public void addNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.put(hash, node);
}
}

// 移除节点
public void removeNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.remove(hash);
}
}

// 获取数据对应的节点
public String getNode(String key) {
if (circle.isEmpty()) {
throw new IllegalStateException("没有可用节点");
}

int hash = hash(key);

// 找到第一个大于等于hash的节点
SortedMap<Integer, String> tail = circle.tailMap(hash);

if (tail.isEmpty()) {
// 环回到开头
return circle.get(circle.firstKey());
}

return tail.get(tail.firstKey());
}

private int hash(String key) {
return Math.abs(key.hashCode());
}
}

分区架构

分区带来的问题

跨分区查询

// 跨分片查询处理
class CrossShardQuery {

// 聚合查询:需要从多个分片获取结果后聚合
public int countUsersByAge(int age) {
int totalCount = 0;

// 并行查询所有分片
List<Future<Integer>> futures = new ArrayList<>();
for (Shard shard : allShards) {
futures.add(executor.submit(() -> shard.countByAge(age)));
}

// 聚合结果
for (Future<Integer> future : futures) {
totalCount += future.get();
}

return totalCount;
}

// 排序查询:从多分片获取数据后排序
public List<User> getTopUsers(int limit) {
List<User> allUsers = new ArrayList<>();

// 并行获取各分片数据
for (Shard shard : allShards) {
allUsers.addAll(shard.getAllUsers());
}

// 内存排序取Top N
return allUsers.stream()
.sorted(Comparator.comparing(User::getScore).reversed())
.limit(limit)
.collect(Collectors.toList());
}
}

分布式ID

// 分布式ID生成方案
class DistributedIdGenerator {

// 方案1:UUID
public String generateUUID() {
return UUID.randomUUID().toString();
}

// 方案2:雪花算法
static class SnowflakeId {
// 64位: 1位(符号) + 41位(时间戳) + 10位(机器ID) + 12位(序列号)
private long sequence = 0;
private long lastTimestamp = -1;
private final long workerId;

public SnowflakeId(long workerId) {
this.workerId = workerId;
}

public synchronized long nextId() {
long timestamp = System.currentTimeMillis();

// 时钟回拨处理
if (timestamp < lastTimestamp) {
timestamp = lastTimestamp;
}

if (timestamp == lastTimestamp) {
// 同一毫秒内,序列号+1
sequence = (sequence + 1) & ((1 << 12) - 1);
if (sequence == 0) {
// 序列号用完,等待下一毫秒
timestamp = waitNextMillis(timestamp);
}
} else {
sequence = 0;
}

lastTimestamp = timestamp;

// 组装ID
return ((timestamp - 1609459200000L) << 22) |
(workerId << 12) |
sequence;
}
}

// 方案3:数据库号段
static class DatabaseSegmentId {
private static final long BATCH_SIZE = 1000;
private long currentId = 0;
private long maxId = 0;

public synchronized long nextId() {
if (currentId >= maxId) {
// 从数据库获取新的号段
currentId = fetchFromDatabase();
maxId = currentId + BATCH_SIZE;
}
return currentId++;
}
}
}

数据复制

复制模式

1. 主从复制

// 主从复制实现
class MasterSlaveReplication {

// 主节点写入
public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);

// 2. 同步/异步复制到从节点
replicateToSlaves(key, value);
}

// 从节点读取
public String read(String key, ReadConsistency consistency) {
if (consistency == ReadConsistency.STRONG) {
// 强一致性:读取主节点或多数派
return readFromQuorum(key);
} else {
// 弱一致性:读取任意从节点
return readFromAnySlave(key);
}
}

// 异步复制
private void replicateToSlaves(String key, String value) {
for (SlaveNode slave : slaves) {
// 异步复制,不等待完成
slave.asyncReplicate(key, value);
}
}
}

2. 复制策略

策略优点缺点适用场景
同步复制数据不丢失延迟高金融交易
异步复制延迟低可能丢数据普通业务
半同步复制平衡-大多数场景
// 半同步复制
class SemiSyncReplication {

public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);

// 2. 同步复制到至少N个节点
int syncCount = 0;
for (ReplicaNode replica : replicas) {
if (replica.syncWrite(key, value)) {
syncCount++;
}
// 多数派确认后返回
if (syncCount >= requiredQuorum) {
break;
}
}

// 3. 剩余节点异步复制
for (ReplicaNode replica : replicas) {
if (!replica.isSynced()) {
replica.asyncReplicate(key, value);
}
}
}
}

3. 复制延迟处理

// 复制延迟处理
class ReplicationLagHandler {

// 读请求重定向
public String readWithRetry(String key) {
try {
// 优先读从节点
return readFromSlave(key);
} catch (StaleDataException e) {
// 数据过期,重试读主节点
return readFromMaster(key);
}
}

// 写入后立即读取
public void writeAndRead(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);

// 2. 同步复制到足够节点
replicateToQuorum(key, value);

// 3. 确保复制完成后再读取
waitForReplication(key);

// 4. 读取
return readFromMaster(key);
}
}

数据一致性模型

// 各种一致性模型实现

// 1. 强一致性:所有节点同步
class StrongConsistency {
public void write(String key, String value) {
// 同步到所有节点
for (Node node : allNodes) {
node.syncWrite(key, value);
}
}
}

// 2. 顺序一致性:保持操作顺序
class SequentialConsistency {
// 使用向量时钟保证顺序
VectorClock clock = new VectorClock();

public void write(String key, String value) {
// 为操作分配全局顺序
long sequence = clock.incrementAndGet();
node.write(key, value, sequence);
}
}

// 3. 因果一致性:保证因果关系
class CausalConsistency {
// 记录因果关系
Map<String, VectorClock> causality = new HashMap<>();

public void write(String key, String value, VectorClock dependsOn) {
// 检查依赖是否满足
if (dependsOn != null && !isCausalitySatisfied(dependsOn)) {
// 等待依赖操作完成
waitFor(dependsOn);
}
node.write(key, value);
}
}

// 4. 最终一致性:允许短暂不一致
class EventualConsistency {
public void write(String key, String value) {
// 立即返回,异步复制
node.writeLocal(key, value);
asyncReplicateToAll(key, value);
}
}

数据分区与复制结合

典型架构

实现示例

// 分片+副本系统
class ShardedReplicatedDB {

// 分片路由
private ShardingRouter router;

// 各分片的主从集群
private Map<Integer, MasterSlaveCluster> clusters;

// 写入操作
public void write(String key, String value) {
// 1. 计算分片
int shardId = router.getShard(key);

// 2. 找到对应集群的主节点
MasterSlaveCluster cluster = clusters.get(shardId);
MasterNode master = cluster.getMaster();

// 3. 写入主节点(主节点会自动复制到从节点)
master.write(key, value);
}

// 读取操作
public String read(String key, ReadConsistency consistency) {
int shardId = router.getShard(key);
MasterSlaveCluster cluster = clusters.get(shardId);

return consistency == ReadConsistency.STRONG
? cluster.readFromQuorum(key)
: cluster.readFromAnySlave(key);
}
}

小结

本章我们深入学习了数据分区与复制的核心概念:

  1. 数据分区(Sharding)

    • 哈希分区:数据分布均匀,无法范围查询
    • 范围分区:支持范围查询,可能数据倾斜
    • 一致性哈希:减少节点变动时的数据迁移
  2. 数据复制

    • 主从复制:读写分离,提高读取能力
    • 同步/异步/半同步:权衡一致性和性能
    • 复制延迟处理:读写分离、重试机制
  3. 一致性模型

    • 强一致性:所有节点同步
    • 顺序一致性:保持操作顺序
    • 因果一致性:保证因果关系
    • 最终一致性:允许短暂不一致
  4. 实际应用

    • 分布式ID生成:UUID、雪花算法、号段模式
    • 跨分区查询:并行查询、结果聚合
    • 分区与复制结合:高可用、高性能

实战案例:使用 ShardingSphere 分库分表

案例背景

假设我们有一个订单系统,单表数据量已经超过 5000 万,查询性能严重下降。我们需要对订单表进行水平分片,同时保证数据的高可用。

分片策略设计

业务分析

  • 订单查询主要有两种场景:按订单ID查询、按用户ID查询
  • 订单ID 使用雪花算法生成,包含时间信息
  • 用户ID 是业务主键,适合作为分片键

分片方案

  • 采用用户ID作为分片键,使用哈希分片
  • 分为 4 个库,每个库 4 张表,共 16 张表
  • 每个分库配置一主两从

ShardingSphere 配置

Maven 依赖

<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>5.4.1</version>
</dependency>

Spring Boot 配置(application.yml)

spring:
shardingsphere:
datasource:
names: ds0,ds1,ds2,ds3

# 分库0(一主两从)
ds0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.10:3306/order_db0
username: root
password: password
ds0slave0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.11:3306/order_db0
username: root
password: password
ds0slave1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.12:3306/order_db0
username: root
password: password

# 其他分库配置类似...

rules:
# 读写分离配置
readwrite-splitting:
data-sources:
ds0:
write-data-source-name: ds0
read-data-source-names: ds0slave0,ds0slave1
load-balancer-name: round_robin
ds1:
write-data-source-name: ds1
read-data-source-names: ds1slave0,ds1slave1
load-balancer-name: round_robin
load-balancers:
round_robin:
type: ROUND_ROBIN

# 分片配置
sharding:
tables:
t_order:
# 实际数据节点:4库 x 4表 = 16张表
actual-data-nodes: ds$->{0..3}.t_order_$->{0..3}

# 分库策略:按用户ID哈希
database-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: order-db-inline

# 分表策略:按订单ID哈希
table-strategy:
standard:
sharding-column: order_id
sharding-algorithm-name: order-table-inline

sharding-algorithms:
order-db-inline:
type: INLINE
props:
algorithm-expression: ds$->{user_id % 4}
order-table-inline:
type: INLINE
props:
algorithm-expression: t_order_$->{order_id % 4}

props:
sql-show: true

实体类与 Mapper

// 订单实体
@Data
@TableName("t_order")
public class Order {
@TableId(type = IdType.ASSIGN_ID) // 使用雪花算法生成ID
private Long orderId;

private Long userId;
private String productCode;
private Integer quantity;
private BigDecimal amount;
private Integer status;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

// 订单 Mapper
@Mapper
public interface OrderMapper extends BaseMapper<Order> {

// 自定义查询:按用户ID查询订单
@Select("SELECT * FROM t_order WHERE user_id = #{userId} ORDER BY create_time DESC")
List<Order> selectByUserId(@Param("userId") Long userId);

// 自定义查询:按订单ID查询(需要路由到正确的分片)
@Select("SELECT * FROM t_order WHERE order_id = #{orderId}")
Order selectByOrderId(@Param("orderId") Long orderId);
}

业务服务层

@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private SnowflakeIdGenerator idGenerator;

/**
* 创建订单
* 分片键 user_id 和 order_id 都需要指定
*/
@Transactional
public Order createOrder(OrderDTO dto) {
Order order = new Order();

// 生成订单ID(雪花算法,包含机器ID和时间戳)
order.setOrderId(idGenerator.nextId());
order.setUserId(dto.getUserId());
order.setProductCode(dto.getProductCode());
order.setQuantity(dto.getQuantity());
order.setAmount(dto.getAmount());
order.setStatus(OrderStatus.CREATED.getCode());
order.setCreateTime(LocalDateTime.now());

orderMapper.insert(order);

return order;
}

/**
* 查询用户订单列表
* 自动路由到 user_id % 4 对应的分库
*/
public List<Order> getUserOrders(Long userId) {
return orderMapper.selectByUserId(userId);
}

/**
* 查询单个订单
* 自动路由到 order_id % 4 对应的分表
*/
public Order getOrder(Long orderId) {
return orderMapper.selectByOrderId(orderId);
}

/**
* 分页查询 - 跨分片查询
* 需要从所有分片获取数据后聚合
*/
public Page<Order> queryOrders(OrderQueryDTO query, int pageNum, int pageSize) {
// ShardingSphere 会自动处理跨分片查询
// 但性能较差,建议使用 Elasticsearch 等搜索引擎
Page<Order> page = new Page<>(pageNum, pageSize);

LambdaQueryWrapper<Order> wrapper = new LambdaQueryWrapper<>();
if (query.getStatus() != null) {
wrapper.eq(Order::getStatus, query.getStatus());
}
if (query.getStartTime() != null) {
wrapper.ge(Order::getCreateTime, query.getStartTime());
}
wrapper.orderByDesc(Order::getCreateTime);

return orderMapper.selectPage(page, wrapper);
}
}

数据迁移方案

从单库迁移到分库分表是一个复杂的过程,需要谨慎处理:

/**
* 数据迁移服务
* 采用双写方案:同时写入旧库和新库
*/
@Service
public class DataMigrationService {

@Autowired
private LegacyOrderMapper legacyMapper; // 旧库

@Autowired
private OrderMapper newMapper; // 新分片库

/**
* 阶段1:历史数据迁移
* 按批次迁移,避免大事务
*/
public void migrateHistoryData() {
long lastId = 0;
int batchSize = 1000;

while (true) {
// 从旧库读取一批数据
List<Order> orders = legacyMapper.selectBatch(lastId, batchSize);
if (orders.isEmpty()) {
break;
}

// 写入新分片库
for (Order order : orders) {
// 需要重新生成分片键或保持原ID
newMapper.insert(order);
}

lastId = orders.get(orders.size() - 1).getOrderId();

log.info("已迁移 {} 条数据,最后ID: {}", orders.size(), lastId);
}
}

/**
* 阶段2:增量数据同步(双写)
*/
@Transactional
public void createOrderWithDualWrite(Order order) {
// 同时写入旧库和新库
legacyMapper.insert(order);
newMapper.insert(order);
}

/**
* 阶段3:数据校验
*/
public void verifyData() {
long lastId = 0;
int batchSize = 10000;
int errorCount = 0;

while (true) {
List<Order> legacyOrders = legacyMapper.selectBatch(lastId, batchSize);
if (legacyOrders.isEmpty()) break;

for (Order legacy : legacyOrders) {
Order newOrder = newMapper.selectByOrderId(legacy.getOrderId());

if (newOrder == null || !legacy.equals(newOrder)) {
log.error("数据不一致: orderId={}", legacy.getOrderId());
errorCount++;
}
}

lastId = legacyOrders.get(legacyOrders.size() - 1).getOrderId();
}

log.info("数据校验完成,不一致数量: {}", errorCount);
}
}

迁移流程

性能优化建议

  1. 避免跨分片查询:将跨分片的查询需求转移到 Elasticsearch
  2. 合理设置分片键:选择查询频率高的字段作为分片键
  3. 热点数据处理:对热点数据使用广播表或单独处理
  4. 连接池优化:每个分库独立配置连接池
  5. 监控告警:监控各分库的负载均衡情况
// 广播表配置:小表全量复制到每个分库
// 适用于配置表、字典表等
spring:
shardingsphere:
rules:
sharding:
broadcast-tables:
- t_config
- t_dict