Node.js 流(Stream)
流(Stream)是 Node.js 中处理流式数据的抽象接口。流提供了一种高效的方式来处理读写数据,特别适合处理大文件或实时数据。
什么是流?
流是一种以连续方式读取或写入数据的方法,而不是一次性将所有数据加载到内存中。
为什么使用流?
传统方式的问题:
const fs = require('fs');
// 一次性读取整个文件到内存
const data = fs.readFileSync('big-file.txt', 'utf8');
// 问题:如果文件很大(如 1GB),会占用大量内存
使用流的优势:
const fs = require('fs');
// 使用流读取文件,每次只读取一部分数据
const readStream = fs.createReadStream('big-file.txt', 'utf8');
readStream.on('data', (chunk) => {
console.log('收到数据块:', chunk.length, '字节');
});
// 优势:内存占用小,可以处理任意大小的文件
解释:
- 传统方式需要将整个文件加载到内存,对于大文件会造成内存溢出
- 流式处理每次只处理一小块数据,内存占用恒定
- 流可以立即开始处理数据,不需要等待全部数据加载完成
流的四种类型
Node.js 中有四种基本的流类型:
| 类型 | 说明 | 示例 |
|---|---|---|
| Readable | 可读流 | fs.createReadStream |
| Writable | 可写流 | fs.createWriteStream |
| Duplex | 双工流(可读可写) | net.Socket |
| Transform | 转换流(可修改数据) | zlib.createGzip |
1. Readable(可读流)
可读流是从中读取数据的流。
常见的可读流:
fs.createReadStream()- 文件读取流- HTTP 请求(客户端接收响应)
process.stdin- 标准输入
const fs = require('fs');
// 创建可读流
const readStream = fs.createReadStream('input.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 每次读取 64KB(默认)
});
// 监听 data 事件接收数据
readStream.on('data', (chunk) => {
console.log('读取到数据:', chunk.length, '字节');
});
// 监听 end 事件
readStream.on('end', () => {
console.log('读取完成');
});
// 监听错误事件
readStream.on('error', (err) => {
console.error('读取错误:', err);
});
解释:
highWaterMark指定内部缓冲区的大小data事件在每次有数据可读时触发end事件在所有数据读取完成后触发- 始终要处理
error事件,避免程序崩溃
2. Writable(可写流)
可写流是向其写入数据的流。
常见的可写流:
fs.createWriteStream()- 文件写入流- HTTP 响应(服务端发送响应)
process.stdout- 标准输出
const fs = require('fs');
// 创建可写流
const writeStream = fs.createWriteStream('output.txt');
// 写入数据
writeStream.write('第一行数据\n');
writeStream.write('第二行数据\n');
// 结束写入
writeStream.end('最后一行数据\n');
// 监听完成事件
writeStream.on('finish', () => {
console.log('写入完成');
});
// 监听错误事件
writeStream.on('error', (err) => {
console.error('写入错误:', err);
});
解释:
write()方法返回一个布尔值,表示是否可以继续写入end()方法结束写入,之后不能再调用write()finish事件在所有数据写入完成后触发
3. Duplex(双工流)
双工流既是可读的也是可写的。
常见的双工流:
- TCP Socket
- WebSocket
const { Duplex } = require('stream');
// 创建自定义双工流
const duplex = new Duplex({
write(chunk, encoding, callback) {
console.log('写入:', chunk.toString());
callback();
},
read(size) {
this.push('可读数据');
this.push(null); // 结束读取
}
});
// 可以同时读取和写入
duplex.write('Hello');
duplex.on('data', (chunk) => {
console.log('读取:', chunk.toString());
});
4. Transform(转换流)
转换流是一种特殊的双工流,可以在读写过程中修改或转换数据。
常见的转换流:
zlib.createGzip()- 压缩流crypto.createCipheriv()- 加密流
const { Transform } = require('stream');
// 创建转换流:将数据转为大写
const upperCase = new Transform({
transform(chunk, encoding, callback) {
// 转换数据
const result = chunk.toString().toUpperCase();
// 推送转换后的数据
callback(null, result);
}
});
// 使用
process.stdin.pipe(upperCase).pipe(process.stdout);
// 输入: hello
// 输出: HELLO
解释:
transform方法接收输入数据,处理后调用callback输出结果callback(null, result)第一个参数是错误,第二个是转换后的数据- 转换流常用于数据处理管道中
管道(pipe)
管道是将可读流的输出直接连接到可写流的输入的方法。
基本用法
const fs = require('fs');
// 将一个文件的内容复制到另一个文件
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('destination.txt');
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('复制完成');
});
解释:
pipe()方法自动管理数据流动和背压- 它返回目标流,支持链式调用
- 自动处理错误和关闭事件
链式管道
const fs = require('fs');
const zlib = require('zlib');
// 读取文件 -> 压缩 -> 写入新文件
fs.createReadStream('input.txt')
.pipe(zlib.createGzip()) // Gzip 压缩
.pipe(fs.createWriteStream('input.txt.gz'));
console.log('文件压缩完成');
实际应用:解压文件
fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('input-unzipped.txt'));
管道链:处理日志
const fs = require('fs');
const { Transform } = require('stream');
const zlib = require('zlib');
// 创建转换流:提取错误日志
const errorFilter = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
const errors = lines.filter(line => line.includes('ERROR'));
callback(null, errors.join('\n') + '\n');
}
});
// 读取日志 -> 过滤错误 -> 压缩 -> 保存
fs.createReadStream('app.log')
.pipe(errorFilter)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('errors.log.gz'));
流的事件
可读流事件
const fs = require('fs');
const readStream = fs.createReadStream('file.txt');
// data - 当有数据可读时触发
readStream.on('data', (chunk) => {
console.log('收到数据块');
});
// end - 当没有更多数据可读时触发
readStream.on('end', () => {
console.log('读取完成');
});
// error - 当接收数据出错时触发
readStream.on('error', (err) => {
console.error('错误:', err);
});
// close - 当流关闭时触发
readStream.on('close', () => {
console.log('流已关闭');
});
// open - 当文件打开时触发(文件流特有)
readStream.on('open', (fd) => {
console.log('文件描述符:', fd);
});
// readable - 当流中有数据可读时触发(暂停模式)
readStream.on('readable', () => {
let chunk;
while (null !== (chunk = readStream.read())) {
console.log('读取到:', chunk.toString());
}
});
可写流事件
const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');
// drain - 当可以继续写入数据时触发
writeStream.on('drain', () => {
console.log('缓冲区已清空,可以继续写入');
});
// finish - 当所有数据写入完成时触发
writeStream.on('finish', () => {
console.log('写入完成');
});
// error - 当写入出错时触发
writeStream.on('error', (err) => {
console.error('写入错误:', err);
});
// close - 当流关闭时触发
writeStream.on('close', () => {
console.log('流已关闭');
});
// pipe - 当可读流通过 pipe 连接到此流时触发
writeStream.on('pipe', (src) => {
console.log('有数据流进入');
});
// unpipe - 当可读流取消 pipe 连接时触发
writeStream.on('unpipe', (src) => {
console.log('数据流已断开');
});
流的两种模式
可读流有两种操作模式:流动模式(flowing)和 暂停模式(paused)。
流动模式
在流动模式下,数据自动从底层系统读取,并通过事件尽快提供给应用程序。
const fs = require('fs');
const readStream = fs.createReadStream('file.txt');
// 添加 data 监听器后,流进入流动模式
readStream.on('data', (chunk) => {
console.log('收到数据:', chunk.length);
});
暂停模式
在暂停模式下,必须显式调用 read() 方法来读取数据。
const fs = require('fs');
const readStream = fs.createReadStream('file.txt');
// 使用 readable 事件和 read() 方法
readStream.on('readable', () => {
let chunk;
// 循环读取所有可用数据
while (null !== (chunk = readStream.read())) {
console.log('读取到:', chunk.toString());
}
});
模式切换
const readStream = fs.createReadStream('file.txt');
// 切换到流动模式
readStream.resume();
// 切换到暂停模式
readStream.pause();
// 使用 pipe() 会自动切换到流动模式
readStream.pipe(writeStream);
解释:
- 添加
data事件监听器会自动切换到流动模式 - 调用
pause()方法切换到暂停模式 - 调用
resume()方法切换到流动模式 - 使用
pipe()会自动管理流模式
背压(Backpressure)
背压是指当写入速度超过读取速度时,数据会在缓冲区中累积。Node.js 流提供了自动处理背压的机制。
问题示例
const fs = require('fs');
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');
// 错误示例:忽略背压
readStream.on('data', (chunk) => {
// 如果写入速度跟不上读取速度,缓冲区会越来越大
writeStream.write(chunk);
});
// 问题:可能导致内存溢出
正确处理背压
const fs = require('fs');
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');
// 方式一:使用 pipe()(推荐)
readStream.pipe(writeStream);
// 方式二:手动处理
readStream.on('data', (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
// 缓冲区满了,暂停读取
readStream.pause();
}
});
writeStream.on('drain', () => {
// 缓冲区清空,恢复读取
readStream.resume();
});
readStream.on('end', () => {
writeStream.end();
});
解释:
write()返回false表示缓冲区已满drain事件表示缓冲区已清空,可以继续写入- 使用
pipe()可以自动处理背压问题
实用工具函数
stream.pipeline
pipeline 用于连接多个流,自动处理错误和清理资源。
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// 使用回调
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('管道失败:', err);
} else {
console.log('管道成功');
}
}
);
stream.pipeline(Promise 版本)
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
async function compress() {
try {
await pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz')
);
console.log('压缩完成');
} catch (err) {
console.error('压缩失败:', err);
}
}
compress();
stream.finished
finished 用于等待流结束。
const { finished } = require('stream/promises');
const fs = require('fs');
async function readToEnd() {
const rs = fs.createReadStream('file.txt');
// 等待流结束
await finished(rs);
console.log('流已结束');
// 记得消费流数据
rs.resume();
}
stream.Readable.from
从迭代器创建可读流。
const { Readable } = require('stream');
// 从数组创建
const readable = Readable.from(['a', 'b', 'c']);
readable.on('data', (chunk) => {
console.log(chunk); // 'a', 'b', 'c'
});
// 从异步生成器创建
async function* generate() {
yield 'Hello';
yield 'World';
}
const asyncReadable = Readable.from(generate());
实战示例
进度条复制文件
const fs = require('fs');
async function copyWithProgress(src, dest) {
const { pipeline } = require('stream/promises');
const stat = await fs.promises.stat(src);
let copied = 0;
const readStream = fs.createReadStream(src);
const writeStream = fs.createWriteStream(dest);
readStream.on('data', (chunk) => {
copied += chunk.length;
const percent = (copied / stat.size * 100).toFixed(2);
process.stdout.write(`\r复制进度: ${percent}%`);
});
await pipeline(readStream, writeStream);
console.log('\n复制完成');
}
copyWithProgress('big-file.mp4', 'big-file-copy.mp4');
逐行处理文件
const fs = require('fs');
const readline = require('readline');
async function processLineByLine() {
const fileStream = fs.createReadStream('file.txt');
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity // 处理不同平台的换行符
});
for await (const line of rl) {
console.log(`行内容: ${line}`);
}
}
processLineByLine();
创建自定义转换流
const { Transform } = require('stream');
// JSON 解析转换流
class JSONParser extends Transform {
constructor() {
super({ objectMode: true });
this.buffer = '';
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
callback();
}
_flush(callback) {
try {
const data = JSON.parse(this.buffer);
this.push(data);
callback();
} catch (err) {
callback(err);
}
}
}
// 使用
fs.createReadStream('data.json')
.pipe(new JSONParser())
.on('data', (data) => {
console.log('解析结果:', data);
});
HTTP 流式上传
const http = require('http');
const fs = require('fs');
const server = http.createServer((req, res) => {
if (req.method === 'POST') {
// 直接将请求流保存到文件
const fileStream = fs.createWriteStream('uploaded-file');
req.pipe(fileStream);
fileStream.on('finish', () => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('上传成功');
});
fileStream.on('error', (err) => {
res.writeHead(500);
res.end('上传失败');
});
}
});
server.listen(3000);
最佳实践
1. 始终处理错误
const readStream = fs.createReadStream('file.txt');
// ✅ 推荐:监听错误事件
readStream.on('error', (err) => {
console.error('读取失败:', err);
});
// ✅ 推荐:使用 pipeline 自动处理错误
await pipeline(
fs.createReadStream('file.txt'),
asyncTransform,
fs.createWriteStream('output.txt')
);
2. 清理资源
// 使用 AbortController 取消流
const controller = new AbortController();
const { signal } = controller;
const readStream = fs.createReadStream('file.txt', { signal });
// 需要取消时
controller.abort();
3. 选择合适的 highWaterMark
// 对于大文件处理,增大缓冲区提高性能
const readStream = fs.createReadStream('big-file.txt', {
highWaterMark: 1024 * 1024 // 1MB
});
// 对于实时数据,减小缓冲区降低延迟
const lowLatencyStream = fs.createReadStream('realtime.txt', {
highWaterMark: 1024 // 1KB
});
4. 使用 async iterator
const fs = require('fs');
const { Readable } = require('stream');
// Node.js 10+ 支持异步迭代
async function processStream() {
const readStream = fs.createReadStream('file.txt', 'utf8');
for await (const chunk of readStream) {
console.log('收到数据块:', chunk.length);
}
}
小结
本章我们学习了:
- 流的基本概念:为什么使用流、流的优势
- 四种流类型:Readable、Writable、Duplex、Transform
- 管道操作:使用
pipe()连接流 - 流的事件:data、end、error、drain 等
- 流的两种模式:流动模式和暂停模式
- 背压处理:如何处理读写速度不匹配
- 实用工具函数:pipeline、finished、Readable.from
- 实战示例:文件复制、逐行处理、自定义转换流
练习
- 使用流实现大文件复制,并显示进度
- 创建一个转换流,将文件中的所有小写字母转为大写
- 使用流处理 CSV 文件,逐行解析
- 实现一个日志收集系统,将多个日志文件合并并压缩
- 使用
pipeline实现文件加密功能