数据分区与复制
数据分区(Sharding)和数据复制(Replication)是分布式数据管理的两个核心概念。
数据分区(Sharding)
为什么需要分区
分区策略
1. 哈希分区
将数据的某个字段哈希后取模,决定数据的分区:
// 哈希分区实现
class HashSharding {
private int shardCount;
public HashSharding(int shardCount) {
this.shardCount = shardCount;
}
// 计算数据所在的分片
public int getShard(String key) {
// 使用一致性哈希避免数据大量迁移
int hash = Math.abs(hash(key));
return hash % shardCount;
}
// MurmurHash比Java hashCode更均匀
private int hash(String key) {
MurmurHash hash = new MurmurHash();
return hash.hash(key);
}
}
优点:
- 数据分布均匀
- 适合点查询
缺点:
- 无法范围查询
- 增加节点需要迁移大量数据
2. 范围分区
按照数据的某个范围进行分区:
// 范围分区实现
class RangeSharding {
private List<RangeShard> shards;
// 范围分区映射表
public int getShard(String key) {
for (RangeShard shard : shards) {
if (key.compareTo(shard.start) >= 0 &&
key.compareTo(shard.end) < 0) {
return shard.id;
}
}
return shards.get(shards.size() - 1).id;
}
// 用户ID范围分区示例
static class UserRangeShard {
static Map<String, String> getUserShard(int userId) {
if (userId < 100000) return "shard1";
if (userId < 200000) return "shard2";
if (userId < 300000) => "shard3";
return "shard4";
}
}
}
优点:
- 支持范围查询
- 增加节点简单
缺点:
- 可能出现数据倾斜
- 热点数据集中
3. 一致性哈希
// 一致性哈希实现
class ConsistentHash {
// 虚拟节点数量
private static final int VIRTUAL_NODES = 150;
// 哈希环
private final SortedMap<Integer, String> circle = new TreeMap<>();
public ConsistentHash(List<String> nodes) {
for (String node : nodes) {
addNode(node);
}
}
// 添加节点
public void addNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.put(hash, node);
}
}
// 移除节点
public void removeNode(String node) {
for (int i = 0; i < VIRTUAL_NODES; i++) {
String vNode = node + "#" + i;
int hash = hash(vNode);
circle.remove(hash);
}
}
// 获取数据对应的节点
public String getNode(String key) {
if (circle.isEmpty()) {
throw new IllegalStateException("没有可用节点");
}
int hash = hash(key);
// 找到第一个大于等于hash的节点
SortedMap<Integer, String> tail = circle.tailMap(hash);
if (tail.isEmpty()) {
// 环回到开头
return circle.get(circle.firstKey());
}
return tail.get(tail.firstKey());
}
private int hash(String key) {
return Math.abs(key.hashCode());
}
}
分区架构
分区带来的问题
跨分区查询
// 跨分片查询处理
class CrossShardQuery {
// 聚合查询:需要从多个分片获取结果后聚合
public int countUsersByAge(int age) {
int totalCount = 0;
// 并行查询所有分片
List<Future<Integer>> futures = new ArrayList<>();
for (Shard shard : allShards) {
futures.add(executor.submit(() -> shard.countByAge(age)));
}
// 聚合结果
for (Future<Integer> future : futures) {
totalCount += future.get();
}
return totalCount;
}
// 排序查询:从多分片获取数据后排序
public List<User> getTopUsers(int limit) {
List<User> allUsers = new ArrayList<>();
// 并行获取各分片数据
for (Shard shard : allShards) {
allUsers.addAll(shard.getAllUsers());
}
// 内存排序取Top N
return allUsers.stream()
.sorted(Comparator.comparing(User::getScore).reversed())
.limit(limit)
.collect(Collectors.toList());
}
}
分布式ID
// 分布式ID生成方案
class DistributedIdGenerator {
// 方案1:UUID
public String generateUUID() {
return UUID.randomUUID().toString();
}
// 方案2:雪花算法
static class SnowflakeId {
// 64位: 1位(符号) + 41位(时间戳) + 10位(机器ID) + 12位(序列号)
private long sequence = 0;
private long lastTimestamp = -1;
private final long workerId;
public SnowflakeId(long workerId) {
this.workerId = workerId;
}
public synchronized long nextId() {
long timestamp = System.currentTimeMillis();
// 时钟回拨处理
if (timestamp < lastTimestamp) {
timestamp = lastTimestamp;
}
if (timestamp == lastTimestamp) {
// 同一毫秒内,序列号+1
sequence = (sequence + 1) & ((1 << 12) - 1);
if (sequence == 0) {
// 序列号用完,等待下一毫秒
timestamp = waitNextMillis(timestamp);
}
} else {
sequence = 0;
}
lastTimestamp = timestamp;
// 组装ID
return ((timestamp - 1609459200000L) << 22) |
(workerId << 12) |
sequence;
}
}
// 方案3:数据库号段
static class DatabaseSegmentId {
private static final long BATCH_SIZE = 1000;
private long currentId = 0;
private long maxId = 0;
public synchronized long nextId() {
if (currentId >= maxId) {
// 从数据库获取新的号段
currentId = fetchFromDatabase();
maxId = currentId + BATCH_SIZE;
}
return currentId++;
}
}
}
数据复制
复制模式
1. 主从复制
// 主从复制实现
class MasterSlaveReplication {
// 主节点写入
public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);
// 2. 同步/异步复制到从节点
replicateToSlaves(key, value);
}
// 从节点读取
public String read(String key, ReadConsistency consistency) {
if (consistency == ReadConsistency.STRONG) {
// 强一致性:读取主节点或多数派
return readFromQuorum(key);
} else {
// 弱一致性:读取任意从节点
return readFromAnySlave(key);
}
}
// 异步复制
private void replicateToSlaves(String key, String value) {
for (SlaveNode slave : slaves) {
// 异步复制,不等待完成
slave.asyncReplicate(key, value);
}
}
}
2. 复制策略
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 同步复制 | 数据不丢失 | 延迟高 | 金融交易 |
| 异步复制 | 延迟低 | 可能丢数据 | 普通业务 |
| 半同步复制 | 平衡 | - | 大多数场景 |
// 半同步复制
class SemiSyncReplication {
public void write(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);
// 2. 同步复制到至少N个节点
int syncCount = 0;
for (ReplicaNode replica : replicas) {
if (replica.syncWrite(key, value)) {
syncCount++;
}
// 多数派确认后返回
if (syncCount >= requiredQuorum) {
break;
}
}
// 3. 剩余节点异步复制
for (ReplicaNode replica : replicas) {
if (!replica.isSynced()) {
replica.asyncReplicate(key, value);
}
}
}
}
3. 复制延迟处理
// 复制延迟处理
class ReplicationLagHandler {
// 读请求重定向
public String readWithRetry(String key) {
try {
// 优先读从节点
return readFromSlave(key);
} catch (StaleDataException e) {
// 数据过期,重试读主节点
return readFromMaster(key);
}
}
// 写入后立即读取
public void writeAndRead(String key, String value) {
// 1. 写入主节点
masterDB.write(key, value);
// 2. 同步复制到足够节点
replicateToQuorum(key, value);
// 3. 确保复制完成后再读取
waitForReplication(key);
// 4. 读取
return readFromMaster(key);
}
}
数据一致性模型
// 各种一致性模型实现
// 1. 强一致性:所有节点同步
class StrongConsistency {
public void write(String key, String value) {
// 同步到所有节点
for (Node node : allNodes) {
node.syncWrite(key, value);
}
}
}
// 2. 顺序一致性:保持操作顺序
class SequentialConsistency {
// 使用向量时钟保证顺序
VectorClock clock = new VectorClock();
public void write(String key, String value) {
// 为操作分配全局顺序
long sequence = clock.incrementAndGet();
node.write(key, value, sequence);
}
}
// 3. 因果一致性:保证因果关系
class CausalConsistency {
// 记录因果关系
Map<String, VectorClock> causality = new HashMap<>();
public void write(String key, String value, VectorClock dependsOn) {
// 检查依赖是否满足
if (dependsOn != null && !isCausalitySatisfied(dependsOn)) {
// 等待依赖操作完成
waitFor(dependsOn);
}
node.write(key, value);
}
}
// 4. 最终一致性:允许短暂不一致
class EventualConsistency {
public void write(String key, String value) {
// 立即返回,异步复制
node.writeLocal(key, value);
asyncReplicateToAll(key, value);
}
}
数据分区与复制结合
典型架构
实现示例
// 分片+副本系统
class ShardedReplicatedDB {
// 分片路由
private ShardingRouter router;
// 各分片的主从集群
private Map<Integer, MasterSlaveCluster> clusters;
// 写入操作
public void write(String key, String value) {
// 1. 计算分片
int shardId = router.getShard(key);
// 2. 找到对应集群的主节点
MasterSlaveCluster cluster = clusters.get(shardId);
MasterNode master = cluster.getMaster();
// 3. 写入主节点(主节点会自动复制到从节点)
master.write(key, value);
}
// 读取操作
public String read(String key, ReadConsistency consistency) {
int shardId = router.getShard(key);
MasterSlaveCluster cluster = clusters.get(shardId);
return consistency == ReadConsistency.STRONG
? cluster.readFromQuorum(key)
: cluster.readFromAnySlave(key);
}
}
小结
本章我们深入学习了数据分区与复制的核心概念:
-
数据分区(Sharding)
- 哈希分区:数据分布均匀,无法范围查询
- 范围分区:支持范围查询,可能数据倾斜
- 一致性哈希:减少节点变动时的数据迁移
-
数据复制
- 主从复制:读写分离,提高读取能力
- 同步/异步/半同步:权衡一致性和性能
- 复制延迟处理:读写分离、重试机制
-
一致性模型
- 强一致性:所有节点同步
- 顺序一致性:保持操作顺序
- 因果一致性:保证因果关系
- 最终一致性:允许短暂不一致
-
实际应用
- 分布式ID生成:UUID、雪花算法、号段模式
- 跨分区查询:并行查询、结果聚合
- 分区与复制结合:高可用、高性能