跳到主要内容

Node.js 流(Stream)

流(Stream)是 Node.js 中处理流式数据的抽象接口。流提供了一种高效的方式来处理读写数据,特别适合处理大文件或实时数据。

什么是流?

流是一种以连续方式读取或写入数据的方法,而不是一次性将所有数据加载到内存中。

为什么使用流?

传统方式的问题

const fs = require('fs');

// 一次性读取整个文件到内存
const data = fs.readFileSync('big-file.txt', 'utf8');
// 问题:如果文件很大(如 1GB),会占用大量内存

使用流的优势

const fs = require('fs');

// 使用流读取文件,每次只读取一部分数据
const readStream = fs.createReadStream('big-file.txt', 'utf8');

readStream.on('data', (chunk) => {
console.log('收到数据块:', chunk.length, '字节');
});

// 优势:内存占用小,可以处理任意大小的文件

解释

  • 传统方式需要将整个文件加载到内存,对于大文件会造成内存溢出
  • 流式处理每次只处理一小块数据,内存占用恒定
  • 流可以立即开始处理数据,不需要等待全部数据加载完成

流的四种类型

Node.js 中有四种基本的流类型:

类型说明示例
Readable可读流fs.createReadStream
Writable可写流fs.createWriteStream
Duplex双工流(可读可写)net.Socket
Transform转换流(可修改数据)zlib.createGzip

1. Readable(可读流)

可读流是从中读取数据的流。

常见的可读流

  • fs.createReadStream() - 文件读取流
  • HTTP 请求(客户端接收响应)
  • process.stdin - 标准输入
const fs = require('fs');

// 创建可读流
const readStream = fs.createReadStream('input.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 每次读取 64KB(默认)
});

// 监听 data 事件接收数据
readStream.on('data', (chunk) => {
console.log('读取到数据:', chunk.length, '字节');
});

// 监听 end 事件
readStream.on('end', () => {
console.log('读取完成');
});

// 监听错误事件
readStream.on('error', (err) => {
console.error('读取错误:', err);
});

解释

  • highWaterMark 指定内部缓冲区的大小
  • data 事件在每次有数据可读时触发
  • end 事件在所有数据读取完成后触发
  • 始终要处理 error 事件,避免程序崩溃

2. Writable(可写流)

可写流是向其写入数据的流。

常见的可写流

  • fs.createWriteStream() - 文件写入流
  • HTTP 响应(服务端发送响应)
  • process.stdout - 标准输出
const fs = require('fs');

// 创建可写流
const writeStream = fs.createWriteStream('output.txt');

// 写入数据
writeStream.write('第一行数据\n');
writeStream.write('第二行数据\n');

// 结束写入
writeStream.end('最后一行数据\n');

// 监听完成事件
writeStream.on('finish', () => {
console.log('写入完成');
});

// 监听错误事件
writeStream.on('error', (err) => {
console.error('写入错误:', err);
});

解释

  • write() 方法返回一个布尔值,表示是否可以继续写入
  • end() 方法结束写入,之后不能再调用 write()
  • finish 事件在所有数据写入完成后触发

3. Duplex(双工流)

双工流既是可读的也是可写的。

常见的双工流

  • TCP Socket
  • WebSocket
const { Duplex } = require('stream');

// 创建自定义双工流
const duplex = new Duplex({
write(chunk, encoding, callback) {
console.log('写入:', chunk.toString());
callback();
},
read(size) {
this.push('可读数据');
this.push(null); // 结束读取
}
});

// 可以同时读取和写入
duplex.write('Hello');
duplex.on('data', (chunk) => {
console.log('读取:', chunk.toString());
});

4. Transform(转换流)

转换流是一种特殊的双工流,可以在读写过程中修改或转换数据。

常见的转换流

  • zlib.createGzip() - 压缩流
  • crypto.createCipheriv() - 加密流
const { Transform } = require('stream');

// 创建转换流:将数据转为大写
const upperCase = new Transform({
transform(chunk, encoding, callback) {
// 转换数据
const result = chunk.toString().toUpperCase();
// 推送转换后的数据
callback(null, result);
}
});

// 使用
process.stdin.pipe(upperCase).pipe(process.stdout);
// 输入: hello
// 输出: HELLO

解释

  • transform 方法接收输入数据,处理后调用 callback 输出结果
  • callback(null, result) 第一个参数是错误,第二个是转换后的数据
  • 转换流常用于数据处理管道中

管道(pipe)

管道是将可读流的输出直接连接到可写流的输入的方法。

基本用法

const fs = require('fs');

// 将一个文件的内容复制到另一个文件
const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('destination.txt');

readStream.pipe(writeStream);

writeStream.on('finish', () => {
console.log('复制完成');
});

解释

  • pipe() 方法自动管理数据流动和背压
  • 它返回目标流,支持链式调用
  • 自动处理错误和关闭事件

链式管道

const fs = require('fs');
const zlib = require('zlib');

