事务
MongoDB 从 4.0 版本开始支持多文档 ACID 事务,允许在多个文档、集合甚至数据库之间执行原子操作。
事务基础
什么是事务?
事务是一组数据库操作,这些操作要么全部成功执行,要么全部不执行。事务保证了数据的完整性和一致性。
为什么需要事务?
在 MongoDB 中,单文档操作本身就是原子的。但是,当需要跨多个文档或集合进行操作时,就需要事务来保证原子性:
// 场景:银行转账
// 需要同时完成两个操作:
// 1. 从账户 A 扣款
// 2. 向账户 B 存款
// 如果任一步骤失败,整个操作应该回滚
ACID 特性
| 特性 | 说明 | MongoDB 实现 |
|---|---|---|
| Atomicity(原子性) | 事务中的操作要么全部成功,要么全部失败 | 全部提交或全部回滚 |
| Consistency(一致性) | 事务前后数据保持一致状态 | 通过约束和规则保证 |
| Isolation(隔离性) | 并发事务互不干扰 | 支持快照隔离 |
| Durability(持久性) | 提交后数据永久保存 | 通过写关注保证 |
事务版本支持
| 版本 | 事务支持范围 |
|---|---|
| MongoDB 4.0 | 副本集上的多文档事务 |
| MongoDB 4.2+ | 副本集和分片集群上的分布式事务 |
使用事务
基本语法
在 MongoDB Shell 中使用事务:
// 1. 启动会话
const session = db.getMongo().startSession();
// 2. 开始事务
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
});
try {
// 3. 执行操作(必须传入 session)
const accounts = session.getDatabase('bank').accounts;
// 从账户 A 扣款 100
accounts.updateOne(
{ _id: 'A' },
{ $inc: { balance: -100 } }
);
// 向账户 B 存款 100
accounts.updateOne(
{ _id: 'B' },
{ $inc: { balance: 100 } }
);
// 4. 提交事务
session.commitTransaction();
print('转账成功');
} catch (error) {
// 5. 发生错误时回滚
session.abortTransaction();
print('转账失败:' + error);
} finally {
// 6. 结束会话
session.endSession();
}
使用 withTransaction 方法(推荐)
MongoDB 提供了 withTransaction 方法,自动处理提交和重试逻辑:
const session = db.getMongo().startSession();
try {
// withTransaction 自动提交或回滚
session.withTransaction(() => {
const accounts = session.getDatabase('bank').accounts;
// 操作 1:扣款
accounts.updateOne(
{ _id: 'A', balance: { $gte: 100 } },
{ $inc: { balance: -100 } }
);
// 操作 2:存款
accounts.updateOne(
{ _id: 'B' },
{ $inc: { balance: 100 } }
);
});
} finally {
session.endSession();
}
为什么推荐 withTransaction?
- 自动重试:遇到临时错误时自动重试事务
- 简化代码:无需手动处理提交和回滚
- 错误处理:内置错误处理逻辑
跨集合事务
事务可以跨越多个集合:
const session = db.getMongo().startSession();
session.withTransaction(() => {
const db = session.getDatabase('ecommerce');
// 1. 更新库存
db.products.updateOne(
{ _id: productId, stock: { $gte: quantity } },
{ $inc: { stock: -quantity } }
);
// 2. 创建订单
db.orders.insertOne({
userId: userId,
productId: productId,
quantity: quantity,
status: 'created',
createdAt: new Date()
});
// 3. 记录日志
db.auditLogs.insertOne({
action: 'order_created',
userId: userId,
timestamp: new Date()
});
});
session.endSession();
跨数据库事务
事务甚至可以跨越不同的数据库:
const session = db.getMongo().startSession();
session.withTransaction(() => {
// 数据库 1:用户数据
const userDb = session.getDatabase('users');
userDb.profiles.updateOne(
{ _id: userId },
{ $set: { status: 'verified' } }
);
// 数据库 2:日志数据
const logDb = session.getDatabase('logs');
logDb.events.insertOne({
type: 'verification',
userId: userId,
timestamp: new Date()
});
});
session.endSession();
事务选项
读关注(Read Concern)
控制事务读取数据的一致性级别:
session.startTransaction({
readConcern: { level: 'snapshot' } // 快照隔离
});
| 级别 | 说明 |
|---|---|
local | 读取本地最新数据,可能读到未提交的数据 |
available | 与 local 类似,适用于分片集群 |
majority | 只读取已提交到大多数节点的数据 |
snapshot | 读取事务开始时的快照数据(推荐) |
写关注(Write Concern)
控制事务提交的持久性保证:
session.startTransaction({
writeConcern: { w: 'majority', j: true }
});
| 选项 | 说明 |
|---|---|
w: 1 | 确认主节点写入 |
w: 'majority' | 确认大多数节点写入(推荐) |
j: true | 确认写入日志 |
读偏好(Read Preference)
控制事务从哪个节点读取数据:
session.startTransaction({
readPreference: 'primary' // 从主节点读取
});
Node.js 驱动中使用事务
const { MongoClient } = require('mongodb');
async function transferFunds(client, fromId, toId, amount) {
// 启动会话
const session = client.startSession();
try {
// 使用 withTransaction
await session.withTransaction(async () => {
const db = client.db('bank');
const accounts = db.collection('accounts');
// 检查余额并扣款
const fromAccount = await accounts.findOne(
{ _id: fromId },
{ session }
);
if (fromAccount.balance < amount) {
throw new Error('余额不足');
}
// 执行转账
await accounts.updateOne(
{ _id: fromId },
{ $inc: { balance: -amount } },
{ session }
);
await accounts.updateOne(
{ _id: toId },
{ $inc: { balance: amount } },
{ session }
);
}, {
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' }
});
console.log('转账成功');
} catch (error) {
console.error('转账失败:', error.message);
} finally {
await session.endSession();
}
}
// 使用示例
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
await transferFunds(client, 'A', 'B', 100);
await client.close();
事务限制
时间限制
// 设置事务超时时间(毫秒)
session.startTransaction({
maxTimeMS: 60000 // 60 秒
});
默认情况下:
- 事务必须在 60 秒内完成
- 单个操作超时由
maxTimeMS控制
大小限制
- 单个事务的操作总大小不能超过 16MB
- 单个事务修改的文档数量有限制
不支持的操作
以下操作不能在事务中执行:
// 不支持的操作示例
session.withTransaction(() => {
// ❌ 创建集合(某些情况)
db.createCollection('newCollection');
// ❌ 创建索引
db.collection.createIndex({ field: 1 });
// ❌ 删除数据库
db.dropDatabase();
// ❌ 执行 eval
db.eval('function() { return 1; }');
});
事务最佳实践
1. 保持事务简短
// ❌ 不好:事务中包含耗时操作
session.withTransaction(async () => {
await collection.updateOne({...});
await externalApi.call(); // 外部 API 调用
await collection.updateOne({...});
});
// ✅ 好:事务只包含必要的数据库操作
const apiResult = await externalApi.call();
session.withTransaction(async () => {
await collection.updateOne({...});
await collection.updateOne({...});
});
为什么? 事务持有锁的时间越长,对并发性能的影响越大。
2. 使用适当的写关注
// ✅ 生产环境推荐
session.startTransaction({
writeConcern: { w: 'majority', j: true }
});
// 可以接受少量数据丢失的场景
session.startTransaction({
writeConcern: { w: 1 }
});
3. 正确处理错误
const session = client.startSession();
try {
await session.withTransaction(async () => {
// 业务逻辑
});
} catch (error) {
if (error.hasErrorLabel('TransientTransactionError')) {
// 临时错误,可以重试
console.log('临时错误,稍后重试');
} else if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
// 提交结果未知
console.log('提交结果未知,需要检查数据一致性');
} else {
// 业务错误
console.error('业务错误:', error.message);
}
} finally {
await session.endSession();
}
4. 避免热点数据
// ❌ 不好:频繁更新同一文档
for (let i = 0; i < 100; i++) {
session.withTransaction(async () => {
await counters.updateOne(
{ _id: 'global' },
{ $inc: { value: 1 } }
);
});
}
// ✅ 好:使用批量操作或优化数据模型
session.withTransaction(async () => {
await counters.updateOne(
{ _id: 'global' },
{ $inc: { value: 100 } }
);
});
5. 优先考虑单文档原子性
// 如果可以使用嵌入式文档解决,就不需要事务
// ❌ 需要事务
// orders 集合和 inventory 集合分开存储
// ✅ 不需要事务(嵌入式设计)
{
_id: 'order_1',
items: [
{ productId: 'p1', quantity: 2, price: 100 }
],
status: 'pending',
createdAt: ISODate('...')
}
事务监控
查看当前事务
// 查看当前活跃的事务
db.currentOp({ 'transaction': { $exists: true } })
// 查看事务详情
db.currentOp({ 'transaction.id': 123 })
监控事务锁
// 查看锁等待
db.aggregate([
{ $currentOp: { allUsers: true, idleCursors: true } },
{ $match: { 'transaction': { $exists: true } } }
])
事务统计
// 查看服务器状态中的事务统计
db.serverStatus().transactions
小结
本章我们学习了:
- 事务基础:ACID 特性和事务的作用
- 使用方法:基本语法和 withTransaction
- 事务选项:读关注、写关注、读偏好
- 代码示例:Shell 和 Node.js 驱动
- 限制和最佳实践:保持简短、正确处理错误
- 监控方法:查看活跃事务和锁状态
事务是 MongoDB 的重要特性,但应该谨慎使用。在大多数情况下,通过合理的数据建模(嵌入式文档)可以避免使用事务。