跳到主要内容

异步编程

异步编程是 Node.js 的核心特性。理解异步编程模式对于开发高性能 Node.js 应用至关重要。

为什么需要异步

同步 vs 异步

// 同步执行(阻塞)
const fs = require('node:fs');

console.log('开始读取');
const data = fs.readFileSync('file.txt', 'utf8'); // 阻塞等待
console.log('读取完成');
console.log('继续执行');

// 输出顺序:
// 开始读取
// 读取完成
// 继续执行
// 异步执行(非阻塞)
const fs = require('node:fs');

console.log('开始读取');
fs.readFile('file.txt', 'utf8', (err, data) => {
console.log('读取完成');
});
console.log('继续执行');

// 输出顺序:
// 开始读取
// 继续执行
// 读取完成

Node.js 事件循环

事件循环阶段:

回调函数

基本模式

const fs = require('node:fs');

// 回调函数模式:Error-First Callback
fs.readFile('file.txt', 'utf8', (err, data) => {
if (err) {
console.error('读取失败:', err);
return;
}
console.log('文件内容:', data);
});

回调地狱

多层嵌套的回调会导致代码难以阅读和维护:

// 回调地狱示例
fs.readFile('file1.txt', 'utf8', (err, data1) => {
if (err) throw err;

fs.readFile('file2.txt', 'utf8', (err, data2) => {
if (err) throw err;

fs.writeFile('output.txt', data1 + data2, (err) => {
if (err) throw err;

console.log('合并完成');
});
});
});

解决回调地狱

使用命名函数拆分:

function readFirst(callback) {
fs.readFile('file1.txt', 'utf8', callback);
}

function readSecond(data1, callback) {
fs.readFile('file2.txt', 'utf8', (err, data2) => {
if (err) return callback(err);
callback(null, data1, data2);
});
}

function writeOutput(data1, data2, callback) {
fs.writeFile('output.txt', data1 + data2, callback);
}

// 使用
readFirst((err, data1) => {
if (err) throw err;
readSecond(data1, (err, data1, data2) => {
if (err) throw err;
writeOutput(data1, data2, (err) => {
if (err) throw err;
console.log('合并完成');
});
});
});

Promise

创建 Promise

// 基本创建
const promise = new Promise((resolve, reject) => {
// 异步操作
setTimeout(() => {
const success = true;

if (success) {
resolve('操作成功');
} else {
reject(new Error('操作失败'));
}
}, 1000);
});

// 使用
promise
.then(result => {
console.log('成功:', result);
})
.catch(error => {
console.error('失败:', error);
})
.finally(() => {
console.log('完成');
});

封装回调函数

const fs = require('node:fs');

// 封装 fs.readFile
function readFile(path, encoding = 'utf8') {
return new Promise((resolve, reject) => {
fs.readFile(path, encoding, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}

// 使用
readFile('file.txt')
.then(data => console.log(data))
.catch(err => console.error(err));

Promise 链式调用

readFile('file1.txt')
.then(data1 => {
console.log('文件1:', data1);
return readFile('file2.txt'); // 返回新的 Promise
})
.then(data2 => {
console.log('文件2:', data2);
return writeFile('output.txt', data2); // 返回新的 Promise
})
.then(() => {
console.log('写入完成');
})
.catch(err => {
console.error('出错了:', err);
});

Promise 静态方法

// Promise.resolve / Promise.reject
const p1 = Promise.resolve('立即成功');
const p2 = Promise.reject(new Error('立即失败'));

// Promise.all - 全部成功才成功
Promise.all([
fetch('/api/users'),
fetch('/api/posts'),
])
.then(([users, posts]) => {
console.log('两个请求都完成了');
})
.catch(err => {
console.error('有一个请求失败');
});

// Promise.allSettled - 等待所有 Promise 完成
Promise.allSettled([
Promise.resolve('成功1'),
Promise.reject('失败1'),
])
.then(results => {
console.log(results);
// [
// { status: 'fulfilled', value: '成功1' },
// { status: 'rejected', reason: '失败1' }
// ]
});

// Promise.race - 第一个完成的结果
Promise.race([
fetch('/api/fast'),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('超时')), 5000)
),
])
.then(response => console.log('快速响应'))
.catch(err => console.error('超时'));

// Promise.any - 第一个成功的(ES2021)
Promise.any([
Promise.reject('失败1'),
Promise.resolve('成功1'),
Promise.resolve('成功2'),
])
.then(result => console.log(result)) // '成功1'
.catch(err => console.error('全部失败'));

async/await

基本语法

const fs = require('node:fs/promises');

