GridFS 大文件存储
GridFS 是 MongoDB 用于存储大文件的规范,适合存储超过 16MB 的文件(如图片、视频、PDF 等)。本章介绍 GridFS 的原理和使用方法。
什么是 GridFS?
概念介绍
MongoDB 单个文档的大小限制为 16MB,而 GridFS 通过将大文件分割成多个小块(chunk)来突破这个限制。
GridFS 集合结构
GridFS 使用两个集合来存储文件:
| 集合 | 说明 |
|---|---|
fs.files | 存储文件元数据(文件名、大小、类型等) |
fs.chunks | 存储文件内容(二进制数据块) |
fs.files 文档结构
{
"_id": ObjectId("..."),
"length": 10485760, // 文件大小(字节)
"chunkSize": 261120, // 每个块的大小
"uploadDate": ISODate("..."), // 上传时间
"md5": "d41d8cd98f00b204...", // MD5 校验值
"filename": "document.pdf", // 文件名
"contentType": "application/pdf", // MIME 类型
"metadata": { // 自定义元数据
"uploader": "zhangsan",
"description": "项目文档"
}
}
fs.chunks 文档结构
{
"_id": ObjectId("..."),
"files_id": ObjectId("..."), // 关联 fs.files 的 _id
"n": 0, // 块序号(从 0 开始)
"data": BinData(0, "...") // 二进制数据
}
何时使用 GridFS?
适用场景
- 存储超过 16MB 的文件
- 需要文件和数据库事务一致性
- 需要跨区域复制文件
- 需要按文件内容范围读取
不适用场景
- 文件小于 16MB:直接使用普通文档存储更高效
- 需要原子更新整个文件
- 需要高性能随机访问
GridFS vs 文件系统
| 特性 | GridFS | 文件系统 |
|---|---|---|
| 文件大小限制 | 无限制 | 取决于文件系统 |
| 事务支持 | 支持 | 不支持 |
| 复制和高可用 | MongoDB 原生支持 | 需要额外配置 |
| 随机访问 | 较慢 | 快 |
| 备份恢复 | 与数据库一起 | 需要单独备份 |
使用 mongofiles 工具
mongofiles 是 MongoDB 提供的命令行工具,用于管理 GridFS 文件。
上传文件
# 基本上传
mongofiles put document.pdf
# 指定数据库
mongofiles -d mydb put document.pdf
# 指定主机和端口
mongofiles --host localhost --port 27017 put document.pdf
# 带认证
mongofiles -u admin -p password --authenticationDatabase admin put document.pdf
列出文件
# 列出所有文件
mongofiles list
# 列出特定数据库的文件
mongofiles -d mydb list
# 搜索特定文件
mongofiles list document.pdf
下载文件
# 下载文件
mongofiles get document.pdf
# 下载到指定路径
mongofiles get document.pdf --local /path/to/save/document.pdf
删除文件
# 删除文件
mongofiles delete document.pdf
# 删除所有匹配的文件
mongofiles delete "prefix*"
搜索文件
# 搜索文件名包含特定字符串的文件
mongofiles search doc
# 使用正则表达式搜索
mongofiles search "\.pdf$"
Node.js 驱动中使用 GridFS
基本上传
const { MongoClient, GridFSBucket } = require('mongodb')
const fs = require('fs')
async function uploadFile() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
// 从本地文件上传
const uploadStream = bucket.openUploadStream('document.pdf', {
chunkSizeBytes: 1048576, // 1MB 块大小
metadata: {
uploader: 'zhangsan',
description: '项目文档',
tags: ['important', 'project']
}
})
const fileStream = fs.createReadStream('./document.pdf')
fileStream.pipe(uploadStream)
.on('finish', () => {
console.log('文件上传完成,ID:', uploadStream.id)
})
.on('error', (error) => {
console.error('上传失败:', error)
})
await client.close()
}
基本下载
async function downloadFile(fileId, outputPath) {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
const downloadStream = bucket.openDownloadStream(fileId)
const writeStream = fs.createWriteStream(outputPath)
downloadStream.pipe(writeStream)
.on('finish', () => {
console.log('文件下载完成')
})
.on('error', (error) => {
console.error('下载失败:', error)
})
await client.close()
}
按文件名下载
async function downloadByName(filename, outputPath) {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
// 注意:如果有同名文件,会下载最新的
const downloadStream = bucket.openDownloadStreamByName(filename)
const writeStream = fs.createWriteStream(outputPath)
downloadStream.pipe(writeStream)
.on('finish', () => {
console.log('下载完成')
})
await client.close()
}
列出文件
async function listFiles() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
const files = await bucket.find().toArray()
files.forEach(file => {
console.log({
id: file._id,
name: file.filename,
size: file.length,
uploadDate: file.uploadDate,
contentType: file.contentType
})
})
await client.close()
}
按条件查找文件
async function findFiles(query) {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
// 查找特定条件的文件
const files = await bucket.find({
'metadata.uploader': 'zhangsan'
}).toArray()
// 排序和分页
const recentFiles = await bucket.find({})
.sort({ uploadDate: -1 })
.limit(10)
.toArray()
await client.close()
return files
}
删除文件
async function deleteFile(fileId) {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
try {
await bucket.delete(fileId)
console.log('文件删除成功')
} catch (error) {
console.error('删除失败:', error)
}
await client.close()
}
重命名文件
async function renameFile(fileId, newFilename) {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
try {
await bucket.rename(fileId, newFilename)
console.log('重命名成功')
} catch (error) {
console.error('重命名失败:', error)
}
await client.close()
}
Express 应用中使用 GridFS
文件上传接口
const express = require('express')
const multer = require('multer')
const { MongoClient, GridFSBucket } = require('mongodb')
const app = express()
const upload = multer({ dest: 'uploads/' })
let db, bucket
async function initDB() {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
db = client.db('myapp')
bucket = new GridFSBucket(db)
}
// 单文件上传
app.post('/upload', upload.single('file'), async (req, res) => {
try {
const fileStream = fs.createReadStream(req.file.path)
const uploadStream = bucket.openUploadStream(req.file.originalname, {
contentType: req.file.mimetype,
metadata: {
originalName: req.file.originalname,
size: req.file.size
}
})
fileStream.pipe(uploadStream)
.on('finish', () => {
// 删除临时文件
fs.unlinkSync(req.file.path)
res.json({
success: true,
fileId: uploadStream.id,
filename: req.file.originalname
})
})
.on('error', (error) => {
res.status(500).json({ error: error.message })
})
} catch (error) {
res.status(500).json({ error: error.message })
}
})
// 多文件上传
app.post('/upload/multiple', upload.array('files', 10), async (req, res) => {
const results = []
for (const file of req.files) {
const fileStream = fs.createReadStream(file.path)
const uploadStream = bucket.openUploadStream(file.originalname, {
contentType: file.mimetype
})
await new Promise((resolve, reject) => {
fileStream.pipe(uploadStream)
.on('finish', () => {
results.push({
id: uploadStream.id,
filename: file.originalname
})
fs.unlinkSync(file.path)
resolve()
})
.on('error', reject)
})
}
res.json({ success: true, files: results })
})
initDB().then(() => {
app.listen(3000, () => console.log('服务器启动'))
})
文件下载接口
// 下载文件
app.get('/download/:id', async (req, res) => {
try {
const fileId = new ObjectId(req.params.id)
// 获取文件信息
const file = await db.collection('fs.files').findOne({ _id: fileId })
if (!file) {
return res.status(404).json({ error: '文件不存在' })
}
// 设置响应头
res.setHeader('Content-Type', file.contentType || 'application/octet-stream')
res.setHeader('Content-Disposition', `attachment; filename="${encodeURIComponent(file.filename)}"`)
res.setHeader('Content-Length', file.length)
// 流式传输
const downloadStream = bucket.openDownloadStream(fileId)
downloadStream.pipe(res)
} catch (error) {
res.status(500).json({ error: error.message })
}
})
// 流式播放视频/音频
app.get('/stream/:id', async (req, res) => {
try {
const fileId = new ObjectId(req.params.id)
const file = await db.collection('fs.files').findOne({ _id: fileId })
if (!file) {
return res.status(404).json({ error: '文件不存在' })
}
const range = req.headers.range
if (!range) {
res.setHeader('Content-Length', file.length)
bucket.openDownloadStream(fileId).pipe(res)
return
}
// 处理 Range 请求(视频拖拽)
const parts = range.replace(/bytes=/, '').split('-')
const start = parseInt(parts[0], 10)
const end = parts[1] ? parseInt(parts[1], 10) : file.length - 1
res.setHeader('Content-Range', `bytes ${start}-${end}/${file.length}`)
res.setHeader('Content-Length', end - start + 1)
res.setHeader('Content-Type', file.contentType || 'video/mp4')
res.status(206)
bucket.openDownloadStream(fileId, { start, end: end + 1 }).pipe(res)
} catch (error) {
res.status(500).json({ error: error.message })
}
})
文件列表接口
// 获取文件列表
app.get('/files', async (req, res) => {
try {
const page = parseInt(req.query.page) || 1
const limit = parseInt(req.query.limit) || 20
const skip = (page - 1) * limit
const files = await bucket.find({})
.sort({ uploadDate: -1 })
.skip(skip)
.limit(limit)
.toArray()
const total = await db.collection('fs.files').countDocuments()
res.json({
files: files.map(f => ({
id: f._id,
name: f.filename,
size: f.length,
contentType: f.contentType,
uploadDate: f.uploadDate
})),
pagination: {
page,
limit,
total,
pages: Math.ceil(total / limit)
}
})
} catch (error) {
res.status(500).json({ error: error.message })
}
})
高级功能
范围读取
GridFS 支持读取文件的特定范围,适合视频播放等场景:
async function readPartial(fileId, start, length) {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
// 只读取指定范围
const downloadStream = bucket.openDownloadStream(fileId, {
start, // 起始字节
end: start + length // 结束字节
})
const chunks = []
return new Promise((resolve, reject) => {
downloadStream
.on('data', chunk => chunks.push(chunk))
.on('end', () => resolve(Buffer.concat(chunks)))
.on('error', reject)
})
}
计算文件 MD5
const crypto = require('crypto')
async function calculateMD5(fileId) {
const client = new MongoClient('mongodb://localhost:27017')
await client.connect()
const db = client.db('myapp')
const bucket = new GridFSBucket(db)
const hash = crypto.createHash('md5')
const downloadStream = bucket.openDownloadStream(fileId)
return new Promise((resolve, reject) => {
downloadStream
.on('data', chunk => hash.update(chunk))
.on('end', () => resolve(hash.digest('hex')))
.on('error', reject)
})
}
流式处理大文件
const { pipeline } = require('stream')
const zlib = require('zlib')
async function processLargeFile(fileId) {
const bucket = new GridFSBucket(db)
// 创建转换流
const gzip = zlib.createGzip()
const uploadStream = bucket.openUploadStream('compressed.gz')
// 管道处理:读取 -> 压缩 -> 上传
pipeline(
bucket.openDownloadStream(fileId),
gzip,
uploadStream,
(error) => {
if (error) {
console.error('处理失败:', error)
} else {
console.log('处理完成')
}
}
)
}
GridFS 最佳实践
1. 合理设置块大小
// 默认块大小为 255KB
// 对于大文件可以增加块大小
const bucket = new GridFSBucket(db, {
chunkSizeBytes: 1024 * 1024 // 1MB
})
建议:
- 小文件(< 1MB):使用默认 255KB
- 大文件(> 100MB):使用 1MB 或更大
2. 添加适当的索引
// GridFS 默认创建的索引
// fs.chunks: { files_id: 1, n: 1 } (unique)
// fs.files: _id (unique)
// 根据查询需求添加额外索引
db.collection('fs.files').createIndex({ filename: 1 })
db.collection('fs.files').createIndex({ uploadDate: -1 })
db.collection('fs.files').createIndex({ 'metadata.uploader': 1 })
3. 使用元数据
// 充分利用 metadata 存储业务信息
const uploadStream = bucket.openUploadStream('report.pdf', {
metadata: {
uploader: 'zhangsan',
department: '研发部',
project: '项目A',
tags: ['报告', '重要'],
version: '1.0',
createdAt: new Date()
}
})
4. 定期清理无用文件
async function cleanupOldFiles(daysOld = 30) {
const cutoffDate = new Date()
cutoffDate.setDate(cutoffDate.getDate() - daysOld)
const files = await bucket.find({
uploadDate: { $lt: cutoffDate }
}).toArray()
for (const file of files) {
await bucket.delete(file._id)
console.log(`删除文件: ${file.filename}`)
}
}
小结
本章我们学习了:
- GridFS 概念:将大文件分割存储的机制
- 集合结构:fs.files 和 fs.chunks
- mongofiles 工具:命令行文件管理
- Node.js 驱动:上传、下载、删除、查询
- Express 集成:Web 应用的文件上传下载
- 高级功能:范围读取、流式处理
- 最佳实践:块大小、索引、元数据、清理
GridFS 是存储大文件的有效方案,但对于小文件建议直接使用普通文档存储。