异步编程
异步编程是 Node.js 的核心特性。理解异步编程模式对于开发高性能 Node.js 应用至关重要。
为什么需要异步
同步 vs 异步
// 同步执行(阻塞)
const fs = require('node:fs');
console.log('开始读取');
const data = fs.readFileSync('file.txt', 'utf8'); // 阻塞等待
console.log('读取完成');
console.log('继续执行');
// 输出顺序:
// 开始读取
// 读取完成
// 继续执行
// 异步执行(非阻塞)
const fs = require('node:fs');
console.log('开始读取');
fs.readFile('file.txt', 'utf8', (err, data) => {
console.log('读取完成');
});
console.log('继续执行');
// 输出顺序:
// 开始读取
// 继续执行
// 读取完成
Node.js 事件循环
事件循环阶段:
回调函数
基本模式
const fs = require('node:fs');
// 回调函数模式:Error-First Callback
fs.readFile('file.txt', 'utf8', (err, data) => {
if (err) {
console.error('读取失败:', err);
return;
}
console.log('文件内容:', data);
});
回调地狱
多层嵌套的回调会导致代码难以阅读和维护:
// 回调地狱示例
fs.readFile('file1.txt', 'utf8', (err, data1) => {
if (err) throw err;
fs.readFile('file2.txt', 'utf8', (err, data2) => {
if (err) throw err;
fs.writeFile('output.txt', data1 + data2, (err) => {
if (err) throw err;
console.log('合并完成');
});
});
});
解决回调地狱
使用命名函数拆分:
function readFirst(callback) {
fs.readFile('file1.txt', 'utf8', callback);
}
function readSecond(data1, callback) {
fs.readFile('file2.txt', 'utf8', (err, data2) => {
if (err) return callback(err);
callback(null, data1, data2);
});
}
function writeOutput(data1, data2, callback) {
fs.writeFile('output.txt', data1 + data2, callback);
}
// 使用
readFirst((err, data1) => {
if (err) throw err;
readSecond(data1, (err, data1, data2) => {
if (err) throw err;
writeOutput(data1, data2, (err) => {
if (err) throw err;
console.log('合并完成');
});
});
});
Promise
创建 Promise
// 基本创建
const promise = new Promise((resolve, reject) => {
// 异步操作
setTimeout(() => {
const success = true;
if (success) {
resolve('操作成功');
} else {
reject(new Error('操作失败'));
}
}, 1000);
});
// 使用
promise
.then(result => {
console.log('成功:', result);
})
.catch(error => {
console.error('失败:', error);
})
.finally(() => {
console.log('完成');
});
封装回调函数
const fs = require('node:fs');
// 封装 fs.readFile
function readFile(path, encoding = 'utf8') {
return new Promise((resolve, reject) => {
fs.readFile(path, encoding, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
// 使用
readFile('file.txt')
.then(data => console.log(data))
.catch(err => console.error(err));
Promise 链式调用
readFile('file1.txt')
.then(data1 => {
console.log('文件1:', data1);
return readFile('file2.txt'); // 返回新的 Promise
})
.then(data2 => {
console.log('文件2:', data2);
return writeFile('output.txt', data2); // 返回新的 Promise
})
.then(() => {
console.log('写入完成');
})
.catch(err => {
console.error('出错了:', err);
});
Promise 静态方法
// Promise.resolve / Promise.reject
const p1 = Promise.resolve('立即成功');
const p2 = Promise.reject(new Error('立即失败'));
// Promise.all - 全部成功才成功
Promise.all([
fetch('/api/users'),
fetch('/api/posts'),
])
.then(([users, posts]) => {
console.log('两个请求都完成了');
})
.catch(err => {
console.error('有一个请求失败');
});
// Promise.allSettled - 等待所有 Promise 完成
Promise.allSettled([
Promise.resolve('成功1'),
Promise.reject('失败1'),
])
.then(results => {
console.log(results);
// [
// { status: 'fulfilled', value: '成功1' },
// { status: 'rejected', reason: '失败1' }
// ]
});
// Promise.race - 第一个完成的结果
Promise.race([
fetch('/api/fast'),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('超时')), 5000)
),
])
.then(response => console.log('快速响应'))
.catch(err => console.error('超时'));
// Promise.any - 第一个成功的(ES2021)
Promise.any([
Promise.reject('失败1'),
Promise.resolve('成功1'),
Promise.resolve('成功2'),
])
.then(result => console.log(result)) // '成功1'
.catch(err => console.error('全部失败'));
async/await
基本语法
const fs = require('node:fs/promises');
// async 函数返回 Promise
async function readFile() {
try {
// await 等待 Promise 完成
const data = await fs.readFile('file.txt', 'utf8');
console.log('文件内容:', data);
return data;
} catch (err) {
console.error('读取失败:', err);
throw err;
}
}
// 调用 async 函数
readFile()
.then(data => console.log('返回:', data))
.catch(err => console.error('捕获:', err));
错误处理
async function processFiles() {
try {
const data1 = await fs.readFile('file1.txt', 'utf8');
const data2 = await fs.readFile('file2.txt', 'utf8');
await fs.writeFile('output.txt', data1 + data2);
console.log('合并完成');
} catch (err) {
console.error('处理失败:', err);
}
}
并行执行
// 串行执行(一个接一个)
async function serial() {
const start = Date.now();
await new Promise(r => setTimeout(r, 1000));
await new Promise(r => setTimeout(r, 1000));
console.log(`串行耗时: ${Date.now() - start}ms`); // ~2000ms
}
// 并行执行
async function parallel() {
const start = Date.now();
await Promise.all([
new Promise(r => setTimeout(r, 1000)),
new Promise(r => setTimeout(r, 1000)),
]);
console.log(`并行耗时: ${Date.now() - start}ms`); // ~1000ms
}
循环中的异步
// 错误:forEach 不会等待 async
async function wrong() {
[1, 2, 3].forEach(async (n) => {
await new Promise(r => setTimeout(r, 100));
console.log(n);
});
console.log('done'); // 先输出 'done'
}
// 正确:使用 for...of
async function correct() {
for (const n of [1, 2, 3]) {
await new Promise(r => setTimeout(r, 100));
console.log(n);
}
console.log('done'); // 最后输出 'done'
}
// 并行处理
async function parallel() {
await Promise.all([1, 2, 3].map(async (n) => {
await new Promise(r => setTimeout(r, 100));
console.log(n);
}));
console.log('done');
}
异步工具函数
promisify
将回调风格的函数转换为 Promise:
const util = require('node:util');
const fs = require('node:fs');
// 手动 promisify
const readFile = util.promisify(fs.readFile);
// 使用
readFile('file.txt', 'utf8')
.then(data => console.log(data))
.catch(err => console.error(err));
自定义 promisify
const util = require('node:util');
// 自定义函数
function delay(ms, callback) {
setTimeout(() => {
callback(null, `等待了 ${ms}ms`);
}, ms);
}
// 转换
const delayAsync = util.promisify(delay);
// 使用
await delayAsync(1000); // '等待了 1000ms'
callbackify
将 Promise 转换为回调风格:
const util = require('node:util');
async function asyncFunction() {
return 'result';
}
const callbackFunction = util.callbackify(asyncFunction);
callbackFunction((err, result) => {
console.log(result); // 'result'
});
实际应用示例
API 请求封装
const https = require('node:https');
function fetch(url) {
return new Promise((resolve, reject) => {
https.get(url, (res) => {
let data = '';
res.on('data', chunk => data += chunk);
res.on('end', () => {
try {
resolve(JSON.parse(data));
} catch (err) {
reject(new Error('JSON 解析失败'));
}
});
}).on('error', reject);
});
}
// 使用
async function main() {
try {
const user = await fetch('https://api.github.com/users/nodejs');
console.log(user.login);
} catch (err) {
console.error('请求失败:', err);
}
}
数据库操作
const { Pool } = require('pg');
const pool = new Pool({
connectionString: 'postgresql://user:pass@localhost/db'
});
// 封装查询
async function query(sql, params = []) {
const client = await pool.connect();
try {
const result = await client.query(sql, params);
return result.rows;
} finally {
client.release();
}
}
// 使用
async function getUsers() {
const users = await query('SELECT * FROM users');
return users;
}
文件处理管道
const fs = require('node:fs/promises');
const path = require('node:path');
async function processDirectory(dir) {
const files = await fs.readdir(dir);
const results = await Promise.all(
files.map(async (file) => {
const filePath = path.join(dir, file);
const stat = await fs.stat(filePath);
return {
name: file,
path: filePath,
size: stat.size,
isDirectory: stat.isDirectory(),
};
})
);
return results;
}
小结
本章我们学习了:
- 异步原理:事件循环、非阻塞 I/O
- 回调函数:Error-First 模式、回调地狱
- Promise:创建、链式调用、静态方法
- async/await:语法、错误处理、并行执行
- 工具函数:promisify、callbackify
练习
- 封装一个支持重试的异步请求函数
- 使用 async/await 重写回调地狱的代码
- 实现一个并发限制的 Promise 队列
- 使用 Promise.all 优化多个独立异步操作