跳到主要内容

Change Streams 变更流

Change Streams 是 MongoDB 3.6 引入的重要特性,允许应用程序实时监听数据库的数据变更,是实现实时数据同步、消息通知等功能的关键技术。

什么是 Change Streams?

概念介绍

Change Streams 是一种实时的数据变更订阅机制,应用程序可以通过它监听集合、数据库或整个集群的数据变更事件。

为什么需要 Change Streams?

传统监听数据变更的方式存在各种问题:

方式问题
轮询查询效率低、延迟高、资源浪费
Tail Oplog复杂、不稳定、内部实现可能变化
触发器数据库层面实现,难以与业务集成

Change Streams 的优势

  • 实时性:变更立即推送,无延迟
  • 可靠性:支持断点续传,不丢失事件
  • 安全性:遵循 MongoDB 权限控制
  • 易用性:API 简单,与驱动完美集成
  • 一致性:保证事件顺序与操作顺序一致

前提条件

使用 Change Streams 需要:

  • MongoDB 3.6 或更高版本
  • 副本集(Replica Set)或分片集群(Sharded Cluster)
  • WiredTiger 存储引擎

基本使用

监听集合变更

// 在 mongosh 中监听集合变更
const changeStream = db.users.watch()

// 监听变更事件
changeStream.on('change', (change) => {
printjson(change)
})

// 手动获取下一个变更
while (true) {
const next = changeStream.next()
printjson(next)
}

监听数据库变更

// 监听整个数据库的变更
const changeStream = db.getMongo().watch()

changeStream.on('change', (change) => {
print(`Collection: ${change.ns.coll}`)
printjson(change)
})

监听集群变更

// 监听整个集群的变更
const client = db.getMongo()
const changeStream = client.watch()

changeStream.on('change', (change) => {
print(`Database: ${change.ns.db}`)
print(`Collection: ${change.ns.coll}`)
printjson(change)
})

变更事件结构

事件类型

Change Streams 会返回以下类型的变更事件:

事件类型说明
insert插入文档
update更新文档
replace替换文档
delete删除文档
invalidate集合被删除或重命名

事件结构详解

insert 事件

{
"_id": {
"_data": "8265F5..." // 用于恢复的位置标识
},
"operationType": "insert",
"clusterTime": Timestamp(1710460800, 1),
"wallTime": ISODate("2024-03-15T00:00:00.000Z"),
"fullDocument": {
"_id": ObjectId("..."),
"name": "张三",
"email": "[email protected]",
"age": 28
},
"ns": {
"db": "myapp",
"coll": "users"
},
"documentKey": {
"_id": ObjectId("...")
}
}

关键字段说明

  • _id:变更事件的唯一标识,用于恢复
  • operationType:操作类型
  • clusterTime:操作的时间戳
  • fullDocument:完整的文档内容
  • ns:命名空间(数据库.集合)
  • documentKey:文档的主键

update 事件

{
"_id": { "_data": "..." },
"operationType": "update",
"clusterTime": Timestamp(1710460800, 2),
"wallTime": ISODate("2024-03-15T00:00:01.000Z"),
"ns": { "db": "myapp", "coll": "users" },
"documentKey": { "_id": ObjectId("...") },
"updateDescription": {
"updatedFields": {
"age": 29,
"status": "active"
},
"removedFields": ["tempField"],
"truncatedArrays": []
},
"fullDocument": { // 仅当配置了 fullDocument 选项时存在
"_id": ObjectId("..."),
"name": "张三",
"age": 29,
"status": "active"
}
}

update 事件特有字段

  • updateDescription.updatedFields:被更新的字段
  • updateDescription.removedFields:被删除的字段
  • fullDocument:完整文档(需要配置)

delete 事件

{
"_id": { "_data": "..." },
"operationType": "delete",
"clusterTime": Timestamp(1710460800, 3),
"ns": { "db": "myapp", "coll": "users" },
"documentKey": { "_id": ObjectId("...") }
}

注意:delete 事件不包含被删除文档的内容。

过滤变更事件

按操作类型过滤

// 只监听 insert 和 update 事件
const changeStream = db.users.watch([
{ $match: { operationType: { $in: ['insert', 'update'] } } }
])

changeStream.on('change', (change) => {
print(`操作类型: ${change.operationType}`)
printjson(change)
})

按字段值过滤

// 只监听特定用户的变更
const changeStream = db.users.watch([
{ $match: {
operationType: 'update',
'fullDocument.status': 'active'
}}
])

changeStream.on('change', (change) => {
print(`用户状态变更为活跃: ${change.fullDocument.name}`)
})

按更新字段过滤

// 只监听 status 字段的更新
const changeStream = db.users.watch([
{ $match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}}
])

changeStream.on('change', (change) => {
print(`状态更新: ${change.updateDescription.updatedFields.status}`)
})

配置选项

fullDocument 选项

控制 update 事件中是否包含完整文档:

// 默认:update 事件不包含完整文档
db.users.watch()

// update 时返回完整文档
db.users.watch([], { fullDocument: 'updateLookup' })

// 始终返回完整文档(包括 delete 除外)
db.users.watch([], { fullDocument: 'whenAvailable' })
选项值说明
default只有 insert 和 replace 返回完整文档
updateLookupupdate 事件返回完整文档(额外查询)
whenAvailable尽可能返回完整文档
required必须返回完整文档,否则报错

resumeAfter 实现断点续传

// 保存恢复令牌
let resumeToken
changeStream.on('change', (change) => {
resumeToken = change._id
// 处理变更...
})

