跳到主要内容

事务

MongoDB 从 4.0 版本开始支持多文档 ACID 事务,允许在多个文档、集合甚至数据库之间执行原子操作。

事务基础

什么是事务?

事务是一组数据库操作,这些操作要么全部成功执行,要么全部不执行。事务保证了数据的完整性和一致性。

为什么需要事务?

在 MongoDB 中,单文档操作本身就是原子的。但是,当需要跨多个文档或集合进行操作时,就需要事务来保证原子性:

// 场景:银行转账
// 需要同时完成两个操作:
// 1. 从账户 A 扣款
// 2. 向账户 B 存款
// 如果任一步骤失败,整个操作应该回滚

ACID 特性

特性说明MongoDB 实现
Atomicity(原子性)事务中的操作要么全部成功,要么全部失败全部提交或全部回滚
Consistency(一致性)事务前后数据保持一致状态通过约束和规则保证
Isolation(隔离性)并发事务互不干扰支持快照隔离
Durability(持久性)提交后数据永久保存通过写关注保证

事务版本支持

版本事务支持范围
MongoDB 4.0副本集上的多文档事务
MongoDB 4.2+副本集和分片集群上的分布式事务

使用事务

基本语法

在 MongoDB Shell 中使用事务:

// 1. 启动会话
const session = db.getMongo().startSession();

// 2. 开始事务
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
});

try {
// 3. 执行操作(必须传入 session)
const accounts = session.getDatabase('bank').accounts;

// 从账户 A 扣款 100
accounts.updateOne(
{ _id: 'A' },
{ $inc: { balance: -100 } }
);

// 向账户 B 存款 100
accounts.updateOne(
{ _id: 'B' },
{ $inc: { balance: 100 } }
);

// 4. 提交事务
session.commitTransaction();
print('转账成功');
} catch (error) {
// 5. 发生错误时回滚
session.abortTransaction();
print('转账失败:' + error);
} finally {
// 6. 结束会话
session.endSession();
}

使用 withTransaction 方法(推荐)

MongoDB 提供了 withTransaction 方法,自动处理提交和重试逻辑:

const session = db.getMongo().startSession();

try {
// withTransaction 自动提交或回滚
session.withTransaction(() => {
const accounts = session.getDatabase('bank').accounts;

// 操作 1:扣款
accounts.updateOne(
{ _id: 'A', balance: { $gte: 100 } },
{ $inc: { balance: -100 } }
);

// 操作 2:存款
accounts.updateOne(
{ _id: 'B' },
{ $inc: { balance: 100 } }
);
});
} finally {
session.endSession();
}

为什么推荐 withTransaction?

  • 自动重试:遇到临时错误时自动重试事务
  • 简化代码:无需手动处理提交和回滚
  • 错误处理:内置错误处理逻辑

跨集合事务

事务可以跨越多个集合:

const session = db.getMongo().startSession();

session.withTransaction(() => {
const db = session.getDatabase('ecommerce');

// 1. 更新库存
db.products.updateOne(
{ _id: productId, stock: { $gte: quantity } },
{ $inc: { stock: -quantity } }
);

// 2. 创建订单
db.orders.insertOne({
userId: userId,
productId: productId,
quantity: quantity,
status: 'created',
createdAt: new Date()
});

// 3. 记录日志
db.auditLogs.insertOne({
action: 'order_created',
userId: userId,
timestamp: new Date()
});
});

session.endSession();

跨数据库事务

事务甚至可以跨越不同的数据库:

const session = db.getMongo().startSession();

session.withTransaction(() => {
// 数据库 1:用户数据
const userDb = session.getDatabase('users');
userDb.profiles.updateOne(
{ _id: userId },
{ $set: { status: 'verified' } }
);

// 数据库 2:日志数据
const logDb = session.getDatabase('logs');
logDb.events.insertOne({
type: 'verification',
userId: userId,
timestamp: new Date()
});
});

session.endSession();

事务选项

读关注(Read Concern)

控制事务读取数据的一致性级别:

session.startTransaction({
readConcern: { level: 'snapshot' } // 快照隔离
});
级别说明
local读取本地最新数据,可能读到未提交的数据
available与 local 类似,适用于分片集群
majority只读取已提交到大多数节点的数据
snapshot读取事务开始时的快照数据(推荐)

写关注(Write Concern)

控制事务提交的持久性保证:

session.startTransaction({
writeConcern: { w: 'majority', j: true }
});
选项说明
w: 1确认主节点写入
w: 'majority'确认大多数节点写入(推荐)
j: true确认写入日志

读偏好(Read Preference)

控制事务从哪个节点读取数据:

session.startTransaction({
readPreference: 'primary' // 从主节点读取
});

Node.js 驱动中使用事务

