跳到主要内容

数据分区与复制

数据分区(Sharding)和数据复制(Replication)是分布式数据管理的两个核心概念。

数据分区(Sharding)

为什么需要分区

分区策略

1. 哈希分区

将数据的某个字段哈希后取模,决定数据的分区:

// 哈希分区实现
class HashSharding {

private int shardCount;

public HashSharding(int shardCount) {
this.shardCount = shardCount;
}

// 计算数据所在的分片
public int getShard(String key) {
// 使用一致性哈希避免数据大量迁移
int hash = Math.abs(hash(key));
return hash % shardCount;
}

// MurmurHash比Java hashCode更均匀
private int hash(String key) {
MurmurHash hash = new MurmurHash();
return hash.hash(key);
}
}

优点

  • 数据分布均匀
  • 适合点查询

缺点

  • 无法范围查询
  • 增加节点需要迁移大量数据

2. 范围分区

按照数据的某个范围进行分区:

// 范围分区实现
class RangeSharding {

private List<RangeShard> shards;

// 范围分区映射表
public int getShard(String key) {
for (RangeShard shard : shards) {
if (key.compareTo(shard.start) >= 0 &&
key.compareTo(shard.end) < 0) {
return shard.id;
}
}
return shards.get(shards.size() - 1).id;
}

// 用户ID范围分区示例
static class UserRangeShard {
static Map<String, String> getUserShard(int userId) {
if (userId < 100000) return "shard1";
if (userId < 200000) return "shard2";
if (userId < 300000) => "shard3";
return "shard4";
}
}
}

优点

  • 支持范围查询
  • 增加节点简单

缺点

  • 可能出现数据倾斜
  • 热点数据集中

3. 一致性哈希

// 一致性哈希实现
class ConsistentHash {

// 虚拟节点数量
private static final int VIRTUAL_NODES = 150;

// 哈希环
private final SortedMap<Integer, String> circle = new TreeMap<>();

public ConsistentHash(List<String> nodes) {
for (String node : nodes) {
addNode(node);
}
}

// 添加节点
public void addNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.put(hash, node);
}
}

// 移除节点
public void removeNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.remove(hash);
}
}

// 获取数据对应的节点
public String getNode(String key) {
if (circle.isEmpty()) {
throw new IllegalStateException("没有可用节点");
}

int hash = hash(key);

// 找到第一个大于等于hash的节点
SortedMap<Integer, String> tail = circle.tailMap(hash);

if (tail.isEmpty()) {
// 环回到开头
return circle.get(circle.firstKey());
}

return tail.get(tail.firstKey());
}

private int hash(String key) {
return Math.abs(key.hashCode());
}
}

分区架构

分区带来的问题

跨分区查询

// 跨分片查询处理
class CrossShardQuery {

// 聚合查询:需要从多个分片获取结果后聚合
public int countUsersByAge(int age) {
int totalCount = 0;

// 并行查询所有分片
List<Future<Integer>> futures = new ArrayList<>();
for (Shard shard : allShards) {
futures.add(executor.submit(() -> shard.countByAge(age)));
}

// 聚合结果
for (Future<Integer> future : futures) {
totalCount += future.get();
}

return totalCount;
}

// 排序查询:从多分片获取数据后排序
public List<User> getTopUsers(int limit) {
List<User> allUsers = new ArrayList<>();

// 并行获取各分片数据
for (Shard shard : allShards) {
allUsers.addAll(shard.getAllUsers());
}

// 内存排序取Top N
return allUsers.stream()
.sorted(Comparator.comparing(User::getScore).reversed())
.limit(limit)
.collect(Collectors.toList());
}
}

分布式ID

// 分布式ID生成方案
class DistributedIdGenerator {

// 方案1:UUID
public String generateUUID() {
return UUID.randomUUID().toString();
}

// 方案2:雪花算法
static class SnowflakeId {
// 64位: 1位(符号) + 41位(时间戳) + 10位(机器ID) + 12位(序列号)
private long sequence = 0;
private long lastTimestamp = -1;
private final long workerId;

public SnowflakeId(long workerId) {
this.workerId = workerId;
}

public synchronized long nextId() {
long timestamp = System.currentTimeMillis();

// 时钟回拨处理
if (timestamp < lastTimestamp) {
timestamp = lastTimestamp;
}

if (timestamp == lastTimestamp) {
// 同一毫秒内,序列号+1
sequence = (sequence + 1) & ((1 << 12) - 1);
if (sequence == 0) {
// 序列号用完,等待下一毫秒
timestamp = waitNextMillis(timestamp);
}
} else {
sequence = 0;
}

lastTimestamp = timestamp;

// 组装ID
return ((timestamp - 1609459200000L) << 22) |
(workerId << 12) |
sequence;
}
}

// 方案3:数据库号段
static class DatabaseSegmentId {
private static final long BATCH_SIZE = 1000;
private long currentId = 0;
private long maxId = 0;

public synchronized long nextId() {
if (currentId >= maxId) {
// 从数据库获取新的号段
currentId = fetchFromDatabase();
maxId = currentId + BATCH_SIZE;
}
return currentId++;
}
}
}

