Change Streams 变更流
Change Streams 是 MongoDB 3.6 引入的重要特性,允许应用程序实时监听数据库的数据变更,是实现实时数据同步、消息通知等功能的关键技术。
什么是 Change Streams?
概念介绍
Change Streams 是一种实时的数据变更订阅机制,应用程序可以通过它监听集合、数据库或整个集群的数据变更事件。
为什么需要 Change Streams?
传统监听数据变更的方式存在各种问题:
| 方式 | 问题 |
|---|---|
| 轮询查询 | 效率低、延迟高、资源浪费 |
| Tail Oplog | 复杂、不稳定、内部实现可能变化 |
| 触发器 | 数据库层面实现,难以与业务集成 |
Change Streams 的优势:
- 实时性:变更立即推送,无延迟
- 可靠性:支持断点续传,不丢失事件
- 安全性:遵循 MongoDB 权限控制
- 易用性:API 简单,与驱动完美集成
- 一致性:保证事件顺序与操作顺序一致
前提条件
使用 Change Streams 需要:
- MongoDB 3.6 或更高版本
- 副本集(Replica Set)或分片集群(Sharded Cluster)
- WiredTiger 存储引擎
基本使用
监听集合变更
// 在 mongosh 中监听集合变更
const changeStream = db.users.watch()
// 监听变更事件
changeStream.on('change', (change) => {
printjson(change)
})
// 手动获取下一个变更
while (true) {
const next = changeStream.next()
printjson(next)
}
监听数据库变更
// 监听整个数据库的变更
const changeStream = db.getMongo().watch()
changeStream.on('change', (change) => {
print(`Collection: ${change.ns.coll}`)
printjson(change)
})
监听集群变更
// 监听整个集群的变更
const client = db.getMongo()
const changeStream = client.watch()
changeStream.on('change', (change) => {
print(`Database: ${change.ns.db}`)
print(`Collection: ${change.ns.coll}`)
printjson(change)
})
变更事件结构
事件类型
Change Streams 会返回以下类型的变更事件:
| 事件类型 | 说明 |
|---|---|
insert | 插入文档 |
update | 更新文档 |
replace | 替换文档 |
delete | 删除文档 |
invalidate | 集合被删除或重命名 |
事件结构详解
insert 事件
{
"_id": {
"_data": "8265F5..." // 用于恢复的位置标识
},
"operationType": "insert",
"clusterTime": Timestamp(1710460800, 1),
"wallTime": ISODate("2024-03-15T00:00:00.000Z"),
"fullDocument": {
"_id": ObjectId("..."),
"name": "张三",
"email": "[email protected]",
"age": 28
},
"ns": {
"db": "myapp",
"coll": "users"
},
"documentKey": {
"_id": ObjectId("...")
}
}
关键字段说明:
_id:变更事件的唯一标识,用于恢复operationType:操作类型clusterTime:操作的时间戳fullDocument:完整的文档内容ns:命名空间(数据库.集合)documentKey:文档的主键
update 事件
{
"_id": { "_data": "..." },
"operationType": "update",
"clusterTime": Timestamp(1710460800, 2),
"wallTime": ISODate("2024-03-15T00:00:01.000Z"),
"ns": { "db": "myapp", "coll": "users" },
"documentKey": { "_id": ObjectId("...") },
"updateDescription": {
"updatedFields": {
"age": 29,
"status": "active"
},
"removedFields": ["tempField"],
"truncatedArrays": []
},
"fullDocument": { // 仅当配置了 fullDocument 选项时存在
"_id": ObjectId("..."),
"name": "张三",
"age": 29,
"status": "active"
}
}
update 事件特有字段:
updateDescription.updatedFields:被更新的字段updateDescription.removedFields:被删除的字段fullDocument:完整文档(需要配置)
delete 事件
{
"_id": { "_data": "..." },
"operationType": "delete",
"clusterTime": Timestamp(1710460800, 3),
"ns": { "db": "myapp", "coll": "users" },
"documentKey": { "_id": ObjectId("...") }
}
注意:delete 事件不包含被删除文档的内容。
过滤变更事件
按操作类型过滤
// 只监听 insert 和 update 事件
const changeStream = db.users.watch([
{ $match: { operationType: { $in: ['insert', 'update'] } } }
])
changeStream.on('change', (change) => {
print(`操作类型: ${change.operationType}`)
printjson(change)
})
按字段值过滤
// 只监听特定用户的变更
const changeStream = db.users.watch([
{ $match: {
operationType: 'update',
'fullDocument.status': 'active'
}}
])
changeStream.on('change', (change) => {
print(`用户状态变更为活跃: ${change.fullDocument.name}`)
})
按更新字段过滤
// 只监听 status 字段的更新
const changeStream = db.users.watch([
{ $match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}}
])
changeStream.on('change', (change) => {
print(`状态更新: ${change.updateDescription.updatedFields.status}`)
})
配置选项
fullDocument 选项
控制 update 事件中是否包含完整文档:
// 默认:update 事件不包含完整文档
db.users.watch()
// update 时返回完整文档
db.users.watch([], { fullDocument: 'updateLookup' })
// 始终返回完整文档(包括 delete 除外)
db.users.watch([], { fullDocument: 'whenAvailable' })
| 选项值 | 说明 |
|---|---|
default | 只有 insert 和 replace 返回完整文档 |
updateLookup | update 事件返回完整文档(额外查询) |
whenAvailable | 尽可能返回完整文档 |
required | 必须返回完整文档,否则报错 |
resumeAfter 实现断点续传
// 保存恢复令牌
let resumeToken
changeStream.on('change', (change) => {
resumeToken = change._id
// 处理变更...
})
// 从断点恢复
const changeStream = db.users.watch([], {
resumeAfter: resumeToken
})
startAtOperationTime
从特定时间点开始监听:
// 从指定时间点开始
const timestamp = Timestamp(1710460800, 0)
const changeStream = db.users.watch([], {
startAtOperationTime: timestamp
})
Node.js 驱动中使用
基本用法
const { MongoClient } = require('mongodb')
async function watchChanges() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const collection = client.db('myapp').collection('users')
// 创建变更流
const changeStream = collection.watch()
// 监听变更
changeStream.on('change', (change) => {
console.log('变更事件:', change.operationType)
console.log('文档ID:', change.documentKey._id)
switch (change.operationType) {
case 'insert':
console.log('新文档:', change.fullDocument)
break
case 'update':
console.log('更新字段:', change.updateDescription.updatedFields)
break
case 'delete':
console.log('删除文档')
break
}
})
// 错误处理
changeStream.on('error', (error) => {
console.error('变更流错误:', error)
})
}
watchChanges().catch(console.error)
带过滤的监听
async function watchActiveUsers() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const collection = client.db('myapp').collection('users')
// 只监听活跃用户的状态变更
const changeStream = collection.watch(
[
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
}
],
{ fullDocument: 'updateLookup' }
)
try {
for await (const change of changeStream) {
const userId = change.documentKey._id
const newStatus = change.updateDescription.updatedFields.status
// 触发业务逻辑
await handleStatusChange(userId, newStatus)
}
} catch (error) {
console.error('监听错误:', error)
}
}
断点续传实现
const fs = require('fs')
class ChangeStreamResumer {
constructor(collection, resumeFile) {
this.collection = collection
this.resumeFile = resumeFile
this.resumeToken = this.loadResumeToken()
}
loadResumeToken() {
try {
if (fs.existsSync(this.resumeFile)) {
return JSON.parse(fs.readFileSync(this.resumeFile, 'utf8'))
}
} catch (error) {
console.error('加载恢复令牌失败:', error)
}
return null
}
saveResumeToken(token) {
fs.writeFileSync(this.resumeFile, JSON.stringify(token))
}
async start(onChange) {
const options = this.resumeToken
? { resumeAfter: this.resumeToken }
: {}
const changeStream = this.collection.watch([], options)
try {
for await (const change of changeStream) {
this.resumeToken = change._id
this.saveResumeToken(this.resumeToken)
await onChange(change)
}
} catch (error) {
console.error('变更流错误:', error)
setTimeout(() => this.start(onChange), 5000)
}
}
}
实战场景
1. 实时通知系统
async function setupNotificationSystem() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const orders = client.db('myapp').collection('orders')
const changeStream = orders.watch(
[
{
$match: {
operationType: 'update',
'updateDescription.updatedFields.status': { $exists: true }
}
}
],
{ fullDocument: 'updateLookup' }
)
for await (const change of changeStream) {
const order = change.fullDocument
const newStatus = change.updateDescription.updatedFields.status
// 发送通知
await sendNotification(order.userId, {
type: 'order_status',
orderId: order._id,
status: newStatus
})
}
}
2. 数据同步到搜索引擎
async function syncToElasticsearch() {
const mongoClient = new MongoClient('mongodb://localhost:27017')
await mongoClient.connect()
const products = mongoClient.db('myapp').collection('products')
const changeStream = products.watch([], { fullDocument: 'updateLookup' })
for await (const change of changeStream) {
switch (change.operationType) {
case 'insert':
case 'update':
case 'replace':
await esClient.index({
index: 'products',
id: change.documentKey._id.toString(),
body: change.fullDocument
})
break
case 'delete':
await esClient.delete({
index: 'products',
id: change.documentKey._id.toString()
})
break
}
}
}
3. 审计日志
async function setupAuditLog() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const auditLog = client.db('myapp').collection('audit_log')
const users = client.db('myapp').collection('users')
const changeStream = users.watch([], {
fullDocument: 'updateLookup',
fullDocumentBeforeChange: 'whenAvailable'
})
for await (const change of changeStream) {
await auditLog.insertOne({
collection: 'users',
operationType: change.operationType,
documentId: change.documentKey._id,
oldValue: change.fullDocumentBeforeChange,
newValue: change.fullDocument,
changedFields: change.updateDescription?.updatedFields,
timestamp: new Date()
})
}
}
最佳实践
1. 正确处理错误和重连
async function robustWatch() {
while (true) {
try {
const changeStream = collection.watch()
for await (const change of changeStream) {
try {
await processChange(change)
} catch (error) {
console.error('处理变更失败:', error)
}
}
} catch (error) {
console.error('变更流断开:', error)
await new Promise(resolve => setTimeout(resolve, 5000))
}
}
}
2. 保存恢复令牌
// 定期保存,而不是每次变更都保存
let lastSaveTime = 0
const SAVE_INTERVAL = 5000
changeStream.on('change', (change) => {
// 处理变更...
const now = Date.now()
if (now - lastSaveTime > SAVE_INTERVAL) {
saveResumeToken(change._id)
lastSaveTime = now
}
})
3. 合理设置缓冲区
const changeStream = collection.watch([], {
maxAwaitTimeMS: 1000 // 最大等待时间
})
小结
本章我们学习了:
- Change Streams 概念:实时数据变更订阅机制
- 基本使用:监听集合、数据库、集群变更
- 事件结构:insert、update、delete、replace 事件
- 过滤条件:按操作类型和字段值过滤
- 配置选项:fullDocument、resumeAfter、startAtOperationTime
- Node.js 实现:基本用法和断点续传
- 实战场景:通知系统、数据同步、审计日志
- 最佳实践:错误处理、恢复令牌、缓冲区设置