// 从断点恢复
const changeStream = db.users.watch([], {
resumeAfter: resumeToken
})

startAtOperationTime

从特定时间点开始监听:

// 从指定时间点开始
const timestamp = Timestamp(1710460800, 0)
const changeStream = db.users.watch([], {
startAtOperationTime: timestamp
})

Node.js 驱动中使用

基本用法

const { MongoClient } = require('mongodb')

async function watchChanges() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()

const collection = client.db('myapp').collection('users')

// 创建变更流
const changeStream = collection.watch()

// 监听变更
changeStream.on('change', (change) => {
console.log('变更事件:', change.operationType)
console.log('文档ID:', change.documentKey._id)

switch (change.operationType) {
case 'insert':
console.log('新文档:', change.fullDocument)
break
case 'update':
console.log('更新字段:', change.updateDescription.updatedFields)
break
case 'delete':
console.log('删除文档')
break
}
})

// 错误处理
changeStream.on('error', (error) => {
console.error('变更流错误:', error)
})
}

watchChanges().catch(console.error)

带过滤的监听

async function watchActiveUsers() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()

const collection = client.db('myapp').collection('users')

// 只监听活跃用户的状态变更
const changeStream = collection.watch(
[
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
}
],
{ fullDocument: 'updateLookup' }
)

try {
for await (const change of changeStream) {
const userId = change.documentKey._id
const newStatus = change.updateDescription.updatedFields.status

// 触发业务逻辑
await handleStatusChange(userId, newStatus)
}
} catch (error) {
console.error('监听错误:', error)
}
}

断点续传实现

const fs = require('fs')

class ChangeStreamResumer {
constructor(collection, resumeFile) {
this.collection = collection
this.resumeFile = resumeFile
this.resumeToken = this.loadResumeToken()
}

loadResumeToken() {
try {
if (fs.existsSync(this.resumeFile)) {
return JSON.parse(fs.readFileSync(this.resumeFile, 'utf8'))
}
} catch (error) {
console.error('加载恢复令牌失败:', error)
}
return null
}

saveResumeToken(token) {
fs.writeFileSync(this.resumeFile, JSON.stringify(token))
}

async start(onChange) {
const options = this.resumeToken
? { resumeAfter: this.resumeToken }
: {}

const changeStream = this.collection.watch([], options)

try {
for await (const change of changeStream) {
this.resumeToken = change._id
this.saveResumeToken(this.resumeToken)
await onChange(change)
}
} catch (error) {
console.error('变更流错误:', error)
setTimeout(() => this.start(onChange), 5000)
}
}
}

实战场景

1. 实时通知系统

async function setupNotificationSystem() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()

const orders = client.db('myapp').collection('orders')

const changeStream = orders.watch(
[
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
}
],
{ fullDocument: 'updateLookup' }
)

for await (const change of changeStream) {
const order = change.fullDocument
const newStatus = change.updateDescription.updatedFields.status

// 发送通知
await sendNotification(order.userId, {
type: 'order_status',
orderId: order._id,
status: newStatus
})
}
}

2. 数据同步到搜索引擎

async function syncToElasticsearch() {
const mongoClient = new MongoClient('mongodb://localhost:27017')
await mongoClient.connect()

const products = mongoClient.db('myapp').collection('products')
const changeStream = products.watch([], { fullDocument: 'updateLookup' })

for await (const change of changeStream) {
switch (change.operationType) {
case 'insert':
case 'update':
case 'replace':
await esClient.index({
index: 'products',
id: change.documentKey._id.toString(),
body: change.fullDocument
})
break
case 'delete':
await esClient.delete({
index: 'products',
id: change.documentKey._id.toString()
})
break
}
}
}

3. 审计日志

async function setupAuditLog() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()

const auditLog = client.db('myapp').collection('audit_log')
const users = client.db('myapp').collection('users')

const changeStream = users.watch([], {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
})

for await (const change of changeStream) {
await auditLog.insertOne({
collection: 'users',
operationType: change.operationType,
documentId: change.documentKey._id,
oldValue: change.fullDocumentBeforeChange,
newValue: change.fullDocument,
changedFields: change.updateDescription?.updatedFields,
timestamp: new Date()
})
}
}

最佳实践

1. 正确处理错误和重连

async function robustWatch() {
while (true) {
try {
const changeStream = collection.watch()

for await (const change of changeStream) {
try {
await processChange(change)
} catch (error) {
console.error('处理变更失败:', error)
}
}
} catch (error) {
console.error('变更流断开:', error)
await new Promise(resolve => setTimeout(resolve, 5000))
}
}
}

2. 保存恢复令牌

// 定期保存,而不是每次变更都保存
let lastSaveTime = 0
const SAVE_INTERVAL = 5000

changeStream.on('change', (change) => {
// 处理变更...

const now = Date.now()
if (now - lastSaveTime > SAVE_INTERVAL) {
saveResumeToken(change._id)
lastSaveTime = now
}
})

3. 合理设置缓冲区

const changeStream = collection.watch([], {
maxAwaitTimeMS: 1000 // 最大等待时间
})

小结

本章我们学习了:

  1. Change Streams 概念:实时数据变更订阅机制
  2. 基本使用:监听集合、数据库、集群变更
  3. 事件结构:insert、update、delete、replace 事件
  4. 过滤条件:按操作类型和字段值过滤
  5. 配置选项:fullDocument、resumeAfter、startAtOperationTime
  6. Node.js 实现:基本用法和断点续传
  7. 实战场景:通知系统、数据同步、审计日志
  8. 最佳实践:错误处理、恢复令牌、缓冲区设置