数据复制

复制模式

1. 主从复制

// 主从复制实现
class MasterSlaveReplication {

// 主节点写入
public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);

// 2. 同步/异步复制到从节点
replicateToSlaves(key, value);
}

// 从节点读取
public String read(String key, ReadConsistency consistency) {
if (consistency == ReadConsistency.STRONG) {
// 强一致性:读取主节点或多数派
return readFromQuorum(key);
} else {
// 弱一致性:读取任意从节点
return readFromAnySlave(key);
}
}

// 异步复制
private void replicateToSlaves(String key, String value) {
for (SlaveNode slave : slaves) {
// 异步复制,不等待完成
slave.asyncReplicate(key, value);
}
}
}

2. 复制策略

策略优点缺点适用场景
同步复制数据不丢失延迟高金融交易
异步复制延迟低可能丢数据普通业务
半同步复制平衡-大多数场景
// 半同步复制
class SemiSyncReplication {

public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);

// 2. 同步复制到至少N个节点
int syncCount = 0;
for (ReplicaNode replica : replicas) {
if (replica.syncWrite(key, value)) {
syncCount++;
}
// 多数派确认后返回
if (syncCount >= requiredQuorum) {
break;
}
}

// 3. 剩余节点异步复制
for (ReplicaNode replica : replicas) {
if (!replica.isSynced()) {
replica.asyncReplicate(key, value);
}
}
}
}

3. 复制延迟处理

// 复制延迟处理
class ReplicationLagHandler {

// 读请求重定向
public String readWithRetry(String key) {
try {
// 优先读从节点
return readFromSlave(key);
} catch (StaleDataException e) {
// 数据过期,重试读主节点
return readFromMaster(key);
}
}

// 写入后立即读取
public void writeAndRead(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);

// 2. 同步复制到足够节点
replicateToQuorum(key, value);

// 3. 确保复制完成后再读取
waitForReplication(key);

// 4. 读取
return readFromMaster(key);
}
}

数据一致性模型

// 各种一致性模型实现

// 1. 强一致性:所有节点同步
class StrongConsistency {
public void write(String key, String value) {
// 同步到所有节点
for (Node node : allNodes) {
node.syncWrite(key, value);
}
}
}

// 2. 顺序一致性:保持操作顺序
class SequentialConsistency {
// 使用向量时钟保证顺序
VectorClock clock = new VectorClock();

public void write(String key, String value) {
// 为操作分配全局顺序
long sequence = clock.incrementAndGet();
node.write(key, value, sequence);
}
}

// 3. 因果一致性:保证因果关系
class CausalConsistency {
// 记录因果关系
Map<String, VectorClock> causality = new HashMap<>();

public void write(String key, String value, VectorClock dependsOn) {
// 检查依赖是否满足
if (dependsOn != null && !isCausalitySatisfied(dependsOn)) {
// 等待依赖操作完成
waitFor(dependsOn);
}
node.write(key, value);
}
}

// 4. 最终一致性:允许短暂不一致
class EventualConsistency {
public void write(String key, String value) {
// 立即返回,异步复制
node.writeLocal(key, value);
asyncReplicateToAll(key, value);
}
}

数据分区与复制结合

典型架构

实现示例

// 分片+副本系统
class ShardedReplicatedDB {

// 分片路由
private ShardingRouter router;

// 各分片的主从集群
private Map<Integer, MasterSlaveCluster> clusters;

// 写入操作
public void write(String key, String value) {
// 1. 计算分片
int shardId = router.getShard(key);

// 2. 找到对应集群的主节点
MasterSlaveCluster cluster = clusters.get(shardId);
MasterNode master = cluster.getMaster();

// 3. 写入主节点(主节点会自动复制到从节点)
master.write(key, value);
}

// 读取操作
public String read(String key, ReadConsistency consistency) {
int shardId = router.getShard(key);
MasterSlaveCluster cluster = clusters.get(shardId);

return consistency == ReadConsistency.STRONG
? cluster.readFromQuorum(key)
: cluster.readFromAnySlave(key);
}
}

小结

本章我们深入学习了数据分区与复制的核心概念:

  1. 数据分区(Sharding)

    • 哈希分区:数据分布均匀,无法范围查询
    • 范围分区:支持范围查询,可能数据倾斜
    • 一致性哈希:减少节点变动时的数据迁移
  2. 数据复制

    • 主从复制:读写分离,提高读取能力
    • 同步/异步/半同步:权衡一致性和性能
    • 复制延迟处理:读写分离、重试机制
  3. 一致性模型

    • 强一致性:所有节点同步
    • 顺序一致性:保持操作顺序
    • 因果一致性:保证因果关系
    • 最终一致性:允许短暂不一致
  4. 实际应用

    • 分布式ID生成:UUID、雪花算法、号段模式
    • 跨分区查询:并行查询、结果聚合
    • 分区与复制结合:高可用、高性能