跳到主要内容

分布式锁

分布式锁是分布式系统中保证多个进程互斥访问共享资源的重要机制。当多个服务实例需要协调对同一资源的访问时,本地锁(如 Java 的 synchronized 或 ReentrantLock)已经无法工作,因为它们只能在单个 JVM 进程内生效。这时就需要一种跨进程、跨服务器的锁机制——分布式锁。

为什么需要分布式锁

在单机环境中,我们可以使用操作系统提供的锁机制来保证互斥访问。无论是 Java 的 synchronized、ReentrantLock,还是数据库的行锁,本质上都是通过共享内存或共享存储来协调进程间的互斥。但在分布式环境中,多个服务实例部署在不同的服务器上,各自有独立的内存空间,无法通过本地锁来协调。

考虑一个典型的场景:电商系统中的库存扣减。假设有 10 台服务器同时处理订单,每个订单都需要检查并扣减库存。如果使用本地锁,每台服务器只能保证本服务器上的请求互斥,无法防止其他服务器同时扣减库存,可能导致超卖问题。

分布式锁解决了这个问题:所有服务器都向同一个外部系统请求锁,只有获得锁的服务器才能操作共享资源,从而实现跨服务器的互斥访问。

分布式锁的核心要求

一个合格的分布式锁实现需要满足以下核心要求:

互斥性(Mutual Exclusion):这是最基本的要求。任意时刻,最多只有一个客户端能持有锁。无论有多少客户端竞争,锁必须保证同一时刻只有一个获胜者。

不会死锁(No Deadlock):如果持有锁的客户端崩溃或网络中断,锁必须能够自动释放,不能永远被占用。这通常通过过期时间(TTL)来实现。

加锁解锁同源性:只有加锁的客户端才能解锁,不能被其他客户端误解锁。这需要在锁中存储客户端标识,解锁时验证身份。

高可用性(High Availability):锁服务本身应该是高可用的。即使部分节点故障,锁服务仍能正常工作。

高性能(High Performance):加锁和解锁操作的延迟应该尽可能低,否则会成为系统瓶颈。

可重入性(Reentrancy):同一个客户端可以多次获取同一把锁而不会死锁。这对于递归调用或嵌套事务很有用。

公平性(Fairness):可选的要求。按请求顺序获取锁,避免饥饿问题。

基于 Redis 的分布式锁

Redis 是实现分布式锁最常用的方案之一。它提供了原子性的操作命令,可以高效地实现加锁和解锁。

SETNX 实现

最基础的实现方式是使用 Redis 的 SET 命令配合 NX 和 PX 参数:

// Redis 分布式锁基础实现
public class RedisDistributedLock {

private final Jedis jedis;
private final String lockKey;
private final String lockValue; // 锁持有者的唯一标识
private final long expireTime; // 过期时间(毫秒)

/**
* 尝试获取锁
* @return 是否成功获取锁
*/
public boolean tryLock() {
// SET key value NX PX expireTime
// NX: 只在键不存在时设置(Not eXists)
// PX: 设置过期时间(毫秒)
String result = jedis.set(lockKey, lockValue, "NX", "PX", expireTime);
return "OK".equals(result);
}

/**
* 释放锁
* 必须使用 Lua 脚本保证原子性,避免误解锁
*/
public boolean unlock() {
// Lua 脚本:只有锁持有者才能释放
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

Long result = (Long) jedis.eval(script, 1, lockKey, lockValue);
return result != null && result > 0;
}
}

为什么使用 Lua 脚本释放锁?

释放锁看似简单——先 get 再 del,但这两个操作不是原子的。考虑这样的场景:

  1. 客户端 A 执行 get,确认锁属于自己
  2. 此时锁刚好过期,Redis 自动删除了锁
  3. 客户端 B 获取了锁
  4. 客户端 A 执行 del,错误地删除了 B 的锁

使用 Lua 脚本可以将 get 和 del 作为一个原子操作执行,避免这个问题。

可重入锁的实现

可重入锁需要记录锁的持有者和重入次数。可以使用 Redis 的 Hash 结构来存储:

// Redis 可重入锁
public class RedisReentrantLock {

private final Jedis jedis;
private final String lockKey;
private final String threadId; // 线程唯一标识

/**
* 尝试获取锁(支持重入)
*/
public boolean tryLock(long expireTime) {
// 检查锁是否存在
String currentValue = jedis.get(lockKey);

if (currentValue == null) {
// 锁不存在,直接获取
jedis.hset(lockKey, "count", "1");
jedis.hset(lockKey, "owner", threadId);
jedis.pexpire(lockKey, expireTime);
return true;
}

// 锁存在,检查是否是当前线程持有
String owner = jedis.hget(lockKey, "owner");
if (threadId.equals(owner)) {
// 重入:增加计数
jedis.hincrBy(lockKey, "count", 1);
jedis.pexpire(lockKey, expireTime);
return true;
}

// 被其他线程持有
return false;
}

/**
* 释放锁(支持重入计数)
*/
public boolean unlock() {
String owner = jedis.hget(lockKey, "owner");

if (!threadId.equals(owner)) {
return false; // 不是锁持有者
}

long count = Long.parseLong(jedis.hget(lockKey, "count"));
if (count > 1) {
// 重入计数减一
jedis.hincrBy(lockKey, "count", -1);
} else {
// 最后一次,删除锁
jedis.del(lockKey);
}

return true;
}
}

Redisson 框架

Redisson 是一个功能完善的 Redis 客户端,提供了丰富的分布式对象和服务。使用 Redisson 可以避免自己实现分布式锁的各种细节。

// 使用 Redisson 实现分布式锁
public class RedissonLockExample {

public static void main(String[] args) {
// 配置 Redisson 客户端
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("your-password");

RedissonClient redisson = Redisson.create(config);

// 获取锁对象
RLock lock = redisson.getLock("myLock");

try {
// 尝试获取锁
// waitTime: 等待获取锁的最长时间
// leaseTime: 锁自动释放时间
boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);

if (acquired) {
try {
// 执行业务逻辑
doBusiness();
} finally {
// 释放锁(只有锁持有者才能释放)
lock.unlock();
}
} else {
System.out.println("获取锁失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
redisson.shutdown();
}
}
}

Redisson 提供了多种类型的锁:

  • 可重入锁(RLock):支持重入,实现了 java.util.concurrent.locks.Lock 接口
  • 公平锁(RFairLock):按请求顺序获取锁
  • 读写锁(RReadWriteLock):支持读读共享、写写互斥、读写互斥
  • 红锁(RedissonRedLock):基于多个 Redis 节点的分布式锁

### Redlock 算法

单点 Redis 实现分布式锁存在一个根本问题:如果 Redis 节点宕机,锁数据就会丢失。即使配置了主从复制,由于 Redis 的复制是异步的,主节点宕机时可能还有部分锁数据没有同步到从节点,导致锁的安全性被破坏。

Redis 作者 Antirez 提出了 Redlock 算法来解决这个问题。Redlock 的核心思想是:使用多个独立的 Redis 节点,只有当客户端在大多数节点上都成功获取锁时,才认为获取锁成功。

#### 算法步骤

假设有 N 个独立的 Redis 节点(通常 N=5),Redlock 的获取锁流程如下:

1. **记录开始时间**:客户端记录当前时间戳 $t_1$。

2. **依次请求加锁**:客户端向 N 个 Redis 节点发送加锁请求,使用相同的 key 和 value。每个请求都设置一个较小的超时时间,避免在某个节点上长时间阻塞。

3. **计算获取锁耗时**:客户端计算从 $t_1$ 到收到最后一个响应的时间 $t_2$,得到获取锁的耗时 $t_2 - t_1$。

4. **判断是否成功**:如果客户端在 majority($\lfloor N/2 \rfloor + 1$)个节点上成功获取锁,且获取锁耗时小于锁的过期时间,则认为获取锁成功。

5. **有效时间计算**:锁的有效时间 = 设置的过期时间 - 获取锁耗时。这个有效时间是锁实际可用的窗口。

6. **失败处理**:如果在多数节点上获取锁失败,或者获取锁耗时超过锁过期时间,客户端向所有节点发送解锁请求。

Redlock 加锁流程(5 个节点):

客户端 │ ├────── SET lock_key value NX PX 30000 ──────→ Redis 1 ──→ OK │ ├────── SET lock_key value NX PX 30000 ──────→ Redis 2 ──→ OK │ ├────── SET lock_key value NX PX 30000 ──────→ Redis 3 ──→ OK │ ├────── SET lock_key value NX PX 30000 ──────→ Redis 4 ──→ (超时/失败) │ └────── SET lock_key value NX PX 30000 ──────→ Redis 5 ──→ OK

结果:5 个节点中 4 个成功(≥ 3),加锁成功 有效时间 = 30000ms - 获取耗时(假设 5ms)= 29995ms


#### 关键设计点

Redlock 的设计有几个关键点需要注意:

**节点独立性**:N 个 Redis 节点应该是独立的,不使用主从复制或集群模式。这样才能避免单点故障影响多个节点。实践中通常将节点部署在不同的物理机或机架上。

**随机超时**:向每个节点发送请求时,应该设置一个小于锁过期时间的超时。如果某个节点无响应,客户端可以快速超时并继续尝试其他节点,而不是无限等待。

**时钟漂移影响**:锁的有效时间必须减去获取锁的耗时,这是为了补偿在获取锁过程中流逝的时间。如果获取锁花了 2 秒,而锁过期时间只有 3 秒,实际可用时间只有 1 秒。

**安全释放**:释放锁时,客户端必须向所有节点发送解锁请求,无论之前是否在该节点上成功获取锁。这确保了不会留下"残留"的锁。

#### Redlock 实现

Redisson 提供了 Redlock 的完整实现:

```java
// Redisson Redlock 实现
public class RedlockExample {

public static void main(String[] args) {
// 创建多个独立的 Redis 客户端
Config config1 = new Config();
config1.useSingleServer()
.setAddress("redis://redis1:6379");

Config config2 = new Config();
config2.useSingleServer()
.setAddress("redis://redis2:6379");

Config config3 = new Config();
config3.useSingleServer()
.setAddress("redis://redis3:6379");

RedissonClient client1 = Redisson.create(config1);
RedissonClient client2 = Redisson.create(config2);
RedissonClient client3 = Redisson.create(config3);

// 创建 Redlock 对象
RLock lock1 = client1.getLock("myLock");
RLock lock2 = client2.getLock("myLock");
RLock lock3 = client3.getLock("myLock");

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

try {
// 尝试获取锁
// waitTime: 等待获取锁的最长时间
// leaseTime: 锁自动释放时间
boolean acquired = redLock.tryLock(10, 30, TimeUnit.SECONDS);

if (acquired) {
// 计算锁的剩余有效时间
long remainingTime = redLock.remainTimeToLive();
System.out.println("获取锁成功,剩余有效时间: " + remainingTime + "ms");

// 执行业务逻辑
doBusiness();
} else {
System.out.println("获取锁失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放锁(会向所有节点发送解锁请求)
redLock.unlock();

// 关闭客户端
client1.shutdown();
client2.shutdown();
client3.shutdown();
}
}
}

Redlock 的争议

Redlock 算法在分布式系统社区引发了一些争议。Martin Kleppmann(DDIA 作者)在博客文章中对 Redlock 提出了批评,主要集中在以下几点:

时钟依赖问题:Redlock 严重依赖系统时钟的准确性。如果某个 Redis 节点的时钟发生漂移(如 NTP 同步导致时钟跳跃),可能导致锁提前过期或过期后仍被持有。

考虑这个场景:

  1. 客户端 A 从 Redis 1、2、3 获取了锁,锁过期时间 10 秒
  2. Redis 1 的时钟向前跳跃了 5 秒(NTP 同步)
  3. Redis 1 认为锁已过期,允许客户端 B 获取锁
  4. 此时客户端 A 和 B 都认为自己持有锁

GC 暂停问题:如果客户端在持有锁期间发生长时间的 GC 暂停(Stop-The-World),暂停期间锁可能已经过期。客户端恢复后继续执行,但锁可能已被其他客户端获取。

网络分区问题:在网络分区场景下,客户端可能无法与多数节点通信,导致获取锁失败。这与单节点 Redis 锁的行为不同,需要在可用性和安全性之间权衡。

Martin Kleppmann 认为,对于需要强正确性保证的场景,应该使用基于 fencing token 的方案(如 ZooKeeper 或 etcd),而不是依赖时间假设的 Redlock。

Redlock vs 其他方案

方案优点缺点适用场景
单节点 Redis 锁简单、高性能单点故障对可靠性要求不高的场景
Redlock容错性高依赖时钟、复杂对可靠性有一定要求的场景
ZooKeeper 锁强一致性、支持 fencing性能较低、运维复杂对正确性要求极高的场景
etcd 锁强一致性、性能好需要额外组件Kubernetes 等云原生场景

实践建议

根据业务场景选择合适的分布式锁方案:

对于效率优化场景(如缓存更新、定时任务去重),单节点 Redis 锁通常足够。即使偶尔出现锁失效,也只是导致少量重复工作,不会产生严重后果。

对于正确性要求高的场景(如库存扣减、账户转账),应该考虑使用 ZooKeeper 或 etcd。这些系统基于共识算法,不依赖时间假设,提供更强的正确性保证。

如果选择 Redlock,需要注意:

  • 确保 Redis 节点部署在独立的物理环境
  • 监控时钟同步状态,设置时钟漂移告警
  • 锁的过期时间应该远大于业务的预期执行时间
  • 业务代码要有处理锁失效的保护逻辑
// 使用 Redlock 时添加保护逻辑
public void doWithRedlock(RedissonRedLock redLock) {
try {
if (redLock.tryLock(10, 30, TimeUnit.SECONDS)) {
long startTime = System.currentTimeMillis();
long remainingTime = redLock.remainTimeToLive();

try {
// 业务执行前检查剩余时间
if (remainingTime < BUSINESS_TIMEOUT) {
throw new LockTimeInsufficientException();
}

doBusiness();

// 业务执行后检查是否仍在锁有效期内
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed > remainingTime) {
// 锁已过期,结果可能不可靠
log.warn("业务执行时间 {}ms 超过锁剩余时间 {}ms", elapsed, remainingTime);
// 根据业务需求决定是否回滚
}
} finally {
// 只有在锁仍持有时才释放
if (redLock.isLocked() && redLock.isHeldByCurrentThread()) {
redLock.unlock();
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

基于 Zookeeper 的分布式锁

原理

实现

// Zookeeper 分布式锁
class ZookeeperLock {

private ZooKeeper zk;
private String lockPath;
private String currentNode;
private CountDownLatch latch = new CountDownLatch(1);

// 获取锁
public boolean tryLock(String lockName, long timeout, TimeUnit unit)
throws KeeperException, InterruptedException {

lockPath = "/" + lockName;

// 1. 创建临时顺序节点
currentNode = zk.create(
lockPath + "/lock_",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL
);

// 2. 检查是否是最小节点
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);

String smallestNode = children.get(0);

if (currentNode.endsWith(smallestNode)) {
// 获取锁成功
return true;
} else {
// 3. 等待前一个节点删除
String predecessor = findPredecessor(children);

Watcher watcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown();
}
};

zk.exists(lockPath + "/" + predecessor, watcher);

// 等待超时
return latch.await(timeout, unit);
}
}

// 释放锁
public void unlock() throws KeeperException, InterruptedException {
if (currentNode != null) {
zk.delete(currentNode, -1);
}
}

// 找到前一个节点
private String findPredecessor(List<String> children) {
String currentName = currentNode.substring(currentNode.lastIndexOf("/") + 1);
for (int i = children.size() - 1; i >= 0; i--) {
if (children.get(i).compareTo(currentName) < 0) {
return children.get(i);
}
}
return null;
}
}

// 使用 Curator 框架
class CuratorLockExample {

public static void main(String[] args) throws Exception {
// 创建 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();

client.start();

// 获取可重入锁
InterProcessMutex lock = new InterProcessMutex(
client,
"/my/lock"
);

// 获取锁
boolean acquired = lock.acquire(30, TimeUnit.SECONDS);

if (acquired) {
try {
// 执行业务逻辑
doBusiness();
} finally {
// 释放锁
lock.release();
}
}

client.close();
}
}

基于数据库的分布式锁

表级锁

// 基于数据库的分布式锁
class DatabaseLock {

@Autowired
private DataSource dataSource;

// 尝试获取锁
public boolean tryLock(String lockName, String owner, long expireTime) {
String sql = "INSERT INTO distributed_locks (lock_name, owner, expire_time) " +
"VALUES (?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"owner = IF(expire_time < NOW(), ?, owner), " +
"expire_time = IF(expire_time < NOW(), ?, expire_time)";

Timestamp expireAt = new Timestamp(System.currentTimeMillis() + expireTime);

try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {

ps.setString(1, lockName);
ps.setTimestamp(2, expireAt);
ps.setTimestamp(3, expireAt);
ps.setString(4, owner);
ps.setTimestamp(5, expireAt);

int result = ps.executeUpdate();
return result > 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

// 释放锁
public boolean unlock(String lockName, String owner) {
String sql = "DELETE FROM distributed_locks " +
"WHERE lock_name = ? AND owner = ?";

try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {

ps.setString(1, lockName);
ps.setString(2, owner);

int result = ps.executeUpdate();
return result > 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

// 使用乐观锁的分布式锁
public boolean tryOptimisticLock(String lockName, String owner, int version) {
String sql = "UPDATE distributed_locks " +
"SET owner = ?, version = version + 1 " +
"WHERE lock_name = ? AND version = ?";

try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {

ps.setString(1, owner);
ps.setString(2, lockName);
ps.setInt(3, version);

int result = ps.executeUpdate();
return result > 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
-- 锁表结构
CREATE TABLE distributed_locks (
lock_name VARCHAR(64) PRIMARY KEY,
owner VARCHAR(64) NOT NULL,
expire_time TIMESTAMP NOT NULL,
version INT DEFAULT 0,
INDEX idx_expire_time (expire_time)
);

分布式锁对比

特性RedisZookeeper数据库
性能
可靠性需要额外保证
实现复杂度
故障处理依赖TTL自动清理依赖事务
公平性需要额外实现支持需要额外实现
可重入支持支持支持

最佳实践

1. 锁的粒度

// ❌ 粗粒度锁
public void processOrder(Order order) {
lock.lock("order:" + order.getId()); // 锁住整个订单处理
// 处理订单
lock.unlock();
}

// ✅ 细粒度锁
public void processOrder(Order order) {
// 只锁库存
lock.lock("inventory:" + order.getProductId());
try {
checkAndDeductInventory(order.getProductId(), order.getQuantity());
} finally {
lock.unlock();
}

// 只锁余额
lock.lock("balance:" + order.getUserId());
try {
checkAndDeductBalance(order.getUserId(), order.getTotalAmount());
} finally {
lock.unlock();
}
}

2. 锁超时

// 设置合理的锁超时时间
class LockTimeoutExample {

// 计算合理的超时时间
public long calculateLockTimeout() {
// 基于操作历史估算
long avgOperationTime = getAverageOperationTime();
// 加上缓冲时间
return avgOperationTime * 2 + 10000; // 额外10秒缓冲
}

// 建议值:5-30秒
// 过长:节点崩溃时锁长时间不释放
// 过短:业务处理时间长导致锁自动释放
}

3. 异步续期

// 看门狗机制:自动续期
class WatchDog {

private ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);

// 启动看门狗
public void startWatchDog(RLock lock, String lockName) {
// 每10秒检查一次
scheduler.scheduleAtFixedRate(() -> {
if (lock.isHeldByCurrentThread()) {
// 续期
lock.lock();
try {
lock.expire(30, TimeUnit.SECONDS);
} finally {
lock.unlock();
}
}
}, 10, 10, TimeUnit.SECONDS);
}
}

4. 异常处理

// 确保锁最终释放
class LockSafetyExample {

public void doWithLock(RLock lock) {
try {
// 获取锁
lock.lock();

// 执行业务逻辑
doBusiness();

} catch (Exception e) {
// 业务异常处理

} finally {
// 确保释放
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

小结

本章我们深入学习了分布式锁:

  1. 为什么需要分布式锁

    • 单机锁无法跨服务器工作
    • 分布式环境下需要跨进程互斥
  2. 分布式锁的要求

    • 互斥、可重入、公平性、故障处理、高性能
  3. Redis 分布式锁

    • SETNX 实现
    • Lua 脚本保证原子性
    • Redisson 客户端封装
  4. Zookeeper 分布式锁

    • 临时顺序节点
    • Watch 机制通知
    • Curator 框架
  5. 数据库分布式锁

    • 基于表结构
    • 乐观锁/悲观锁
  6. 最佳实践

    • 合理的锁粒度
    • 适当的超时时间
    • 自动续期机制
    • 完善的异常处理