分布式事务
分布式事务是指在多个独立的数据库或服务之间保证数据一致性的机制。与单机事务不同,分布式事务需要协调多个节点上的操作,确保它们要么全部成功,要么全部失败,从而维护数据的一致性。
在微服务架构中,一个业务操作可能涉及多个服务的多个数据库。例如,一个电商下单操作可能涉及:库存服务扣减库存、订单服务创建订单、支付服务扣减余额。这些操作分布在不同的服务和数据库中,如何保证它们的一致性?这就是分布式事务要解决的问题。
分布式事务的本质挑战
理解分布式事务的挑战,需要先理解单机事务与分布式事务的根本区别。
本地事务的 ACID 保证
单机事务由数据库的 ACID 特性保证:
- 原子性(Atomicity):事务中的所有操作要么全部完成,要么全部回滚
- 一致性(Consistency):事务执行前后,数据库从一个一致状态转换到另一个一致状态
- 隔离性(Isolation):并发事务之间互不干扰
- 持久性(Durability):事务一旦提交,数据永久保存
这些保证由数据库的事务管理器统一实现,对应用透明。开发者只需声明事务边界(BEGIN、COMMIT、ROLLBACK),数据库负责保证 ACID。
分布式事务的困境
当事务跨越多个数据库时,ACID 保证就变得困难了。问题的根源在于:没有统一的协调者来保证所有节点同时提交或回滚。
具体来说,分布式事务面临以下挑战:
网络不可靠:消息可能丢失、延迟、乱序。当一个节点没有响应时,无法确定它是处理成功了、失败了、还是还在处理中。
节点可能故障:某个节点可能在事务执行过程中崩溃。协调者需要能够检测故障并恢复事务状态。
无全局时钟:不同节点的时钟可能不同步,无法用时间戳来确定事件的先后顺序。
锁跨节点难协调:单机事务可以通过锁来保证隔离性,但跨节点的锁协调代价高昂,容易死锁。
两阶段提交(2PC)
两阶段提交(Two-Phase Commit,2PC)是最经典的分布式事务协议,由 Jim Gray 在 1978 年提出。它的核心思想是引入一个协调者(Coordinator)来统一管理所有参与者(Participant)的事务提交。
协议流程
2PC 分为两个阶段:准备阶段(Prepare)和提交阶段(Commit)。
第一阶段:准备(Prepare)
// 事务管理器
class TwoPhaseCommit {
// 第一阶段:准备
public boolean prepare(List<Resource> resources) {
for (Resource resource : resources) {
try {
// 要求资源管理器准备好提交
if (!resource.prepare()) {
// 任意一个失败,通知所有回滚
rollbackAll(resources);
return false;
}
} catch (Exception e) {
rollbackAll(resources);
return false;
}
}
return true;
}
// 第二阶段:提交
public void commit(List<Resource> resources) {
for (Resource resource : resources) {
resource.commit();
}
}
// 回滚所有资源
private void rollbackAll(List<Resource> resources) {
for (Resource resource : resources) {
try {
resource.rollback();
} catch (Exception e) {
// 记录日志,后续补偿
log.error("回滚失败", e);
}
}
}
}
// 资源管理器(数据库)
class DatabaseResource {
// 准备阶段:锁定资源,写入预提交日志
public boolean prepare() {
try {
// 1. 获取排他锁
lock();
// 2. 写入预提交日志(保证可恢复性)
writePrepareLog();
// 3. 返回准备成功
return true;
} catch (Exception e) {
return false;
}
}
// 提交阶段:真正提交事务
public void commit() {
// 1. 写入提交日志
writeCommitLog();
// 2. 提交事务
doCommit();
// 3. 释放锁
unlock();
}
// 回滚阶段
public void rollback() {
// 1. 根据预提交日志回滚
doRollback();
// 2. 释放锁
unlock();
}
}
2PC 的问题
| 问题 | 描述 | 解决方案 |
|---|---|---|
| 同步阻塞 | prepare阶段会锁定所有资源 | 使用超时机制 |
| 单点故障 | 事务管理器故障导致阻塞 | 引入协调者备份 |
| 数据不一致 | 部分节点提交成功部分失败 | 依赖日志恢复 |
| 脑裂问题 | 网络分区导致多个协调者 | 使用Paxos选主 |
2PC 代码示例
// 简化的2PC实现
public class TwoPhaseCommitDemo {
// 事务管理器
static class TransactionManager {
private final List<Participant> participants = new ArrayList<>();
public void addParticipant(Participant participant) {
participants.add(participant);
}
public boolean commit() {
// 阶段1:准备
if (!prepare()) {
rollback();
return false;
}
// 阶段2:提交
try {
commitAll();
return true;
} catch (Exception e) {
rollback();
return false;
}
}
private boolean prepare() {
for (Participant p : participants) {
if (!p.prepare()) {
return false;
}
}
return true;
}
private void commitAll() {
for (Participant p : participants) {
p.commit();
}
}
private void rollback() {
for (Participant p : participants) {
try {
p.rollback();
} catch (Exception e) {
// 记录日志,后续补偿
}
}
}
}
// 参与者(数据库)
interface Participant {
boolean prepare();
void commit();
void rollback();
}
// 数据库参与者实现
static class DatabaseParticipant implements Participant {
private final String name;
public DatabaseParticipant(String name) {
this.name = name;
}
@Override
public boolean prepare() {
System.out.println(name + " 准备提交");
// 写入预提交日志
return true;
}
@Override
public void commit() {
System.out.println(name + " 提交");
// 写入提交日志
}
@Override
public void rollback() {
System.out.println(name + " 回滚");
// 根据日志回滚
}
}
public static void main(String[] args) {
TransactionManager tm = new TransactionManager();
tm.addParticipant(new DatabaseParticipant("数据库1"));
tm.addParticipant(new DatabaseParticipant("数据库2"));
boolean result = tm.commit();
System.out.println("事务结果: " + (result ? "成功" : "失败"));
}
}
三阶段提交(3PC)
三阶段提交是对2PC的改进,增加了超时机制和预提交阶段。
协议流程
3PC 的改进
| 改进点 | 2PC | 3PC |
|---|---|---|
| 超时机制 | 无 | 有 |
| 阻塞优化 | prepare锁定资源 | canCommit不锁定 |
| 数据一致性 | 可能数据不一致 | 更好的数据一致性 |
TCC(Try-Confirm-Cancel)
TCC是一种补偿式的事务方案,不依赖数据库的分布式事务支持。
原理
TCC 示例:转账
// 转账服务 - TCC实现
class TransferService {
@Transactional
public void transfer(String fromAccount, String toAccount, BigDecimal amount) {
// 1. Try阶段:预留资源
try {
accountService.tryLock(fromAccount, amount); // 锁定转出账户
accountService.tryReserve(toAccount, amount); // 预留转入账户
inventoryService.tryFreeze(fromAccount, amount); // 冻结库存(如需要)
} catch (Exception e) {
// 任意Try失败,进入Cancel阶段
cancel(fromAccount, toAccount, amount);
throw e;
}
// 2. Confirm阶段:确认使用资源
try {
accountService.confirmDeduct(fromAccount, amount);
accountService.confirmAdd(toAccount, amount);
inventoryService.confirmDeduct(fromAccount, amount);
} catch (Exception e) {
// Confirm失败,记录并人工处理
log.error("Confirm阶段失败,需要人工处理", e);
alertSupportTeam(fromAccount, toAccount, amount);
}
}
// Cancel阶段:回滚资源
private void cancel(String fromAccount, String toAccount, BigDecimal amount) {
try {
accountService.cancelLock(fromAccount, amount);
accountService.cancelReserve(toAccount, amount);
inventoryService.cancelFreeze(fromAccount, amount);
} catch (Exception e) {
log.error("Cancel阶段失败,需要人工处理", e);
alertSupportTeam(fromAccount, toAccount, amount);
}
}
}
// 账户服务接口
interface AccountService {
// Try: 尝试锁定账户
void tryLock(String accountId, BigDecimal amount);
// Confirm: 确认扣款
void confirmDeduct(String accountId, BigDecimal amount);
// Cancel: 取消锁定
void cancelLock(String accountId, BigDecimal amount);
// Try: 预留账户
void tryReserve(String accountId, BigDecimal amount);
// Confirm: 确认增加
void confirmAdd(String accountId, BigDecimal amount);
// Cancel: 取消预留
void cancelReserve(String accountId, BigDecimal amount);
}
TCC 实现细节
Seata TCC 框架集成
Seata 是目前最流行的分布式事务框架之一,由阿里开源。它提供了完整的 TCC 模式支持,能够解决幂等性、空回滚、悬挂等问题。
Maven 依赖:
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
TCC 接口定义:
// TCC 资源接口 - 使用 Seata 注解
public interface InventoryTccAction {
/**
* Try 阶段:冻结库存
* @TwoPhaseBusinessAction 注解标识这是一个 TCC 资源
* - name: 全局唯一的 TCC 资源名称
* - commitMethod: Confirm 阶段执行的方法
* - rollbackMethod: Cancel 阶段执行的方法
*/
@TwoPhaseBusinessAction(
name = "inventoryTccAction",
commitMethod = "commit",
rollbackMethod = "rollback"
)
boolean prepare(
BusinessActionContext actionContext,
@BusinessActionContextParameter(paramName = "productId") String productId,
@BusinessActionContextParameter(paramName = "count") int count
);
/**
* Confirm 阶段:确认扣减库存
*/
boolean commit(BusinessActionContext actionContext);
/**
* Cancel 阶段:释放冻结的库存
*/
boolean rollback(BusinessActionContext actionContext);
}
// TCC 接口实现
@Service
public class InventoryTccActionImpl implements InventoryTccAction {
@Autowired
private InventoryMapper inventoryMapper;
@Autowired
private FreezeInventoryMapper freezeMapper;
@Override
@Transactional
public boolean prepare(BusinessActionContext actionContext,
String productId, int count) {
// 1. 检查库存是否充足
Inventory inventory = inventoryMapper.selectByProductId(productId);
if (inventory == null || inventory.getAvailable() < count) {
throw new RuntimeException("库存不足");
}
// 2. 冻结库存(写入冻结记录表)
FreezeInventory freeze = new FreezeInventory();
freeze.setXid(actionContext.getXid()); // 全局事务ID
freeze.setProductId(productId);
freeze.setCount(count);
freeze.setStatus(FreezeStatus.FREEZED);
freezeMapper.insert(freeze);
// 3. 扣减可用库存
inventory.setAvailable(inventory.getAvailable() - count);
inventoryMapper.updateById(inventory);
return true;
}
@Override
@Transactional
public boolean commit(BusinessActionContext actionContext) {
// 从上下文获取参数
String productId = actionContext.getActionContext("productId", String.class);
int count = actionContext.getActionContext("count", Integer.class);
// 1. 删除冻结记录(已提交)
freezeMapper.deleteByXid(actionContext.getXid());
// 2. 更新总库存(可选,取决于业务逻辑)
// inventoryMapper.decreaseTotal(productId, count);
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext actionContext) {
String productId = actionContext.getActionContext("productId", String.class);
int count = actionContext.getActionContext("count", Integer.class);
// 1. 查询冻结记录
FreezeInventory freeze = freezeMapper.selectByXid(actionContext.getXid());
// 处理空回滚:如果没有冻结记录,说明 Try 未执行,直接返回
if (freeze == null) {
return true;
}
// 2. 恢复可用库存
Inventory inventory = inventoryMapper.selectByProductId(productId);
inventory.setAvailable(inventory.getAvailable() + count);
inventoryMapper.updateById(inventory);
// 3. 删除冻结记录
freezeMapper.deleteByXid(actionContext.getXid());
return true;
}
}
业务服务调用:
@Service
public class OrderService {
@Autowired
private InventoryTccAction inventoryTccAction;
@Autowired
private AccountTccAction accountTccAction;
/**
* 创建订单 - 使用 @GlobalTransactional 开启全局事务
*/
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public Order createOrder(OrderDTO orderDTO) {
// 1. Try 阶段:冻结库存
inventoryTccAction.prepare(null, orderDTO.getProductId(), orderDTO.getCount());
// 2. Try 阶段:锁定账户余额
accountTccAction.prepare(null, orderDTO.getUserId(), orderDTO.getAmount());
// 3. 创建订单记录
Order order = new Order();
order.setProductId(orderDTO.getProductId());
order.setUserId(orderDTO.getUserId());
order.setAmount(orderDTO.getAmount());
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);
// 如果以上操作都成功,Seata 会自动调用各 TCC 资源的 commit 方法
// 如果任意操作失败,Seata 会自动调用各 TCC 资源的 rollback 方法
return order;
}
}
Seata 配置(application.yml):
seata:
enabled: true
application-id: order-service
tx-service-group: my_tx_group
service:
vgroup-mapping:
my_tx_group: default
grouplist:
default: 127.0.0.1:8091
config:
type: nacos
registry:
type: nacos
手写简化版 TCC 框架
为了更好地理解 TCC 原理,这里展示一个简化版的 TCC 框架实现:
// TCC 事务上下文
public class TccContext {
private String xid; // 全局事务ID
private String branchId; // 分支事务ID
private TransactionStatus status; // 事务状态
private Map<String, Object> args; // 业务参数
private long createTime; // 创建时间
}
// TCC 事务管理器
@Component
public class TccTransactionManager {
@Autowired
private TccTransactionRepository repository;
// 开启全局事务
public String begin() {
String xid = generateXid();
TccTransaction transaction = new TccTransaction();
transaction.setXid(xid);
transaction.setStatus(TransactionStatus.TRYING);
transaction.setCreateTime(System.currentTimeMillis());
repository.save(transaction);
return xid;
}
// 注册分支事务
public String registerBranch(String xid, String resourceName) {
String branchId = generateBranchId();
TccBranch branch = new TccBranch();
branch.setXid(xid);
branch.setBranchId(branchId);
branch.setResourceName(resourceName);
branch.setStatus(BranchStatus.REGISTERED);
repository.saveBranch(branch);
return branchId;
}
// 提交全局事务
public void commit(String xid) {
TccTransaction transaction = repository.findByXid(xid);
transaction.setStatus(TransactionStatus.CONFIRMING);
repository.update(transaction);
// 调用所有分支的 Confirm 方法
List<TccBranch> branches = repository.findBranchesByXid(xid);
for (TccBranch branch : branches) {
TccResource resource = getResource(branch.getResourceName());
resource.confirm(buildContext(xid, branch));
branch.setStatus(BranchStatus.CONFIRMED);
repository.updateBranch(branch);
}
transaction.setStatus(TransactionStatus.COMMITTED);
repository.update(transaction);
}
// 回滚全局事务
public void rollback(String xid) {
TccTransaction transaction = repository.findByXid(xid);
transaction.setStatus(TransactionStatus.CANCELLING);
repository.update(transaction);
// 调用所有分支的 Cancel 方法
List<TccBranch> branches = repository.findBranchesByXid(xid);
for (TccBranch branch : branches) {
try {
TccResource resource = getResource(branch.getResourceName());
resource.cancel(buildContext(xid, branch));
} catch (Exception e) {
// 记录失败,后续重试
log.error("Cancel 分支事务失败: {}", branch.getBranchId(), e);
}
branch.setStatus(BranchStatus.CANCELLED);
repository.updateBranch(branch);
}
transaction.setStatus(TransactionStatus.CANCELLED);
repository.update(transaction);
}
}
TCC 的三大问题与解决方案
TCC 模式在实际应用中需要解决三个核心问题:空回滚、悬挂、幂等性。这些问题如果处理不当,会导致数据不一致。
| 问题 | 描述 | 根本原因 | 解决方案 |
|---|---|---|---|
| 空回滚 | Try 未执行但 Cancel 执行了 | Try 超时后触发回滚 | 记录事务状态,Cancel 检查 Try 是否执行 |
| 悬挂 | Cancel 比 Try 先执行 | 网络延迟导致请求乱序 | Try 执行前检查是否已 Cancel |
| 幂等性 | 重复调用导致数据错误 | 网络重试机制 | 使用唯一事务ID,记录执行状态 |
空回滚问题
场景:Try 阶段由于网络超时,事务协调器认为 Try 失败,触发 Cancel。但实际上 Try 可能只是网络延迟,并未真正执行。
解决方案:在 Cancel 阶段检查 Try 是否真正执行过。
@Service
public class InventoryTccActionImpl implements InventoryTccAction {
// 解决空回滚:Cancel 时检查 Try 是否执行
@Override
@Transactional
public boolean rollback(BusinessActionContext actionContext) {
String xid = actionContext.getXid();
// 1. 查询冻结记录(Try 阶段写入)
FreezeInventory freeze = freezeMapper.selectByXid(xid);
// 关键:如果没有冻结记录,说明 Try 未执行,这是空回滚
if (freeze == null) {
log.warn("空回滚检测: xid={}, Try 阶段未执行", xid);
// 插入一条空回滚记录,防止后续 Try 执行(悬挂问题)
FreezeInventory emptyRollback = new FreezeInventory();
emptyRollback.setXid(xid);
emptyRollback.setStatus(FreezeStatus.EMPTY_ROLLBACK);
freezeMapper.insert(emptyRollback);
return true; // 直接返回成功
}
// 2. 检查是否已经回滚过(幂等性)
if (freeze.getStatus() == FreezeStatus.ROLLBACKED) {
return true;
}
// 3. 正常回滚逻辑
String productId = freeze.getProductId();
int count = freeze.getCount();
Inventory inventory = inventoryMapper.selectByProductId(productId);
inventory.setAvailable(inventory.getAvailable() + count);
inventoryMapper.updateById(inventory);
freeze.setStatus(FreezeStatus.ROLLBACKED);
freezeMapper.updateById(freeze);
return true;
}
}
悬挂问题
场景:Try 请求因为网络拥堵延迟到达,Cancel 已经先执行完毕。此时 Try 再执行会冻结资源,但没有人会来解冻。
解决方案:Try 执行前检查是否已经 Cancel。
@Service
public class InventoryTccActionImpl implements InventoryTccAction {
// 解决悬挂:Try 执行前检查是否已 Cancel
@Override
@Transactional
public boolean prepare(BusinessActionContext actionContext,
String productId, int count) {
String xid = actionContext.getXid();
// 关键:检查是否已经存在回滚记录(空回滚时插入的)
FreezeInventory existingFreeze = freezeMapper.selectByXid(xid);
if (existingFreeze != null) {
// 已存在记录,可能是:
// 1. 空回滚记录 -> 不应该执行 Try
// 2. 已有的冻结记录 -> 幂等性处理
log.warn("悬挂检测: xid={}, 已存在事务记录", xid);
return false; // 拒绝执行 Try
}
// 正常的 Try 执行逻辑...
Inventory inventory = inventoryMapper.selectByProductId(productId);
if (inventory == null || inventory.getAvailable() < count) {
throw new RuntimeException("库存不足");
}
FreezeInventory freeze = new FreezeInventory();
freeze.setXid(xid);
freeze.setProductId(productId);
freeze.setCount(count);
freeze.setStatus(FreezeStatus.FREEZED);
freezeMapper.insert(freeze);
inventory.setAvailable(inventory.getAvailable() - count);
inventoryMapper.updateById(inventory);
return true;
}
}
幂等性问题
场景:Confirm 或 Cancel 由于网络问题重试执行,导致重复操作数据。
解决方案:记录事务执行状态,每次执行前检查。
@Service
public class InventoryTccActionImpl implements InventoryTccAction {
@Override
@Transactional
public boolean commit(BusinessActionContext actionContext) {
String xid = actionContext.getXid();
// 1. 查询冻结记录
FreezeInventory freeze = freezeMapper.selectByXid(xid);
// 幂等性检查:如果记录不存在或已提交,直接返回成功
if (freeze == null || freeze.getStatus() == FreezeStatus.COMMITTED) {
log.info("幂等性检测: xid={}, 已提交或无记录", xid);
return true;
}
// 幂等性检查:如果状态不对(如已回滚),报错
if (freeze.getStatus() == FreezeStatus.ROLLBACKED) {
throw new IllegalStateException("事务已回滚,无法提交");
}
// 2. 执行提交逻辑
freeze.setStatus(FreezeStatus.COMMITTED);
freezeMapper.updateById(freeze);
// 可选:删除冻结记录(或保留用于审计)
// freezeMapper.deleteById(freeze.getId());
return true;
}
@Override
@Transactional
public boolean rollback(BusinessActionContext actionContext) {
String xid = actionContext.getXid();
FreezeInventory freeze = freezeMapper.selectByXid(xid);
// 幂等性检查:如果已回滚,直接返回
if (freeze != null && freeze.getStatus() == FreezeStatus.ROLLBACKED) {
log.info("幂等性检测: xid={}, 已回滚", xid);
return true;
}
// ... 其余回滚逻辑
return true;
}
}
冻结记录表设计:
CREATE TABLE freeze_inventory (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
xid VARCHAR(128) NOT NULL COMMENT '全局事务ID',
branch_id VARCHAR(128) COMMENT '分支事务ID',
product_id VARCHAR(64) NOT NULL COMMENT '商品ID',
count INT NOT NULL COMMENT '冻结数量',
status TINYINT NOT NULL COMMENT '状态: 1-冻结 2-已提交 3-已回滚 4-空回滚',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_xid (xid),
KEY idx_product (product_id)
) COMMENT='库存冻结记录表';
总结:TCC 三大问题的统一解决方案
// 统一的事务状态管理
public enum FreezeStatus {
FREEZED(1, "冻结中"), // Try 成功
COMMITTED(2, "已提交"), // Confirm 成功
ROLLBACKED(3, "已回滚"), // Cancel 成功
EMPTY_ROLLBACK(4, "空回滚"); // Try 未执行但 Cancel 触发
}
// 完整的 TCC 实现模板
@Service
public class TemplateTccAction {
// Try: 资源预留
public boolean tryPhase(String xid, Object... args) {
// 1. 检查是否已存在记录(防止悬挂)
if (exists(xid)) {
return false;
}
// 2. 业务检查和资源预留
// 3. 写入状态为 FREEZED 的记录
return true;
}
// Confirm: 确认使用
public boolean confirmPhase(String xid) {
// 1. 查询记录
Record record = findByXid(xid);
// 2. 幂等检查
if (record == null || record.isCommitted()) {
return true;
}
// 3. 状态检查
if (!record.isFreezed()) {
throw new IllegalStateException();
}
// 4. 执行确认逻辑,更新状态
return true;
}
// Cancel: 回滚
public boolean cancelPhase(String xid) {
// 1. 查询记录
Record record = findByXid(xid);
// 2. 空回滚处理
if (record == null) {
insertEmptyRollbackRecord(xid);
return true;
}
// 3. 幂等检查
if (record.isRollbacked()) {
return true;
}
// 4. 执行回滚逻辑,更新状态
return true;
}
}
## Saga 模式
Saga模式将长事务拆分为多个本地事务,每个本地事务都有对应的补偿操作。
### 原理
```mermaid
sequenceDiagram
participant S as Saga协调器
participant S1 as 服务1
participant S2 as 服务2
participant S3 as 服务3
S->>S1: T1: 创建订单
S1-->>S: 成功
S->>S2: T2: 冻结库存
S2-->>S: 成功
S->>S3: T3: 扣款
S3-->>S: 失败!
S->>S3: C3: 退款
S3-->>S: 成功
S->>S2: C2: 解冻库存
S2-->>S: 成功
S->>S1: C1: 取消订单
S1-->>S: 成功
Saga 实现
// Saga协调器
class SagaOrchestrator {
private final Map<String, SagaChapter> chapters = new HashMap<>();
// 定义Saga编排
public void defineSaga(String sagaName, List<Chapter> chapterList) {
chapters.put(sagaName, new SagaChapter(sagaName, chapterList));
}
// 执行Saga
public void execute(String sagaName, Map<String, Object> input) {
SagaChapter saga = chapters.get(sagaName);
List<Chapter> chapters = saga.getChapters();
List<Compensation> compensations = new ArrayList<>();
try {
// 顺序执行所有章节
for (Chapter chapter : chapters) {
chapter.execute(input);
// 保存补偿操作
compensations.add(chapter.getCompensation());
}
} catch (Exception e) {
// 执行补偿(反向顺序)
for (int i = compensations.size() - 1; i >= 0; i--) {
try {
compensations.get(i).execute(input);
} catch (Exception ex) {
// 记录补偿失败,人工处理
log.error("补偿失败", ex);
alert(sagaName, input, compensations.subList(0, i + 1));
}
}
}
}
}
// Saga章节
class Chapter {
private final String name;
private final Executor executor;
private final Executor compensation;
public void execute(Map<String, Object> input) {
executor.execute(input);
}
public Executor getCompensation() {
return compensation;
}
}
Saga 示例:订单处理
// 订单Saga
class OrderSaga {
public void processOrder(Order order) {
Map<String, Object> context = new HashMap<>();
context.put("order", order);
SagaOrchestrator orchestrator = new SagaOrchestrator();
// 定义Saga编排
orchestrator.defineSaga("order", Arrays.asList(
// T1: 创建订单
new Chapter(
"createOrder",
(ctx) -> orderService.create((Order) ctx.get("order")),
(ctx) -> orderService.cancel(((Order) ctx.get("order")).getId())
),
// T2: 冻结库存
new Chapter(
"freezeInventory",
(ctx) -> inventoryService.freeze(order.getItems()),
(ctx) -> inventoryService.unfreeze(order.getItems())
),
// T3: 扣减余额
new Chapter(
"deductBalance",
(ctx) -> accountService.deduct(order.getUserId(), order.getTotalAmount()),
(ctx) -> accountService.refund(order.getUserId(), order.getTotalAmount())
),
// T4: 通知
new Chapter(
"sendNotification",
(ctx) -> notificationService.send(order.getUserId(), "订单已创建"),
(ctx) -> {} // 通知不需要补偿
)
));
// 执行Saga
orchestrator.execute("order", context);
}
}
可靠消息最终一致性
基于消息队列的分布式事务方案。
原理
实现方案
// 可靠消息服务
class ReliableMessageService {
// 发送消息(带本地事务)
public void sendMessageInTransaction(TransactionCallback callback) {
// 1. 开启本地事务
Transaction tx = transactionManager.begin();
try {
// 2. 执行本地业务
callback.execute();
// 3. 发送消息(仅发送到本地消息表)
saveMessageToLocal(tx, createMessage());
// 4. 提交本地事务
tx.commit();
} catch (Exception e) {
// 5. 回滚本地事务
tx.rollback();
throw e;
}
}
// 消息投递(定时扫描)
public void dispatchPendingMessages() {
List<Message> pendingMessages = messageRepository.findPending();
for (Message message : pendingMessages) {
try {
// 发送到消息队列
mqClient.send(message.getTopic(), message.getContent());
// 更新消息状态
message.setStatus(MessageStatus.SENT);
messageRepository.update(message);
} catch (Exception e) {
// 更新重试次数
message.incrementRetryCount();
if (message.getRetryCount() > MAX_RETRY) {
// 超过最大重试,记录并告警
alert(message);
}
}
}
}
}
RocketMQ 事务消息
// RocketMQ 事务消息
class RocketMQTransactionDemo {
public static void main(String[] args) {
TransactionMQProducer producer = new TransactionMQProducer("producer-group");
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 1. 执行本地业务
orderService.createOrder(msg);
// 2. 提交本地事务
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 3. 回滚本地事务
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 回查本地事务状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务是否成功
if (orderService.isOrderCreated(msg.getKeys())) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
// 发送事务消息
Message message = new Message("order-topic", "order", "order123", "content".getBytes());
producer.sendMessageInTransaction(message, null);
}
}
方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 2PC | 强一致性 | 同步阻塞、单点故障 | 对一致性要求高的金融场景 |
| 3PC | 改善阻塞 | 复杂、实现困难 | 很少使用 |
| TCC | 性能较高 | 实现复杂、需要改业务 | 业务可控制的场景 |
| Saga | 简单、易实现 | 无隔离性 | 长流程业务 |
| 可靠消息 | 低耦合 | 最终一致性 | 跨服务调用 |
小结
本章我们深入学习了分布式事务的各种方案:
-
2PC(两阶段提交)
- Prepare和Commit两个阶段
- 优点:保证强一致性
- 缺点:同步阻塞、单点故障
-
3PC(三阶段提交)
- 增加了CanCommit和超时机制
- 改善了阻塞问题
-
TCC(Try-Confirm-Cancel)
- 补偿式事务方案
- 需要业务代码实现Try/Confirm/Cancel
- 需处理空回滚、悬挂、幂等性问题
-
Saga模式
- 将长事务拆分为本地事务
- 正向执行,反向补偿
- 适用于长流程业务
-
可靠消息最终一致性
- 基于消息队列
- 最终一致性,低耦合
- 适用于跨服务调用
选择合适的分布式事务方案需要根据业务场景权衡一致性和性能。