分布式锁
分布式锁是分布式系统中保证多个进程互斥访问共享资源的重要机制。
为什么需要分布式锁
在单机环境中,我们可以使用synchronized或ReentrantLock来保证互斥。但在分布式环境中,多个进程部署在不同服务器上,需要跨服务器的锁机制。
分布式锁的要求
| 要求 | 说明 |
|---|---|
| 互斥 | 任意时刻只有一个客户端能持有锁 |
| 可重入 | 同一客户端可多次获取同一把锁 |
| 公平性 | 按照请求顺序获取锁(可选) |
| 故障处理 | 客户端崩溃时,锁能自动释放 |
| 高性能 | 获取/释放锁的操作延迟低 |
基于 Redis 的分布式锁
SETNX 实现
// Redis 分布式锁
class RedisDistributedLock {
private Jedis jedis;
private String lockKey;
private String lockValue; // 锁持有者的标识
// 尝试获取锁
public boolean tryLock(String key, String value, long expireTime) {
// SET key value NX PX expireTime
// NX: 只在键不存在时设置
// PX: 过期时间(毫秒)
String result = jedis.set(key, value, "NX", "PX", expireTime);
return "OK".equals(result);
}
// 释放锁
public boolean unlock(String key, String value) {
// Lua 脚本:只有锁持有者才能释放
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
Long result = jedis.eval(script, 1, key, value);
return result > 0;
}
// 获取锁(带重试)
public boolean lockWithRetry(String key, String value,
int maxRetries, long retryInterval) {
for (int i = 0; i < maxRetries; i++) {
if (tryLock(key, value, 30000)) {
return true;
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
}
可重入锁
// Redis 可重入锁
class RedisReentrantLock {
private Jedis jedis;
// 存储结构:key -> {count, threadId}
public boolean tryLock(String key, String threadId, long expireTime) {
// 检查是否已持有锁
String lockInfo = jedis.hget(key, "count");
if (lockInfo != null) {
// 已持有锁,检查是否是同一线程
if (lockInfo.startsWith(threadId + ":")) {
// 重入:增加计数
long count = Long.parseLong(lockInfo.split(":")[1]);
jedis.hset(key, "count", threadId + ":" + (count + 1));
return true;
} else {
// 不同线程,无法获取
return false;
}
}
// 首次获取锁
Map<String, String> lockValue = new HashMap<>();
lockValue.put("count", "1");
lockValue.put("thread", threadId);
String result = jedis.hsetnx(key, lockValue);
if (result == 1) {
// 设置过期时间
jedis.pexpire(key, expireTime);
return true;
}
return false;
}
public boolean unlock(String key, String threadId) {
String lockInfo = jedis.hget(key, "count");
if (lockInfo == null) {
return true; // 锁已过期
}
String[] parts = lockInfo.split(":");
if (!parts[0].equals(threadId)) {
return false; // 不是锁持有者
}
long count = Long.parseLong(parts[1]);
if (count > 1) {
// 重入:减少计数
jedis.hset(key, "count", threadId + ":" + (count - 1));
} else {
// 最后一个释放
jedis.del(key);
}
return true;
}
}
Redisson 实现
// 使用 Redisson 客户端
class RedissonLockExample {
public static void main(String[] args) {
// 创建 Redisson 客户端
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// 获取锁
RLock lock = redisson.getLock("myLock");
try {
// 尝试获取锁(等待100秒,锁持有10秒)
boolean acquired = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (acquired) {
// 执行业务逻辑
doBusiness();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
redisson.shutdown();
}
// Redisson 可重入锁
public static void reentrantExample(RedissonClient redisson) {
RLock lock = redisson.getLock("myLock");
lock.lock();
try {
// 可以多次获取同一把锁
lock.lock();
try {
// 业务逻辑
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
}
// 公平锁
public static void fairLockExample(RedissonClient redisson) {
// 获取公平锁(按请求顺序获取)
RLock fairLock = redisson.getFairLock("myFairLock");
fairLock.lock();
try {
// 业务逻辑
} finally {
fairLock.unlock();
}
}
}
基于 Zookeeper 的分布式锁
原理
实现
// Zookeeper 分布式锁
class ZookeeperLock {
private ZooKeeper zk;
private String lockPath;
private String currentNode;
private CountDownLatch latch = new CountDownLatch(1);
// 获取锁
public boolean tryLock(String lockName, long timeout, TimeUnit unit)
throws KeeperException, InterruptedException {
lockPath = "/" + lockName;
// 1. 创建临时顺序节点
currentNode = zk.create(
lockPath + "/lock_",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL
);
// 2. 检查是否是最小节点
List<String> children = zk.getChildren(lockPath, false);
Collections.sort(children);
String smallestNode = children.get(0);
if (currentNode.endsWith(smallestNode)) {
// 获取锁成功
return true;
} else {
// 3. 等待前一个节点删除
String predecessor = findPredecessor(children);
Watcher watcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown();
}
};
zk.exists(lockPath + "/" + predecessor, watcher);
// 等待超时
return latch.await(timeout, unit);
}
}
// 释放锁
public void unlock() throws KeeperException, InterruptedException {
if (currentNode != null) {
zk.delete(currentNode, -1);
}
}
// 找到前一个节点
private String findPredecessor(List<String> children) {
String currentName = currentNode.substring(currentNode.lastIndexOf("/") + 1);
for (int i = children.size() - 1; i >= 0; i--) {
if (children.get(i).compareTo(currentName) < 0) {
return children.get(i);
}
}
return null;
}
}
// 使用 Curator 框架
class CuratorLockExample {
public static void main(String[] args) throws Exception {
// 创建 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
// 获取可重入锁
InterProcessMutex lock = new InterProcessMutex(
client,
"/my/lock"
);
// 获取锁
boolean acquired = lock.acquire(30, TimeUnit.SECONDS);
if (acquired) {
try {
// 执行业务逻辑
doBusiness();
} finally {
// 释放锁
lock.release();
}
}
client.close();
}
}
基于数据库的分布式锁
表级锁
// 基于数据库的分布式锁
class DatabaseLock {
@Autowired
private DataSource dataSource;
// 尝试获取锁
public boolean tryLock(String lockName, String owner, long expireTime) {
String sql = "INSERT INTO distributed_locks (lock_name, owner, expire_time) " +
"VALUES (?, ?, ?) " +
"ON DUPLICATE KEY UPDATE " +
"owner = IF(expire_time < NOW(), ?, owner), " +
"expire_time = IF(expire_time < NOW(), ?, expire_time)";
Timestamp expireAt = new Timestamp(System.currentTimeMillis() + expireTime);
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, lockName);
ps.setTimestamp(2, expireAt);
ps.setTimestamp(3, expireAt);
ps.setString(4, owner);
ps.setTimestamp(5, expireAt);
int result = ps.executeUpdate();
return result > 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
// 释放锁
public boolean unlock(String lockName, String owner) {
String sql = "DELETE FROM distributed_locks " +
"WHERE lock_name = ? AND owner = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, lockName);
ps.setString(2, owner);
int result = ps.executeUpdate();
return result > 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
// 使用乐观锁的分布式锁
public boolean tryOptimisticLock(String lockName, String owner, int version) {
String sql = "UPDATE distributed_locks " +
"SET owner = ?, version = version + 1 " +
"WHERE lock_name = ? AND version = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, owner);
ps.setString(2, lockName);
ps.setInt(3, version);
int result = ps.executeUpdate();
return result > 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
-- 锁表结构
CREATE TABLE distributed_locks (
lock_name VARCHAR(64) PRIMARY KEY,
owner VARCHAR(64) NOT NULL,
expire_time TIMESTAMP NOT NULL,
version INT DEFAULT 0,
INDEX idx_expire_time (expire_time)
);
分布式锁对比
| 特性 | Redis | Zookeeper | 数据库 |
|---|---|---|---|
| 性能 | 高 | 中 | 低 |
| 可靠性 | 需要额外保证 | 高 | 高 |
| 实现复杂度 | 中 | 中 | 低 |
| 故障处理 | 依赖TTL | 自动清理 | 依赖事务 |
| 公平性 | 需要额外实现 | 支持 | 需要额外实现 |
| 可重入 | 支持 | 支持 | 支持 |
最佳实践
1. 锁的粒度
// ❌ 粗粒度锁
public void processOrder(Order order) {
lock.lock("order:" + order.getId()); // 锁住整个订单处理
// 处理订单
lock.unlock();
}
// ✅ 细粒度锁
public void processOrder(Order order) {
// 只锁库存
lock.lock("inventory:" + order.getProductId());
try {
checkAndDeductInventory(order.getProductId(), order.getQuantity());
} finally {
lock.unlock();
}
// 只锁余额
lock.lock("balance:" + order.getUserId());
try {
checkAndDeductBalance(order.getUserId(), order.getTotalAmount());
} finally {
lock.unlock();
}
}
2. 锁超时
// 设置合理的锁超时时间
class LockTimeoutExample {
// 计算合理的超时时间
public long calculateLockTimeout() {
// 基于操作历史估算
long avgOperationTime = getAverageOperationTime();
// 加上缓冲时间
return avgOperationTime * 2 + 10000; // 额外10秒缓冲
}
// 建议值:5-30秒
// 过长:节点崩溃时锁长时间不释放
// 过短:业务处理时间长导致锁自动释放
}
3. 异步续期
// 看门狗机制:自动续期
class WatchDog {
private ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
// 启动看门狗
public void startWatchDog(RLock lock, String lockName) {
// 每10秒检查一次
scheduler.scheduleAtFixedRate(() -> {
if (lock.isHeldByCurrentThread()) {
// 续期
lock.lock();
try {
lock.expire(30, TimeUnit.SECONDS);
} finally {
lock.unlock();
}
}
}, 10, 10, TimeUnit.SECONDS);
}
}
4. 异常处理
// 确保锁最终释放
class LockSafetyExample {
public void doWithLock(RLock lock) {
try {
// 获取锁
lock.lock();
// 执行业务逻辑
doBusiness();
} catch (Exception e) {
// 业务异常处理
} finally {
// 确保释放
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
小结
本章我们深入学习了分布式锁:
-
为什么需要分布式锁
- 单机锁无法跨服务器工作
- 分布式环境下需要跨进程互斥
-
分布式锁的要求
- 互斥、可重入、公平性、故障处理、高性能
-
Redis 分布式锁
- SETNX 实现
- Lua 脚本保证原子性
- Redisson 客户端封装
-
Zookeeper 分布式锁
- 临时顺序节点
- Watch 机制通知
- Curator 框架
-
数据库分布式锁
- 基于表结构
- 乐观锁/悲观锁
-
最佳实践
- 合理的锁粒度
- 适当的超时时间
- 自动续期机制
- 完善的异常处理