// async 函数返回 Promise
async function readFile() {
try {
// await 等待 Promise 完成
const data = await fs.readFile('file.txt', 'utf8');
console.log('文件内容:', data);
return data;
} catch (err) {
console.error('读取失败:', err);
throw err;
}
}

// 调用 async 函数
readFile()
.then(data => console.log('返回:', data))
.catch(err => console.error('捕获:', err));

错误处理

async function processFiles() {
try {
const data1 = await fs.readFile('file1.txt', 'utf8');
const data2 = await fs.readFile('file2.txt', 'utf8');

await fs.writeFile('output.txt', data1 + data2);
console.log('合并完成');
} catch (err) {
console.error('处理失败:', err);
}
}

并行执行

// 串行执行(一个接一个)
async function serial() {
const start = Date.now();

await new Promise(r => setTimeout(r, 1000));
await new Promise(r => setTimeout(r, 1000));

console.log(`串行耗时: ${Date.now() - start}ms`); // ~2000ms
}

// 并行执行
async function parallel() {
const start = Date.now();

await Promise.all([
new Promise(r => setTimeout(r, 1000)),
new Promise(r => setTimeout(r, 1000)),
]);

console.log(`并行耗时: ${Date.now() - start}ms`); // ~1000ms
}

循环中的异步

// 错误:forEach 不会等待 async
async function wrong() {
[1, 2, 3].forEach(async (n) => {
await new Promise(r => setTimeout(r, 100));
console.log(n);
});
console.log('done'); // 先输出 'done'
}

// 正确:使用 for...of
async function correct() {
for (const n of [1, 2, 3]) {
await new Promise(r => setTimeout(r, 100));
console.log(n);
}
console.log('done'); // 最后输出 'done'
}

// 并行处理
async function parallel() {
await Promise.all([1, 2, 3].map(async (n) => {
await new Promise(r => setTimeout(r, 100));
console.log(n);
}));
console.log('done');
}

异步工具函数

promisify

将回调风格的函数转换为 Promise:

const util = require('node:util');
const fs = require('node:fs');

// 手动 promisify
const readFile = util.promisify(fs.readFile);

// 使用
readFile('file.txt', 'utf8')
.then(data => console.log(data))
.catch(err => console.error(err));

自定义 promisify

const util = require('node:util');

// 自定义函数
function delay(ms, callback) {
setTimeout(() => {
callback(null, `等待了 ${ms}ms`);
}, ms);
}

// 转换
const delayAsync = util.promisify(delay);

// 使用
await delayAsync(1000); // '等待了 1000ms'

callbackify

将 Promise 转换为回调风格:

const util = require('node:util');

async function asyncFunction() {
return 'result';
}

const callbackFunction = util.callbackify(asyncFunction);

callbackFunction((err, result) => {
console.log(result); // 'result'
});

实际应用示例

API 请求封装

const https = require('node:https');

function fetch(url) {
return new Promise((resolve, reject) => {
https.get(url, (res) => {
let data = '';

res.on('data', chunk => data += chunk);
res.on('end', () => {
try {
resolve(JSON.parse(data));
} catch (err) {
reject(new Error('JSON 解析失败'));
}
});
}).on('error', reject);
});
}

// 使用
async function main() {
try {
const user = await fetch('https://api.github.com/users/nodejs');
console.log(user.login);
} catch (err) {
console.error('请求失败:', err);
}
}

数据库操作

const { Pool } = require('pg');

const pool = new Pool({
connectionString: 'postgresql://user:pass@localhost/db'
});

// 封装查询
async function query(sql, params = []) {
const client = await pool.connect();
try {
const result = await client.query(sql, params);
return result.rows;
} finally {
client.release();
}
}

// 使用
async function getUsers() {
const users = await query('SELECT * FROM users');
return users;
}

文件处理管道

const fs = require('node:fs/promises');
const path = require('node:path');

async function processDirectory(dir) {
const files = await fs.readdir(dir);

const results = await Promise.all(
files.map(async (file) => {
const filePath = path.join(dir, file);
const stat = await fs.stat(filePath);

return {
name: file,
path: filePath,
size: stat.size,
isDirectory: stat.isDirectory(),
};
})
);

return results;
}

小结

本章我们学习了:

  1. 异步原理:事件循环、非阻塞 I/O
  2. 回调函数:Error-First 模式、回调地狱
  3. Promise:创建、链式调用、静态方法
  4. async/await:语法、错误处理、并行执行
  5. 工具函数:promisify、callbackify

练习

  1. 封装一个支持重试的异步请求函数
  2. 使用 async/await 重写回调地狱的代码
  3. 实现一个并发限制的 Promise 队列
  4. 使用 Promise.all 优化多个独立异步操作