// 读取文件 -> 压缩 -> 写入新文件
fs.createReadStream('input.txt')
.pipe(zlib.createGzip()) // Gzip 压缩
.pipe(fs.createWriteStream('input.txt.gz'));

console.log('文件压缩完成');

实际应用:解压文件

fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('input-unzipped.txt'));

管道链:处理日志

const fs = require('fs');
const { Transform } = require('stream');
const zlib = require('zlib');

// 创建转换流:提取错误日志
const errorFilter = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
const errors = lines.filter(line => line.includes('ERROR'));
callback(null, errors.join('\n') + '\n');
}
});

// 读取日志 -> 过滤错误 -> 压缩 -> 保存
fs.createReadStream('app.log')
.pipe(errorFilter)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('errors.log.gz'));

流的事件

可读流事件

const fs = require('fs');
const readStream = fs.createReadStream('file.txt');

// data - 当有数据可读时触发
readStream.on('data', (chunk) => {
console.log('收到数据块');
});

// end - 当没有更多数据可读时触发
readStream.on('end', () => {
console.log('读取完成');
});

// error - 当接收数据出错时触发
readStream.on('error', (err) => {
console.error('错误:', err);
});

// close - 当流关闭时触发
readStream.on('close', () => {
console.log('流已关闭');
});

// open - 当文件打开时触发(文件流特有)
readStream.on('open', (fd) => {
console.log('文件描述符:', fd);
});

// readable - 当流中有数据可读时触发(暂停模式)
readStream.on('readable', () => {
let chunk;
while (null !== (chunk = readStream.read())) {
console.log('读取到:', chunk.toString());
}
});

可写流事件

const fs = require('fs');
const writeStream = fs.createWriteStream('output.txt');

// drain - 当可以继续写入数据时触发
writeStream.on('drain', () => {
console.log('缓冲区已清空,可以继续写入');
});

// finish - 当所有数据写入完成时触发
writeStream.on('finish', () => {
console.log('写入完成');
});

// error - 当写入出错时触发
writeStream.on('error', (err) => {
console.error('写入错误:', err);
});

// close - 当流关闭时触发
writeStream.on('close', () => {
console.log('流已关闭');
});

// pipe - 当可读流通过 pipe 连接到此流时触发
writeStream.on('pipe', (src) => {
console.log('有数据流进入');
});

// unpipe - 当可读流取消 pipe 连接时触发
writeStream.on('unpipe', (src) => {
console.log('数据流已断开');
});

流的两种模式

可读流有两种操作模式:流动模式(flowing)和 暂停模式(paused)。

流动模式

在流动模式下,数据自动从底层系统读取,并通过事件尽快提供给应用程序。

const fs = require('fs');
const readStream = fs.createReadStream('file.txt');

// 添加 data 监听器后,流进入流动模式
readStream.on('data', (chunk) => {
console.log('收到数据:', chunk.length);
});

暂停模式

在暂停模式下,必须显式调用 read() 方法来读取数据。

const fs = require('fs');
const readStream = fs.createReadStream('file.txt');

// 使用 readable 事件和 read() 方法
readStream.on('readable', () => {
let chunk;
// 循环读取所有可用数据
while (null !== (chunk = readStream.read())) {
console.log('读取到:', chunk.toString());
}
});

模式切换

const readStream = fs.createReadStream('file.txt');

// 切换到流动模式
readStream.resume();

// 切换到暂停模式
readStream.pause();

// 使用 pipe() 会自动切换到流动模式
readStream.pipe(writeStream);

解释

  • 添加 data 事件监听器会自动切换到流动模式
  • 调用 pause() 方法切换到暂停模式
  • 调用 resume() 方法切换到流动模式
  • 使用 pipe() 会自动管理流模式

背压(Backpressure)

背压是指当写入速度超过读取速度时,数据会在缓冲区中累积。Node.js 流提供了自动处理背压的机制。

问题示例

const fs = require('fs');

const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');

// 错误示例:忽略背压
readStream.on('data', (chunk) => {
// 如果写入速度跟不上读取速度,缓冲区会越来越大
writeStream.write(chunk);
});
// 问题:可能导致内存溢出

正确处理背压

const fs = require('fs');

const readStream = fs.createReadStream('source.txt');
const writeStream = fs.createWriteStream('dest.txt');

// 方式一:使用 pipe()(推荐)
readStream.pipe(writeStream);

// 方式二:手动处理
readStream.on('data', (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
// 缓冲区满了,暂停读取
readStream.pause();
}
});

writeStream.on('drain', () => {
// 缓冲区清空,恢复读取
readStream.resume();
});

readStream.on('end', () => {
writeStream.end();
});

解释

  • write() 返回 false 表示缓冲区已满
  • drain 事件表示缓冲区已清空,可以继续写入
  • 使用 pipe() 可以自动处理背压问题

实用工具函数

stream.pipeline

