分布式事务
分布式事务是指在多个独立的数据库或服务之间保证数据一致性的机制。由于网络的不确定性和节点的独立性,实现分布式事务比单机事务更加复杂。
分布式事务的挑战
本地事务 vs 分布式事务
分布式事务的难点
- 网络通信不可靠:消息可能丢失、延迟
- 节点可能故障:部分节点可能宕机
- 时钟不一致:不同节点时间可能有差异
- 数据隔离性困难:跨服务的数据操作难以隔离
两阶段提交(2PC)
两阶段提交是最经典的分布式事务协议。
协议流程
第一阶段:准备(Prepare)
// 事务管理器
class TwoPhaseCommit {
// 第一阶段:准备
public boolean prepare(List<Resource> resources) {
for (Resource resource : resources) {
try {
// 要求资源管理器准备好提交
if (!resource.prepare()) {
// 任意一个失败,通知所有回滚
rollbackAll(resources);
return false;
}
} catch (Exception e) {
rollbackAll(resources);
return false;
}
}
return true;
}
// 第二阶段:提交
public void commit(List<Resource> resources) {
for (Resource resource : resources) {
resource.commit();
}
}
// 回滚所有资源
private void rollbackAll(List<Resource> resources) {
for (Resource resource : resources) {
try {
resource.rollback();
} catch (Exception e) {
// 记录日志,后续补偿
log.error("回滚失败", e);
}
}
}
}
// 资源管理器(数据库)
class DatabaseResource {
// 准备阶段:锁定资源,写入预提交日志
public boolean prepare() {
try {
// 1. 获取排他锁
lock();
// 2. 写入预提交日志(保证可恢复性)
writePrepareLog();
// 3. 返回准备成功
return true;
} catch (Exception e) {
return false;
}
}
// 提交阶段:真正提交事务
public void commit() {
// 1. 写入提交日志
writeCommitLog();
// 2. 提交事务
doCommit();
// 3. 释放锁
unlock();
}
// 回滚阶段
public void rollback() {
// 1. 根据预提交日志回滚
doRollback();
// 2. 释放锁
unlock();
}
}
2PC 的问题
| 问题 | 描述 | 解决方案 |
|---|---|---|
| 同步阻塞 | prepare阶段会锁定所有资源 | 使用超时机制 |
| 单点故障 | 事务管理器故障导致阻塞 | 引入协调者备份 |
| 数据不一致 | 部分节点提交成功部分失败 | 依赖日志恢复 |
| 脑裂问题 | 网络分区导致多个协调者 | 使用Paxos选主 |
2PC 代码示例
// 简化的2PC实现
public class TwoPhaseCommitDemo {
// 事务管理器
static class TransactionManager {
private final List<Participant> participants = new ArrayList<>();
public void addParticipant(Participant participant) {
participants.add(participant);
}
public boolean commit() {
// 阶段1:准备
if (!prepare()) {
rollback();
return false;
}
// 阶段2:提交
try {
commitAll();
return true;
} catch (Exception e) {
rollback();
return false;
}
}
private boolean prepare() {
for (Participant p : participants) {
if (!p.prepare()) {
return false;
}
}
return true;
}
private void commitAll() {
for (Participant p : participants) {
p.commit();
}
}
private void rollback() {
for (Participant p : participants) {
try {
p.rollback();
} catch (Exception e) {
// 记录日志,后续补偿
}
}
}
}
// 参与者(数据库)
interface Participant {
boolean prepare();
void commit();
void rollback();
}
// 数据库参与者实现
static class DatabaseParticipant implements Participant {
private final String name;
public DatabaseParticipant(String name) {
this.name = name;
}
@Override
public boolean prepare() {
System.out.println(name + " 准备提交");
// 写入预提交日志
return true;
}
@Override
public void commit() {
System.out.println(name + " 提交");
// 写入提交日志
}
@Override
public void rollback() {
System.out.println(name + " 回滚");
// 根据日志回滚
}
}
public static void main(String[] args) {
TransactionManager tm = new TransactionManager();
tm.addParticipant(new DatabaseParticipant("数据库1"));
tm.addParticipant(new DatabaseParticipant("数据库2"));
boolean result = tm.commit();
System.out.println("事务结果: " + (result ? "成功" : "失败"));
}
}
三阶段提交(3PC)
三阶段提交是对2PC的改进,增加了超时机制和预提交阶段。
协议流程
3PC 的改进
| 改进点 | 2PC | 3PC |
|---|---|---|
| 超时机制 | 无 | 有 |
| 阻塞优化 | prepare锁定资源 | canCommit不锁定 |
| 数据一致性 | 可能数据不一致 | 更好的数据一致性 |
TCC(Try-Confirm-Cancel)
TCC是一种补偿式的事务方案,不依赖数据库的分布式事务支持。
原理
TCC 示例:转账
// 转账服务 - TCC实现
class TransferService {
@Transactional
public void transfer(String fromAccount, String toAccount, BigDecimal amount) {
// 1. Try阶段:预留资源
try {
accountService.tryLock(fromAccount, amount); // 锁定转出账户
accountService.tryReserve(toAccount, amount); // 预留转入账户
inventoryService.tryFreeze(fromAccount, amount); // 冻结库存(如需要)
} catch (Exception e) {
// 任意Try失败,进入Cancel阶段
cancel(fromAccount, toAccount, amount);
throw e;
}
// 2. Confirm阶段:确认使用资源
try {
accountService.confirmDeduct(fromAccount, amount);
accountService.confirmAdd(toAccount, amount);
inventoryService.confirmDeduct(fromAccount, amount);
} catch (Exception e) {
// Confirm失败,记录并人工处理
log.error("Confirm阶段失败,需要人工处理", e);
alertSupportTeam(fromAccount, toAccount, amount);
}
}
// Cancel阶段:回滚资源
private void cancel(String fromAccount, String toAccount, BigDecimal amount) {
try {
accountService.cancelLock(fromAccount, amount);
accountService.cancelReserve(toAccount, amount);
inventoryService.cancelFreeze(fromAccount, amount);
} catch (Exception e) {
log.error("Cancel阶段失败,需要人工处理", e);
alertSupportTeam(fromAccount, toAccount, amount);
}
}
}
// 账户服务接口
interface AccountService {
// Try: 尝试锁定账户
void tryLock(String accountId, BigDecimal amount);
// Confirm: 确认扣款
void confirmDeduct(String accountId, BigDecimal amount);
// Cancel: 取消锁定
void cancelLock(String accountId, BigDecimal amount);
// Try: 预留账户
void tryReserve(String accountId, BigDecimal amount);
// Confirm: 确认增加
void confirmAdd(String accountId, BigDecimal amount);
// Cancel: 取消预留
void cancelReserve(String accountId, BigDecimal amount);
}
TCC 实现细节
// TCC框架示例
@TCCTransaction
class OrderService {
@Try
public Order createOrder(OrderDTO orderDTO) {
// 1. 冻结库存
inventoryService.freeze(orderDTO.getItems());
// 2. 锁定余额
accountService.lock(orderDTO.getUserId(), orderDTO.getTotalAmount());
// 3. 创建订单(待确认状态)
Order order = new Order();
order.setStatus(OrderStatus.PENDING_CONFIRM);
orderRepository.save(order);
return order;
}
@Confirm
public void confirmOrder(Order order) {
// 1. 扣减库存
inventoryService.deduct(order.getItems());
// 2. 扣减余额
accountService.deduct(order.getUserId(), order.getTotalAmount());
// 3. 更新订单状态
order.setStatus(OrderStatus.CONFIRMED);
orderRepository.update(order);
}
@Cancel
public void cancelOrder(Order order) {
// 1. 恢复库存
inventoryService.unfreeze(order.getItems());
// 2. 恢复余额
accountService.unlock(order.getUserId(), order.getTotalAmount());
// 3. 更新订单状态
order.setStatus(OrderStatus.CANCELLED);
orderRepository.update(order);
}
}
// TCC注解定义
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TCCTransaction {}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Try {}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Confirm {}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Cancel {}
TCC 的问题与优化
| 问题 | 描述 | 解决方案 |
|---|---|---|
| 空回滚 | Try未执行但执行了Cancel | 记录事务日志,检查状态 |
| 悬挂 | Cancel比Try先执行 | 严格的状态机控制 |
| 幂等性 | 重复调用导致数据错误 | 使用唯一事务ID |
// TCC幂等性实现
class TccIdempotence {
// 使用分布式锁或数据库唯一键保证幂等性
public void confirm(TransactionContext context) {
String tccId = context.getTccId();
// 尝试获取分布式锁
boolean acquired = redis.setnx("tcc:lock:" + tccId, "1", 10);
if (!acquired) {
// 已经处理过,直接返回
return;
}
try {
// 检查是否已处理
TransactionLog log = transactionLogRepository.findByTccId(tccId);
if (log != null && log.getStatus() == TransactionStatus.CONFIRMED) {
return;
}
// 执行确认逻辑
doConfirm(context);
// 记录日志
transactionLogRepository.save(
new TransactionLog(tccId, TransactionStatus.CONFIRMED));
} finally {
redis.del("tcc:lock:" + tccId);
}
}
}
Saga 模式
Saga模式将长事务拆分为多个本地事务,每个本地事务都有对应的补偿操作。
原理
Saga 实现
// Saga协调器
class SagaOrchestrator {
private final Map<String, SagaChapter> chapters = new HashMap<>();
// 定义Saga编排
public void defineSaga(String sagaName, List<Chapter> chapterList) {
chapters.put(sagaName, new SagaChapter(sagaName, chapterList));
}
// 执行Saga
public void execute(String sagaName, Map<String, Object> input) {
SagaChapter saga = chapters.get(sagaName);
List<Chapter> chapters = saga.getChapters();
List<Compensation> compensations = new ArrayList<>();
try {
// 顺序执行所有章节
for (Chapter chapter : chapters) {
chapter.execute(input);
// 保存补偿操作
compensations.add(chapter.getCompensation());
}
} catch (Exception e) {
// 执行补偿(反向顺序)
for (int i = compensations.size() - 1; i >= 0; i--) {
try {
compensations.get(i).execute(input);
} catch (Exception ex) {
// 记录补偿失败,人工处理
log.error("补偿失败", ex);
alert(sagaName, input, compensations.subList(0, i + 1));
}
}
}
}
}
// Saga章节
class Chapter {
private final String name;
private final Executor executor;
private final Executor compensation;
public void execute(Map<String, Object> input) {
executor.execute(input);
}
public Executor getCompensation() {
return compensation;
}
}
Saga 示例:订单处理
// 订单Saga
class OrderSaga {
public void processOrder(Order order) {
Map<String, Object> context = new HashMap<>();
context.put("order", order);
SagaOrchestrator orchestrator = new SagaOrchestrator();
// 定义Saga编排
orchestrator.defineSaga("order", Arrays.asList(
// T1: 创建订单
new Chapter(
"createOrder",
(ctx) -> orderService.create((Order) ctx.get("order")),
(ctx) -> orderService.cancel(((Order) ctx.get("order")).getId())
),
// T2: 冻结库存
new Chapter(
"freezeInventory",
(ctx) -> inventoryService.freeze(order.getItems()),
(ctx) -> inventoryService.unfreeze(order.getItems())
),
// T3: 扣减余额
new Chapter(
"deductBalance",
(ctx) -> accountService.deduct(order.getUserId(), order.getTotalAmount()),
(ctx) -> accountService.refund(order.getUserId(), order.getTotalAmount())
),
// T4: 通知
new Chapter(
"sendNotification",
(ctx) -> notificationService.send(order.getUserId(), "订单已创建"),
(ctx) -> {} // 通知不需要补偿
)
));
// 执行Saga
orchestrator.execute("order", context);
}
}
可靠消息最终一致性
基于消息队列的分布式事务方案。
原理
实现方案
// 可靠消息服务
class ReliableMessageService {
// 发送消息(带本地事务)
public void sendMessageInTransaction(TransactionCallback callback) {
// 1. 开启本地事务
Transaction tx = transactionManager.begin();
try {
// 2. 执行本地业务
callback.execute();
// 3. 发送消息(仅发送到本地消息表)
saveMessageToLocal(tx, createMessage());
// 4. 提交本地事务
tx.commit();
} catch (Exception e) {
// 5. 回滚本地事务
tx.rollback();
throw e;
}
}
// 消息投递(定时扫描)
public void dispatchPendingMessages() {
List<Message> pendingMessages = messageRepository.findPending();
for (Message message : pendingMessages) {
try {
// 发送到消息队列
mqClient.send(message.getTopic(), message.getContent());
// 更新消息状态
message.setStatus(MessageStatus.SENT);
messageRepository.update(message);
} catch (Exception e) {
// 更新重试次数
message.incrementRetryCount();
if (message.getRetryCount() > MAX_RETRY) {
// 超过最大重试,记录并告警
alert(message);
}
}
}
}
}
RocketMQ 事务消息
// RocketMQ 事务消息
class RocketMQTransactionDemo {
public static void main(String[] args) {
TransactionMQProducer producer = new TransactionMQProducer("producer-group");
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 1. 执行本地业务
orderService.createOrder(msg);
// 2. 提交本地事务
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 3. 回滚本地事务
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 回查本地事务状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务是否成功
if (orderService.isOrderCreated(msg.getKeys())) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
// 发送事务消息
Message message = new Message("order-topic", "order", "order123", "content".getBytes());
producer.sendMessageInTransaction(message, null);
}
}
方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 2PC | 强一致性 | 同步阻塞、单点故障 | 对一致性要求高的金融场景 |
| 3PC | 改善阻塞 | 复杂、实现困难 | 很少使用 |
| TCC | 性能较高 | 实现复杂、需要改业务 | 业务可控制的场景 |
| Saga | 简单、易实现 | 无隔离性 | 长流程业务 |
| 可靠消息 | 低耦合 | 最终一致性 | 跨服务调用 |
小结
本章我们深入学习了分布式事务的各种方案:
-
2PC(两阶段提交)
- Prepare和Commit两个阶段
- 优点:保证强一致性
- 缺点:同步阻塞、单点故障
-
3PC(三阶段提交)
- 增加了CanCommit和超时机制
- 改善了阻塞问题
-
TCC(Try-Confirm-Cancel)
- 补偿式事务方案
- 需要业务代码实现Try/Confirm/Cancel
- 需处理空回滚、悬挂、幂等性问题
-
Saga模式
- 将长事务拆分为本地事务
- 正向执行,反向补偿
- 适用于长流程业务
-
可靠消息最终一致性
- 基于消息队列
- 最终一致性,低耦合
- 适用于跨服务调用
选择合适的分布式事务方案需要根据业务场景权衡一致性和性能。