const { MongoClient } = require('mongodb');

async function transferFunds(client, fromId, toId, amount) {
// 启动会话
const session = client.startSession();

try {
// 使用 withTransaction
await session.withTransaction(async () => {
const db = client.db('bank');
const accounts = db.collection('accounts');

// 检查余额并扣款
const fromAccount = await accounts.findOne(
{ _id: fromId },
{ session }
);

if (fromAccount.balance < amount) {
throw new Error('余额不足');
}

// 执行转账
await accounts.updateOne(
{ _id: fromId },
{ $inc: { balance: -amount } },
{ session }
);

await accounts.updateOne(
{ _id: toId },
{ $inc: { balance: amount } },
{ session }
);
}, {
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
});

console.log('转账成功');
} catch (error) {
console.error('转账失败:', error.message);
} finally {
await session.endSession();
}
}

// 使用示例
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
await transferFunds(client, 'A', 'B', 100);
await client.close();

事务限制

时间限制

// 设置事务超时时间(毫秒)
session.startTransaction({
maxTimeMS: 60000 // 60 秒
});

默认情况下:

  • 事务必须在 60 秒内完成
  • 单个操作超时由 maxTimeMS 控制

大小限制

  • 单个事务的操作总大小不能超过 16MB
  • 单个事务修改的文档数量有限制

不支持的操作

以下操作不能在事务中执行:

// 不支持的操作示例
session.withTransaction(() => {
// ❌ 创建集合(某些情况)
db.createCollection('newCollection');

// ❌ 创建索引
db.collection.createIndex({ field: 1 });

// ❌ 删除数据库
db.dropDatabase();

// ❌ 执行 eval
db.eval('function() { return 1; }');
});

事务最佳实践

1. 保持事务简短

// ❌ 不好:事务中包含耗时操作
session.withTransaction(async () => {
await collection.updateOne({...});
await externalApi.call(); // 外部 API 调用
await collection.updateOne({...});
});

// ✅ 好:事务只包含必要的数据库操作
const apiResult = await externalApi.call();
session.withTransaction(async () => {
await collection.updateOne({...});
await collection.updateOne({...});
});

为什么? 事务持有锁的时间越长,对并发性能的影响越大。

2. 使用适当的写关注

// ✅ 生产环境推荐
session.startTransaction({
writeConcern: { w: 'majority', j: true }
});

// 可以接受少量数据丢失的场景
session.startTransaction({
writeConcern: { w: 1 }
});

3. 正确处理错误

const session = client.startSession();

try {
await session.withTransaction(async () => {
// 业务逻辑
});
} catch (error) {
if (error.hasErrorLabel('TransientTransactionError')) {
// 临时错误,可以重试
console.log('临时错误,稍后重试');
} else if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
// 提交结果未知
console.log('提交结果未知,需要检查数据一致性');
} else {
// 业务错误
console.error('业务错误:', error.message);
}
} finally {
await session.endSession();
}

4. 避免热点数据

// ❌ 不好:频繁更新同一文档
for (let i = 0; i < 100; i++) {
session.withTransaction(async () => {
await counters.updateOne(
{ _id: 'global' },
{ $inc: { value: 1 } }
);
});
}

// ✅ 好:使用批量操作或优化数据模型
session.withTransaction(async () => {
await counters.updateOne(
{ _id: 'global' },
{ $inc: { value: 100 } }
);
});

5. 优先考虑单文档原子性

// 如果可以使用嵌入式文档解决,就不需要事务
// ❌ 需要事务
// orders 集合和 inventory 集合分开存储

// ✅ 不需要事务(嵌入式设计)
{
_id: 'order_1',
items: [
{ productId: 'p1', quantity: 2, price: 100 }
],
status: 'pending',
createdAt: ISODate('...')
}

事务监控

查看当前事务

// 查看当前活跃的事务
db.currentOp({ 'transaction': { $exists: true } })

// 查看事务详情
db.currentOp({ 'transaction.id': 123 })

监控事务锁

// 查看锁等待
db.aggregate([
{ $currentOp: { allUsers: true, idleCursors: true } },
{ $match: { 'transaction': { $exists: true } } }
])

事务统计

// 查看服务器状态中的事务统计
db.serverStatus().transactions

小结

本章我们学习了:

  1. 事务基础:ACID 特性和事务的作用
  2. 使用方法:基本语法和 withTransaction
  3. 事务选项:读关注、写关注、读偏好
  4. 代码示例:Shell 和 Node.js 驱动
  5. 限制和最佳实践:保持简短、正确处理错误
  6. 监控方法:查看活跃事务和锁状态

事务是 MongoDB 的重要特性,但应该谨慎使用。在大多数情况下,通过合理的数据建模(嵌入式文档)可以避免使用事务。