Node.js gRPC 开发
本章详细介绍如何使用 Node.js 开发 gRPC 服务端和客户端。Node.js gRPC 有两种实现方式:动态加载 proto 文件和静态代码生成。本教程使用官方推荐的 @grpc/grpc-js 库,这是一个纯 JavaScript 实现。
环境准备
安装依赖
# 创建项目
mkdir grpc-nodejs-demo
cd grpc-nodejs-demo
npm init -y
# 安装核心库(纯 JS 实现,推荐)
npm install @grpc/grpc-js
# 安装 proto 加载器
npm install @grpc/proto-loader
# 可选:安装 protobufjs(用于静态代码生成)
npm install protobufjs
项目结构
grpc-nodejs-demo/
├── proto/ # Protocol Buffers 定义
│ └── greeter.proto
├── server/ # 服务端代码
│ └── server.js
├── client/ # 客户端代码
│ └── client.js
└── package.json
Proto 定义
// proto/greeter.proto
syntax = "proto3";
package greeter;
// 问候服务
service Greeter {
// 一元 RPC:发送问候
rpc SayHello (HelloRequest) returns (HelloReply);
// 服务端流:返回多个问候
rpc SayHelloStream (HelloRequest) returns (stream HelloReply);
// 客户端流:接收多个请求
rpc SendGreetings (stream HelloRequest) returns (HelloReply);
// 双向流:实时聊天
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}
message HelloRequest {
string name = 1;
int32 count = 2;
}
message HelloReply {
string message = 1;
int64 timestamp = 2;
}
服务端开发
基本服务实现
// server/server.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
// 加载 proto 文件
const PROTO_PATH = path.join(__dirname, '../proto/greeter.proto');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true, // 保持字段名的原始大小写
longs: String, // 将 int64 转换为字符串
enums: String, // 将枚举转换为字符串
defaults: true, // 设置默认值
oneofs: true, // 支持 oneof
});
const proto = grpc.loadPackageDefinition(packageDefinition);
// 服务实现
const greeterService = {
// 一元 RPC
sayHello(call, callback) {
const { name } = call.request;
console.log(`收到请求: name=${name}`);
// 返回响应
callback(null, {
message: `你好, ${name}!`,
timestamp: Date.now(),
});
},
// 服务端流
sayHelloStream(call) {
const { name, count = 5 } = call.request;
console.log(`收到流请求: name=${name}, count=${count}`);
let sent = 0;
const interval = setInterval(() => {
if (sent >= count) {
clearInterval(interval);
call.end(); // 结束流
return;
}
sent++;
call.write({
message: `问候 ${sent}: 你好, ${name}!`,
timestamp: Date.now(),
});
}, 500);
},
// 客户端流
sendGreetings(call, callback) {
const names = [];
// 接收客户端发送的消息
call.on('data', (request) => {
console.log(`收到: name=${request.name}`);
names.push(request.name);
});
// 客户端发送完毕
call.on('end', () => {
callback(null, {
message: `收到了 ${names.length} 个问候,来自: ${names.join(', ')}`,
timestamp: Date.now(),
});
});
// 错误处理
call.on('error', (err) => {
console.error('流错误:', err);
});
},
// 双向流
chat(call) {
call.on('data', (request) => {
console.log(`聊天消息: ${request.name}`);
// 立即回复
call.write({
message: `[服务器] 收到你的消息: ${request.name}`,
timestamp: Date.now(),
});
});
call.on('end', () => {
call.end();
});
call.on('error', (err) => {
console.error('聊天错误:', err);
});
},
};
// 创建并启动服务器
function main() {
const server = new grpc.Server();
// 注册服务
server.addService(proto.greeter.Greeter.service, greeterService);
// 绑定端口
const address = '0.0.0.0:50051';
server.bindAsync(
address,
grpc.ServerCredentials.createInsecure(),
(err, port) => {
if (err) {
console.error('绑定失败:', err);
return;
}
console.log(`服务器启动,监听端口: ${port}`);
}
);
// 优雅关闭
process.on('SIGINT', () => {
console.log('\n正在关闭服务器...');
server.tryShutdown(() => {
console.log('服务器已关闭');
process.exit(0);
});
});
}
main();
服务端配置选项
const server = new grpc.Server({
'grpc.max_receive_message_length': 10 * 1024 * 1024, // 10MB
'grpc.max_send_message_length': 10 * 1024 * 1024, // 10MB
'grpc.keepalive_time_ms': 10000, // Keep-alive 间隔
'grpc.keepalive_timeout_ms': 5000, // Keep-alive 超时
'grpc.max_concurrent_streams': 100, // 最大并发流
});
使用拦截器
Node.js gRPC 可以通过包装服务方法实现拦截器功能:
// 拦截器工厂函数
function createLoggingInterceptor(serviceImplementation) {
const wrapped = {};
for (const [methodName, methodFn] of Object.entries(serviceImplementation)) {
wrapped[methodName] = function(call, callback) {
const start = Date.now();
console.log(`[请求] 方法: ${methodName}`);
// 包装回调
const wrappedCallback = callback ? (err, response) => {
const duration = Date.now() - start;
console.log(`[响应] 方法: ${methodName}, 耗时: ${duration}ms`);
callback(err, response);
} : null;
// 调用原方法
if (wrappedCallback) {
methodFn(call, wrappedCallback);
} else {
// 流式方法
methodFn(call);
}
};
}
return wrapped;
}
// 认证拦截器
function createAuthInterceptor(serviceImplementation, publicKey) {
const wrapped = {};
for (const [methodName, methodFn] of Object.entries(serviceImplementation)) {
wrapped[methodName] = function(call, callback) {
// 获取元数据
const metadata = call.metadata || call.call.metadata;
const authHeader = metadata.get('authorization')[0];
if (!authHeader) {
const error = {
code: grpc.status.UNAUTHENTICATED,
message: '缺少认证信息',
};
if (callback) {
callback(error);
} else {
call.emit('error', error);
}
return;
}
// 验证 token
try {
const decoded = verifyToken(authHeader, publicKey);
// 将用户信息添加到 call 对象
call.user = decoded;
methodFn(call, callback);
} catch (err) {
const error = {
code: grpc.status.UNAUTHENTICATED,
message: '无效的认证信息',
};
if (callback) {
callback(error);
} else {
call.emit('error', error);
}
}
};
}
return wrapped;
}
// 使用拦截器
const serviceWithInterceptors = createLoggingInterceptor(
createAuthInterceptor(greeterService, publicKey)
);
server.addService(proto.greeter.Greeter.service, serviceWithInterceptors);
客户端开发
基本客户端实现
// client/client.js
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const path = require('path');
// 加载 proto 文件
const PROTO_PATH = path.join(__dirname, '../proto/greeter.proto');
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
});
const proto = grpc.loadPackageDefinition(packageDefinition);
// 创建客户端
const client = new proto.greeter.Greeter(
'localhost:50051',
grpc.credentials.createInsecure()
);
// 一元 RPC 调用
function callSayHello(name) {
console.log('\n=== 一元 RPC ===');
return new Promise((resolve, reject) => {
client.sayHello({ name }, (err, response) => {
if (err) {
console.error('调用失败:', err.message);
reject(err);
return;
}
console.log('响应:', response.message);
resolve(response);
});
});
}
// 服务端流调用
function callSayHelloStream(name, count) {
console.log('\n=== 服务端流 ===');
return new Promise((resolve, reject) => {
const call = client.sayHelloStream({ name, count });
call.on('data', (response) => {
console.log('收到:', response.message);
});
call.on('end', () => {
console.log('流结束');
resolve();
});
call.on('error', (err) => {
console.error('流错误:', err.message);
reject(err);
});
});
}
// 客户端流调用
function callSendGreetings(names) {
console.log('\n=== 客户端流 ===');
return new Promise((resolve, reject) => {
const call = client.sendGreetings((err, response) => {
if (err) {
console.error('调用失败:', err.message);
reject(err);
return;
}
console.log('响应:', response.message);
resolve(response);
});
// 发送多个请求
names.forEach((name, index) => {
setTimeout(() => {
console.log(`发送: ${name}`);
call.write({ name });
// 最后一个发送完毕后结束流
if (index === names.length - 1) {
call.end();
}
}, index * 200);
});
});
}
// 双向流调用
function callChat(messages) {
console.log('\n=== 双向流 ===');
return new Promise((resolve, reject) => {
const call = client.chat();
// 接收消息
call.on('data', (response) => {
console.log('收到:', response.message);
});
call.on('end', () => {
console.log('聊天结束');
resolve();
});
call.on('error', (err) => {
console.error('聊天错误:', err.message);
reject(err);
});
// 发送消息
messages.forEach((msg, index) => {
setTimeout(() => {
console.log(`发送: ${msg}`);
call.write({ name: msg });
if (index === messages.length - 1) {
call.end();
}
}, index * 500);
});
});
}
// 主函数
async function main() {
try {
await callSayHello('张三');
await callSayHelloStream('李四', 3);
await callSendGreetings(['王五', '赵六', '钱七']);
await callChat(['你好', '今天天气不错', '再见']);
} catch (err) {
console.error('测试失败:', err);
}
// 关闭连接(gRPC-js 会自动管理连接)
process.exit(0);
}
main();
客户端配置选项
// 创建带配置的客户端
const client = new proto.greeter.Greeter(
'localhost:50051',
grpc.credentials.createInsecure(),
{
'grpc.max_receive_message_length': 10 * 1024 * 1024,
'grpc.max_send_message_length': 10 * 1024 * 1024,
'grpc.keepalive_time_ms': 10000,
'grpc.keepalive_timeout_ms': 5000,
'grpc.keepalive_permit_without_calls': 1,
}
);
元数据传递
// 发送元数据
function callWithMetadata(name) {
const metadata = new grpc.Metadata();
metadata.set('authorization', 'Bearer my-token');
metadata.set('x-request-id', 'req-12345');
metadata.set('x-custom-header', 'custom-value');
client.sayHello({ name }, metadata, (err, response) => {
if (err) {
console.error('调用失败:', err.message);
return;
}
console.log('响应:', response.message);
});
}
// 接收响应元数据
function callWithResponseMetadata(name) {
const metadata = new grpc.Metadata();
metadata.set('authorization', 'Bearer my-token');
const call = client.sayHello({ name }, metadata, (err, response) => {
if (err) return;
console.log('响应:', response.message);
});
// 获取响应元数据
call.on('metadata', (metadata) => {
console.log('响应元数据:', metadata.getMap());
});
// 获取状态
call.on('status', (status) => {
console.log('状态:', status);
});
}
超时和取消
// 设置超时
function callWithTimeout(name, timeoutMs = 3000) {
const deadline = new Date(Date.now() + timeoutMs);
client.sayHello({ name }, { deadline }, (err, response) => {
if (err) {
if (err.code === grpc.status.DEADLINE_EXCEEDED) {
console.error('请求超时');
} else {
console.error('调用失败:', err.message);
}
return;
}
console.log('响应:', response.message);
});
}
// 取消请求
function callWithCancel(name, cancelAfterMs = 1000) {
const call = client.sayHelloStream({ name, count: 10 });
call.on('data', (response) => {
console.log('收到:', response.message);
});
call.on('error', (err) => {
if (err.code === grpc.status.CANCELLED) {
console.log('请求已取消');
} else {
console.error('错误:', err.message);
}
});
// 延迟取消
setTimeout(() => {
console.log('取消请求...');
call.cancel();
}, cancelAfterMs);
}
错误处理
服务端返回错误
const { status } = grpc;
const greeterService = {
sayHello(call, callback) {
const { name } = call.request;
// 参数验证
if (!name) {
return callback({
code: status.INVALID_ARGUMENT,
message: 'name 不能为空',
});
}
// 业务错误
if (name === 'error') {
return callback({
code: status.INTERNAL,
message: '内部处理错误',
});
}
// 权限错误
if (name === 'forbidden') {
return callback({
code: status.PERMISSION_DENIED,
message: '无权访问',
});
}
// 成功响应
callback(null, {
message: `你好, ${name}!`,
timestamp: Date.now(),
});
},
};
客户端处理错误
const { status } = grpc;
function handleError(err) {
if (!err) return;
console.log('错误码:', err.code);
console.log('错误消息:', err.message);
console.log('错误详情:', err.details);
// 根据错误码处理
switch (err.code) {
case status.INVALID_ARGUMENT:
console.log('参数错误,请检查输入');
break;
case status.NOT_FOUND:
console.log('资源不存在');
break;
case status.PERMISSION_DENIED:
console.log('权限不足');
break;
case status.UNAUTHENTICATED:
console.log('未认证,请先登录');
break;
case status.DEADLINE_EXCEEDED:
console.log('请求超时');
break;
case status.UNAVAILABLE:
console.log('服务不可用,请稍后重试');
break;
case status.INTERNAL:
console.log('服务器内部错误');
break;
default:
console.log('未知错误');
}
}
// 使用
client.sayHello({ name: 'error' }, (err, response) => {
if (err) {
handleError(err);
return;
}
console.log('响应:', response.message);
});
TLS 安全连接
服务端 TLS
const fs = require('fs');
// 加载证书
const serverCert = fs.readFileSync('./certs/server.crt');
const serverKey = fs.readFileSync('./certs/server.key');
// 创建凭证
const credentials = grpc.ServerCredentials.createSsl(
null, // CA 证书(双向 TLS 时需要)
[{
cert_chain: serverCert,
private_key: serverKey,
}],
false // 是否要求客户端证书
);
// 绑定端口
server.bindAsync(
'0.0.0.0:50051',
credentials,
(err, port) => {
if (err) {
console.error('绑定失败:', err);
return;
}
console.log(`安全服务器启动,端口: ${port}`);
}
);
客户端 TLS
const fs = require('fs');
// 加载 CA 证书
const caCert = fs.readFileSync('./certs/ca.crt');
// 创建凭证
const credentials = grpc.credentials.createSsl(caCert);
// 创建客户端
const client = new proto.greeter.Greeter(
'localhost:50051',
credentials
);
双向 TLS(mTLS)
// 服务端
const caCert = fs.readFileSync('./certs/ca.crt');
const serverCert = fs.readFileSync('./certs/server.crt');
const serverKey = fs.readFileSync('./certs/server.key');
const credentials = grpc.ServerCredentials.createSsl(
caCert, // CA 证书用于验证客户端
[{
cert_chain: serverCert,
private_key: serverKey,
}],
true // 要求客户端证书
);
// 客户端
const caCert = fs.readFileSync('./certs/ca.crt');
const clientCert = fs.readFileSync('./certs/client.crt');
const clientKey = fs.readFileSync('./certs/client.key');
const credentials = grpc.credentials.createSsl(
caCert,
clientKey,
clientCert
);
健康检查
服务端健康检查
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
// 加载健康检查 proto
const healthProto = protoLoader.loadSync(
path.join(__dirname, '../node_modules/@grpc/grpc-js/src/proto/health/v1/health.proto'),
{ keepCase: true, longs: String, enums: String }
);
const health = grpc.loadPackageDefinition(healthProto).grpc.health.v1;
// 健康检查服务实现
const healthService = {
check(call, callback) {
const service = call.request.service;
// 检查服务状态
const status = getServiceStatus(service);
if (status === 'UNKNOWN') {
return callback({
code: grpc.status.NOT_FOUND,
message: `服务 ${service} 不存在`,
});
}
callback(null, { status });
},
watch(call) {
// 流式健康检查(简化实现)
const service = call.request.service;
let status = getServiceStatus(service);
const interval = setInterval(() => {
const newStatus = getServiceStatus(service);
if (newStatus !== status) {
status = newStatus;
call.write({ status });
}
}, 1000);
call.on('cancelled', () => {
clearInterval(interval);
});
},
};
function getServiceStatus(service) {
// 实际项目中应该检查服务的真实状态
// 返回: 'UNKNOWN', 'SERVING', 'NOT_SERVING', 'SERVICE_UNKNOWN'
return 'SERVING';
}
// 注册健康检查服务
server.addService(health.Health.service, healthService);
客户端健康检查
async function checkHealth(client, service = '') {
return new Promise((resolve, reject) => {
client.check({ service }, (err, response) => {
if (err) {
if (err.code === grpc.status.NOT_FOUND) {
resolve({ healthy: false, status: 'SERVICE_UNKNOWN' });
} else {
reject(err);
}
return;
}
const isHealthy = response.status === 'SERVING';
resolve({ healthy: isHealthy, status: response.status });
});
});
}
TypeScript 支持
类型定义
# 安装类型定义
npm install -D @types/google-protobuf
TypeScript 示例
// src/client.ts
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { PackageDefinition } from '@grpc/proto-loader';
interface HelloRequest {
name: string;
count?: number;
}
interface HelloReply {
message: string;
timestamp: number;
}
interface GreeterClient extends grpc.Client {
sayHello(
request: HelloRequest,
callback: grpc.requestCallback<HelloReply>
): grpc.ClientUnaryCall;
sayHelloStream(
request: HelloRequest
): grpc.ClientReadableStream<HelloReply>;
}
// 加载 proto
const packageDefinition: PackageDefinition = protoLoader.loadSync(
'./proto/greeter.proto',
{
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
}
);
const proto = grpc.loadPackageDefinition(packageDefinition);
// 创建客户端
const client = new proto.greeter.Greeter(
'localhost:50051',
grpc.credentials.createInsecure()
) as GreeterClient;
// 调用服务
client.sayHello({ name: 'TypeScript' }, (err, response) => {
if (err) {
console.error('错误:', err.message);
return;
}
console.log('响应:', response.message);
});
最佳实践
1. 连接复用
// 推荐:使用单例模式复用客户端
class GRPCClient {
constructor() {
if (GRPCClient.instance) {
return GRPCClient.instance;
}
this.client = new proto.greeter.Greeter(
'localhost:50051',
grpc.credentials.createInsecure()
);
GRPCClient.instance = this;
}
sayHello(name) {
return new Promise((resolve, reject) => {
this.client.sayHello({ name }, (err, response) => {
if (err) reject(err);
else resolve(response);
});
});
}
}
module.exports = new GRPCClient();
2. 错误处理封装
// 封装 gRPC 调用
async function grpcCall(fn, ...args) {
return new Promise((resolve, reject) => {
fn(...args, (err, response) => {
if (err) {
// 统一错误处理
const error = {
code: err.code,
message: err.message,
isRetryable: [grpc.status.UNAVAILABLE, grpc.status.DEADLINE_EXCEEDED]
.includes(err.code),
};
reject(error);
} else {
resolve(response);
}
});
});
}
// 使用
try {
const response = await grpcCall(client.sayHello.bind(client), { name: 'Test' });
console.log(response.message);
} catch (err) {
if (err.isRetryable) {
console.log('可以重试');
}
}
3. 负载均衡
// 使用 DNS 轮询实现简单的负载均衡
const client = new proto.greeter.Greeter(
'dns:///my-service.example.com:50051',
grpc.credentials.createInsecure(),
{
'grpc.default_authority': 'my-service.example.com',
'grpc.lb_policy_name': 'round_robin',
}
);
4. 优雅关闭
// 服务端优雅关闭
function setupGracefulShutdown(server) {
let isShuttingDown = false;
const shutdown = () => {
if (isShuttingDown) return;
isShuttingDown = true;
console.log('\n正在关闭服务器...');
// 停止接受新请求
server.tryShutdown(() => {
console.log('服务器已关闭');
process.exit(0);
});
// 强制关闭超时
setTimeout(() => {
console.log('强制关闭');
server.forceShutdown();
process.exit(1);
}, 10000);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
}
5. 性能优化
// 避免频繁创建客户端
// 推荐:全局共享客户端实例
// 使用 Promise 封装异步调用
function promisifyCall(client, methodName, request, metadata = {}) {
return new Promise((resolve, reject) => {
const call = client[methodName].bind(client);
call(request, metadata, (err, response) => {
if (err) reject(err);
else resolve(response);
});
});
}
// 批量处理请求
async function batchProcess(client, requests) {
const results = await Promise.all(
requests.map(req => promisifyCall(client, 'sayHello', req))
);
return results;
}
动态加载 vs 静态生成
动态加载(推荐开发环境)
// 运行时加载 proto 文件
const packageDefinition = protoLoader.loadSync('./greeter.proto', options);
const proto = grpc.loadPackageDefinition(packageDefinition);
优点:
- 开发简单,无需构建步骤
- proto 文件修改后无需重新生成代码
缺点:
- 运行时开销
- 无类型检查
静态代码生成(推荐生产环境)
# 安装工具
npm install -D grpc-tools grpc_tools_node_protoc_ts
# 生成 JavaScript 代码
grpc_tools_node_protoc \
--js_out=import_style=commonjs,binary:./generated \
--grpc_out=./generated \
--plugin=protoc-gen-grpc=`which grpc_tools_node_protoc_plugin` \
-I./proto \
./proto/greeter.proto
# 生成 TypeScript 定义
grpc_tools_node_protoc \
--plugin=protoc-gen-ts=`which protoc-gen-ts` \
--ts_out=./generated \
-I./proto \
./proto/greeter.proto
使用生成的代码:
const messages = require('./generated/greeter_pb');
const services = require('./generated/greeter_grpc_pb');
// 创建请求
const request = new messages.HelloRequest();
request.setName('World');
// 创建客户端
const client = new services.GreeterClient(
'localhost:50051',
grpc.credentials.createInsecure()
);
// 调用服务
client.sayHello(request, (err, response) => {
if (err) {
console.error('错误:', err.message);
return;
}
console.log('响应:', response.getMessage());
});
优点:
- 更好的性能
- 类型安全(TypeScript)
- 代码补全支持
缺点:
- 需要构建步骤
- proto 修改后需要重新生成
小结
本章介绍了 Node.js gRPC 开发的核心内容:
- 环境准备:安装
@grpc/grpc-js和@grpc/proto-loader - 服务端开发:实现各种 RPC 类型、拦截器、TLS 配置
- 客户端开发:同步/异步调用、元数据传递、超时处理
- 错误处理:标准错误码和错误处理模式
- 安全通信:TLS 和 mTLS 配置
- 最佳实践:连接复用、错误封装、负载均衡、优雅关闭
Node.js gRPC 开发相对简单,适合快速构建微服务和 API 网关。对于高性能场景,建议使用静态代码生成方式。
[!TIP]
@grpc/grpc-js是纯 JavaScript 实现,不需要编译原生模块,比旧版grpc包更易于安装和部署。新项目应优先使用@grpc/grpc-js。