pipeline 用于连接多个流,自动处理错误和清理资源。

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用回调
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('管道失败:', err);
} else {
console.log('管道成功');
}
}
);

stream.pipeline(Promise 版本)

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compress() {
try {
await pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz')
);
console.log('压缩完成');
} catch (err) {
console.error('压缩失败:', err);
}
}

compress();

stream.finished

finished 用于等待流结束。

const { finished } = require('stream/promises');
const fs = require('fs');

async function readToEnd() {
const rs = fs.createReadStream('file.txt');

// 等待流结束
await finished(rs);
console.log('流已结束');

// 记得消费流数据
rs.resume();
}

stream.Readable.from

从迭代器创建可读流。

const { Readable } = require('stream');

// 从数组创建
const readable = Readable.from(['a', 'b', 'c']);

readable.on('data', (chunk) => {
console.log(chunk); // 'a', 'b', 'c'
});

// 从异步生成器创建
async function* generate() {
yield 'Hello';
yield 'World';
}

const asyncReadable = Readable.from(generate());

实战示例

进度条复制文件

const fs = require('fs');

async function copyWithProgress(src, dest) {
const { pipeline } = require('stream/promises');

const stat = await fs.promises.stat(src);
let copied = 0;

const readStream = fs.createReadStream(src);
const writeStream = fs.createWriteStream(dest);

readStream.on('data', (chunk) => {
copied += chunk.length;
const percent = (copied / stat.size * 100).toFixed(2);
process.stdout.write(`\r复制进度: ${percent}%`);
});

await pipeline(readStream, writeStream);
console.log('\n复制完成');
}

copyWithProgress('big-file.mp4', 'big-file-copy.mp4');

逐行处理文件

const fs = require('fs');
const readline = require('readline');

async function processLineByLine() {
const fileStream = fs.createReadStream('file.txt');

const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity // 处理不同平台的换行符
});

for await (const line of rl) {
console.log(`行内容: ${line}`);
}
}

processLineByLine();

创建自定义转换流

const { Transform } = require('stream');

// JSON 解析转换流
class JSONParser extends Transform {
constructor() {
super({ objectMode: true });
this.buffer = '';
}

_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
callback();
}

_flush(callback) {
try {
const data = JSON.parse(this.buffer);
this.push(data);
callback();
} catch (err) {
callback(err);
}
}
}

// 使用
fs.createReadStream('data.json')
.pipe(new JSONParser())
.on('data', (data) => {
console.log('解析结果:', data);
});

HTTP 流式上传

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
if (req.method === 'POST') {
// 直接将请求流保存到文件
const fileStream = fs.createWriteStream('uploaded-file');

req.pipe(fileStream);

fileStream.on('finish', () => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('上传成功');
});

fileStream.on('error', (err) => {
res.writeHead(500);
res.end('上传失败');
});
}
});

server.listen(3000);

最佳实践

1. 始终处理错误

const readStream = fs.createReadStream('file.txt');

// ✅ 推荐:监听错误事件
readStream.on('error', (err) => {
console.error('读取失败:', err);
});

// ✅ 推荐:使用 pipeline 自动处理错误
await pipeline(
fs.createReadStream('file.txt'),
asyncTransform,
fs.createWriteStream('output.txt')
);

2. 清理资源

// 使用 AbortController 取消流
const controller = new AbortController();
const { signal } = controller;

const readStream = fs.createReadStream('file.txt', { signal });

// 需要取消时
controller.abort();

3. 选择合适的 highWaterMark

// 对于大文件处理,增大缓冲区提高性能
const readStream = fs.createReadStream('big-file.txt', {
highWaterMark: 1024 * 1024 // 1MB
});

// 对于实时数据,减小缓冲区降低延迟
const lowLatencyStream = fs.createReadStream('realtime.txt', {
highWaterMark: 1024 // 1KB
});

4. 使用 async iterator

const fs = require('fs');
const { Readable } = require('stream');

// Node.js 10+ 支持异步迭代
async function processStream() {
const readStream = fs.createReadStream('file.txt', 'utf8');

for await (const chunk of readStream) {
console.log('收到数据块:', chunk.length);
}
}

小结

本章我们学习了:

  1. 流的基本概念:为什么使用流、流的优势
  2. 四种流类型:Readable、Writable、Duplex、Transform
  3. 管道操作:使用 pipe() 连接流
  4. 流的事件:data、end、error、drain 等
  5. 流的两种模式:流动模式和暂停模式
  6. 背压处理:如何处理读写速度不匹配
  7. 实用工具函数:pipeline、finished、Readable.from
  8. 实战示例:文件复制、逐行处理、自定义转换流

练习

  1. 使用流实现大文件复制,并显示进度
  2. 创建一个转换流,将文件中的所有小写字母转为大写
  3. 使用流处理 CSV 文件,逐行解析
  4. 实现一个日志收集系统,将多个日志文件合并并压缩
  5. 使用 pipeline 实现文件加密功能