C++ gRPC 开发
C++ 是 gRPC 的原生实现语言,具有最高性能和最完整的功能支持。本章介绍如何使用 C++ 开发高性能 gRPC 服务。
环境准备
系统要求
- C++14 或更高版本编译器(GCC 7+、Clang 6+、MSVC 2017+)
- CMake 3.13+
- pkg-config(Linux/macOS)
安装方式
方式一:使用包管理器安装
Ubuntu/Debian:
sudo apt update
sudo apt install -y libgrpc++-dev protobuf-compiler-grpc
CentOS/RHEL:
sudo yum install -y grpc-devel grpc-plugins
macOS:
brew install grpc
vcpkg(Windows):
vcpkg install grpc:x64-windows
方式二:从源码编译
源码编译可以获得最新功能和更好的控制,但耗时较长:
# 克隆仓库(包含子模块)
git clone --recurse-submodules -b v1.60.0 --depth 1 --shallow-submodules \
https://github.com/grpc/grpc
cd grpc
# 创建构建目录
mkdir -p cmake/build
cd cmake/build
# 配置(根据需要调整选项)
cmake ../.. \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_INSTALL_PREFIX=/usr/local \
-DgRPC_BUILD_TESTS=OFF \
-DgRPC_BUILD_EXAMPLES=OFF
# 编译(使用所有 CPU 核心)
make -j$(nproc)
# 安装
sudo make install
# 更新库缓存
sudo ldconfig
验证安装
# 检查 pkg-config
pkg-config --libs grpc++
pkg-config --libs protobuf
# 检查 protoc 插件
which grpc_cpp_plugin
项目结构
grpc-cpp-project/
├── proto/
│ └── greeter.proto # Proto 定义
├── generated/ # 生成的代码
│ ├── greeter.pb.h
│ ├── greeter.pb.cc
│ ├── greeter.grpc.pb.h
│ └── greeter.grpc.pb.cc
├── server/
│ └── server.cpp # 服务端实现
├── client/
│ └── client.cpp # 客户端实现
└── CMakeLists.txt # CMake 配置
Proto 定义
// proto/greeter.proto
syntax = "proto3";
package greeter;
// 问候服务
service Greeter {
// 一元 RPC
rpc SayHello (HelloRequest) returns (HelloReply);
// 服务端流
rpc StreamGreetings (HelloRequest) returns (stream HelloReply);
// 客户端流
rpc SendGreetings (stream HelloRequest) returns (HelloReply);
// 双向流
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}
message HelloRequest {
string name = 1;
int32 count = 2;
}
message HelloReply {
string message = 1;
int64 timestamp = 2;
}
CMake 配置
# CMakeLists.txt
cmake_minimum_required(VERSION 3.13)
project(grpc_cpp_demo)
# C++ 标准
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# 查找 gRPC 和 Protobuf
find_package(Threads REQUIRED)
find_package(Protobuf CONFIG REQUIRED)
find_package(gRPC CONFIG REQUIRED)
# Proto 文件
get_filename_component(greeter_proto "${CMAKE_CURRENT_SOURCE_DIR}/proto/greeter.proto" ABSOLUTE)
get_filename_component(greeter_proto_path "${greeter_proto}" PATH)
# 生成源文件
set(greeter_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/greeter.pb.cc")
set(greeter_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/greeter.pb.h")
set(greeter_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/greeter.grpc.pb.cc")
set(greeter_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/greeter.grpc.pb.h")
# 生成命令
add_custom_command(
OUTPUT "${greeter_proto_srcs}" "${greeter_proto_hdrs}" "${greeter_grpc_srcs}" "${greeter_grpc_hdrs}"
COMMAND protobuf::protoc
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${greeter_proto_path}"
--plugin=protoc-gen-grpc="${gRPC_CPP_PLUGIN_EXECUTABLE}"
"${greeter_proto}"
DEPENDS "${greeter_proto}"
)
# 包含生成文件的目录
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
# 服务端
add_executable(greeter_server
server/server.cpp
${greeter_proto_srcs}
${greeter_grpc_srcs}
)
target_link_libraries(greeter_server
gRPC::grpc++
protobuf::libprotobuf
Threads::Threads
)
# 客户端
add_executable(greeter_client
client/client.cpp
${greeter_proto_srcs}
${greeter_grpc_srcs}
)
target_link_libraries(greeter_client
gRPC::grpc++
protobuf::libprotobuf
Threads::Threads
)
服务端开发
基本服务实现
// server/server.cpp
#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include <grpcpp/grpcpp.h>
#include "greeter.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerWriter;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::Status;
/**
* Greeter 服务实现
*
* 继承生成的 Service 类,重写 RPC 方法
*/
class GreeterServiceImpl final : public greeter::Greeter::Service {
public:
/**
* 一元 RPC:简单的请求-响应
*/
Status SayHello(ServerContext* context,
const greeter::HelloRequest* request,
greeter::HelloReply* reply) override {
std::cout << "收到请求: name=" << request->name() << std::endl;
// 构造响应
reply->set_message("你好, " + request->name() + "!");
reply->set_timestamp(std::chrono::system_clock::now().time_since_epoch().count());
return Status::OK;
}
/**
* 服务端流:返回多个响应
*/
Status StreamGreetings(ServerContext* context,
const greeter::HelloRequest* request,
ServerWriter<greeter::HelloReply>* writer) override {
std::cout << "收到流请求: name=" << request->name()
<< ", count=" << request->count() << std::endl;
int count = request->count() > 0 ? request->count() : 5;
for (int i = 0; i < count; i++) {
// 检查客户端是否已取消
if (context->IsCancelled()) {
std::cout << "客户端已取消请求" << std::endl;
return Status::CANCELLED;
}
greeter::HelloReply reply;
reply.set_message("问候 " + std::to_string(i + 1) + ": 你好, " + request->name() + "!");
reply->set_timestamp(std::chrono::system_clock::now().time_since_epoch().count());
// 发送响应
if (!writer->Write(reply)) {
// 写入失败,客户端可能已断开
break;
}
// 模拟处理延迟
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
return Status::OK;
}
/**
* 客户端流:接收多个请求,返回单个响应
*/
Status SendGreetings(ServerContext* context,
ServerReader<greeter::HelloRequest>* reader,
greeter::HelloReply* reply) override {
std::vector<std::string> names;
greeter::HelloRequest request;
// 读取所有请求
while (reader->Read(&request)) {
std::cout << "收到: name=" << request.name() << std::endl;
names.push_back(request.name());
}
// 构造汇总响应
std::string names_str;
for (size_t i = 0; i < names.size(); i++) {
if (i > 0) names_str += ", ";
names_str += names[i];
}
reply->set_message("收到了 " + std::to_string(names.size()) + " 个问候,来自: " + names_str);
reply->set_timestamp(std::chrono::system_clock::now().time_since_epoch().count());
return Status::OK;
}
/**
* 双向流:实时聊天
*/
Status Chat(ServerContext* context,
ServerReaderWriter<greeter::HelloReply, greeter::HelloRequest>* stream) override {
greeter::HelloRequest request;
while (stream->Read(&request)) {
std::cout << "聊天消息: " << request.name() << std::endl;
greeter::HelloReply reply;
reply.set_message("[服务器] 收到你的消息: " + request.name());
reply.set_timestamp(std::chrono::system_clock::now().time_since_epoch().count());
stream->Write(reply);
}
return Status::OK;
}
};
/**
* 启动服务器
*/
void RunServer(const std::string& address) {
GreeterServiceImpl service;
ServerBuilder builder;
// 监听地址和端口
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
// 注册服务
builder.RegisterService(&service);
// 构建并启动服务器
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "服务器启动,监听: " << address << std::endl;
// 等待服务器关闭
server->Wait();
}
int main(int argc, char** argv) {
RunServer("0.0.0.0:50051");
return 0;
}
异步服务实现
C++ gRPC 的异步 API 使用 CompletionQueue,可以实现更高性能:
// server/async_server.cpp
#include <iostream>
#include <memory>
#include <thread>
#include <chrono>
#include <grpcpp/grpcpp.h>
#include "greeter.grpc.pb.h"
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
class GreeterServiceImpl final : public greeter::Greeter::AsyncService {
public:
void SayHello(ServerContext* context,
const greeter::HelloRequest* request,
ServerAsyncResponseWriter<greeter::HelloReply>* responder,
grpc::CompletionQueue* cq,
grpc::ServerCompletionQueue* notification_cq,
void* tag) {
// 异步处理逻辑
greeter::HelloReply reply;
reply.set_message("你好, " + request->name() + "!");
reply.set_timestamp(std::chrono::system_clock::now().time_since_epoch().count());
responder->Finish(reply, Status::OK, tag);
}
};
/**
* 请求处理基类
*/
class CallData {
public:
virtual void Proceed() = 0;
virtual ~CallData() = default;
};
/**
* 一元 RPC 的 CallData 实现
*/
class SayHelloCallData : public CallData {
public:
SayHelloCallData(greeter::Greeter::AsyncService* service,
grpc::ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&context_), status_(CREATE) {
Proceed();
}
void Proceed() override {
if (status_ == CREATE) {
status_ = PROCESS;
// 请求处理下一个 RPC
service_->RequestSayHello(&context_, &request_, &responder_, cq_, cq_, this);
} else if (status_ == PROCESS) {
// 为下一个请求创建新的 CallData
new SayHelloCallData(service_, cq_);
// 处理当前请求
reply_.set_message("你好, " + request_.name() + "!");
reply_.set_timestamp(std::chrono::system_clock::now().time_since_epoch().count());
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
GPR_ASSERT(status_ == FINISH);
delete this;
}
}
private:
greeter::Greeter::AsyncService* service_;
grpc::ServerCompletionQueue* cq_;
grpc::ServerContext context_;
greeter::HelloRequest request_;
greeter::HelloReply reply_;
grpc::ServerAsyncResponseWriter<greeter::HelloReply> responder_;
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
};
/**
* 异步服务器
*/
class AsyncServer final {
public:
AsyncServer(const std::string& address) {
builder_.AddListeningPort(address, grpc::InsecureServerCredentials());
builder_.RegisterService(&service_);
cq_ = builder_.AddCompletionQueue();
server_ = builder_.BuildAndStart();
std::cout << "异步服务器启动,监听: " << address << std::endl;
}
void Run() {
// 预注册足够的请求以实现并发处理
// 避免在慢路径中串行处理
for (int i = 0; i < 100; i++) {
new SayHelloCallData(&service_, cq_.get());
}
void* tag;
bool ok;
while (true) {
// 从队列获取下一个事件
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
// 处理事件
static_cast<CallData*>(tag)->Proceed();
}
}
private:
greeter::Greeter::AsyncService service_;
grpc::ServerBuilder builder_;
std::unique_ptr<grpc::ServerCompletionQueue> cq_;
std::unique_ptr<grpc::Server> server_;
};
int main(int argc, char** argv) {
AsyncServer server("0.0.0.0:50051");
server.Run();
return 0;
}
多线程服务器
#include <grpcpp/grpcpp.h>
#include <thread>
#include <vector>
class MultiThreadedServer {
public:
MultiThreadedServer(const std::string& address, int num_threads)
: address_(address), num_threads_(num_threads) {}
void Run() {
greeter::Greeter::AsyncService service;
grpc::ServerBuilder builder;
builder.AddListeningPort(address_, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
// 创建多个 completion queue
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> cqs;
for (int i = 0; i < num_threads_; i++) {
cqs.push_back(builder.AddCompletionQueue());
}
auto server = builder.BuildAndStart();
std::cout << "多线程服务器启动: " << num_threads_ << " 个线程" << std::endl;
// 每个线程处理一个 completion queue
std::vector<std::thread> threads;
for (int i = 0; i < num_threads_; i++) {
threads.emplace_back([&service, cq = cqs[i].get()]() {
HandleRpcs(&service, cq);
});
}
// 等待所有线程
for (auto& t : threads) {
t.join();
}
}
private:
static void HandleRpcs(greeter::Greeter::AsyncService* service,
grpc::ServerCompletionQueue* cq) {
// 预注册请求
for (int i = 0; i < 100; i++) {
new SayHelloCallData(service, cq);
}
void* tag;
bool ok;
while (cq->Next(&tag, &ok)) {
if (ok) {
static_cast<CallData*>(tag)->Proceed();
}
}
}
std::string address_;
int num_threads_;
};
客户端开发
同步客户端
// client/client.cpp
#include <iostream>
#include <memory>
#include <string>
#include <chrono>
#include <grpcpp/grpcpp.h>
#include "greeter.grpc.pb.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using grpc::ClientReaderWriter;
using grpc::ClientWriter;
using grpc::Status;
class GreeterClient {
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(greeter::Greeter::NewStub(channel)) {}
/**
* 一元 RPC 调用
*/
void SayHello(const std::string& name) {
std::cout << "\n=== 一元 RPC ===" << std::endl;
greeter::HelloRequest request;
request.set_name(name);
greeter::HelloReply reply;
ClientContext context;
// 设置超时(5秒)
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::seconds(5);
context.set_deadline(deadline);
// 同步调用
Status status = stub_->SayHello(&context, request, &reply);
if (status.ok()) {
std::cout << "响应: " << reply.message() << std::endl;
} else {
std::cout << "错误: " << status.error_code() << ": " << status.error_message() << std::endl;
}
}
/**
* 服务端流调用
*/
void StreamGreetings(const std::string& name, int count) {
std::cout << "\n=== 服务端流 ===" << std::endl;
greeter::HelloRequest request;
request.set_name(name);
request.set_count(count);
ClientContext context;
// 创建流读取器
std::unique_ptr<ClientReader<greeter::HelloReply>> reader(
stub_->StreamGreetings(&context, request));
greeter::HelloReply reply;
while (reader->Read(&reply)) {
std::cout << "收到: " << reply.message() << std::endl;
}
Status status = reader->Finish();
if (!status.ok()) {
std::cout << "错误: " << status.error_message() << std::endl;
}
}
/**
* 客户端流调用
*/
void SendGreetings(const std::vector<std::string>& names) {
std::cout << "\n=== 客户端流 ===" << std::endl;
ClientContext context;
greeter::HelloReply reply;
// 创建流写入器
std::unique_ptr<ClientWriter<greeter::HelloRequest>> writer(
stub_->SendGreetings(&context, &reply));
// 发送多个请求
for (const auto& name : names) {
std::cout << "发送: " << name << std::endl;
greeter::HelloRequest request;
request.set_name(name);
if (!writer->Write(request)) {
break; // 流已关闭
}
}
// 关闭写入端
writer->WritesDone();
// 获取响应
Status status = writer->Finish();
if (status.ok()) {
std::cout << "响应: " << reply.message() << std::endl;
} else {
std::cout << "错误: " << status.error_message() << std::endl;
}
}
/**
* 双向流调用
*/
void Chat(const std::vector<std::string>& messages) {
std::cout << "\n=== 双向流 ===" << std::endl;
ClientContext context;
// 创建双向流
std::shared_ptr<ClientReaderWriter<greeter::HelloRequest, greeter::HelloReply>> stream(
stub_->Chat(&context));
// 写入线程
std::thread writer([stream, messages]() {
for (const auto& msg : messages) {
std::cout << "发送: " << msg << std::endl;
greeter::HelloRequest request;
request.set_name(msg);
stream->Write(request);
}
stream->WritesDone();
});
// 读取响应
greeter::HelloReply reply;
while (stream->Read(&reply)) {
std::cout << "收到: " << reply.message() << std::endl;
}
writer.join();
Status status = stream->Finish();
if (!status.ok()) {
std::cout << "错误: " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<greeter::Greeter::Stub> stub_;
};
int main(int argc, char** argv) {
// 创建连接
GreeterClient client(grpc::CreateChannel(
"localhost:50051",
grpc::InsecureChannelCredentials()));
// 测试各种 RPC
client.SayHello("张三");
client.StreamGreetings("李四", 3);
client.SendGreetings({"王五", "赵六", "钱七"});
client.Chat({"你好", "今天天气不错", "再见"});
return 0;
}
异步客户端
#include <grpcpp/grpcpp.h>
#include <thread>
#include <chrono>
class AsyncGreeterClient {
public:
AsyncGreeterClient(std::shared_ptr<grpc::Channel> channel)
: stub_(greeter::Greeter::NewStub(channel)) {}
void SayHelloAsync(const std::string& name) {
greeter::HelloRequest request;
request.set_name(name);
// 创建异步调用
grpc::ClientContext context;
grpc::CompletionQueue cq;
std::unique_ptr<grpc::ClientAsyncResponseReader<greeter::HelloReply>> rpc(
stub_->AsyncSayHello(&context, request, &cq));
greeter::HelloReply reply;
Status status;
// 设置完成标签
rpc->Finish(&reply, &status, (void*)1);
// 等待完成
void* got_tag;
bool ok = false;
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(got_tag == (void*)1);
GPR_ASSERT(ok);
if (status.ok()) {
std::cout << "响应: " << reply.message() << std::endl;
} else {
std::cout << "错误: " << status.error_message() << std::endl;
}
}
private:
std::unique_ptr<greeter::Greeter::Stub> stub_;
};
错误处理
返回错误
#include <grpcpp/grpcpp.h>
Status SayHello(ServerContext* context,
const greeter::HelloRequest* request,
greeter::HelloReply* reply) override {
// 参数验证
if (request->name().empty()) {
return Status(grpc::StatusCode::INVALID_ARGUMENT, "name 不能为空");
}
// 业务逻辑错误
if (request->name() == "error") {
return Status(grpc::StatusCode::INTERNAL, "内部处理错误");
}
// 权限检查
auto metadata = context->client_metadata();
auto it = metadata.find("authorization");
if (it == metadata.end()) {
return Status(grpc::StatusCode::UNAUTHENTICATED, "缺少认证信息");
}
// 成功
reply->set_message("你好, " + request->name() + "!");
return Status::OK;
}
处理错误
void HandleError(const Status& status) {
if (status.ok()) return;
std::cout << "错误码: " << status.error_code() << std::endl;
std::cout << "错误消息: " << status.error_message() << std::endl;
switch (status.error_code()) {
case grpc::StatusCode::INVALID_ARGUMENT:
std::cout << "参数错误" << std::endl;
break;
case grpc::StatusCode::NOT_FOUND:
std::cout << "资源不存在" << std::endl;
break;
case grpc::StatusCode::UNAUTHENTICATED:
std::cout << "未认证" << std::endl;
break;
case grpc::StatusCode::PERMISSION_DENIED:
std::cout << "权限不足" << std::endl;
break;
case grpc::StatusCode::DEADLINE_EXCEEDED:
std::cout << "请求超时" << std::endl;
break;
case grpc::StatusCode::UNAVAILABLE:
std::cout << "服务不可用" << std::endl;
break;
default:
std::cout << "其他错误" << std::endl;
}
}
元数据处理
服务端读取元数据
Status SayHello(ServerContext* context,
const greeter::HelloRequest* request,
greeter::HelloReply* reply) override {
// 获取客户端元数据
auto metadata = context->client_metadata();
// 获取单个值
auto it = metadata.find("authorization");
if (it != metadata.end()) {
std::string auth(it->second.data(), it->second.size());
std::cout << "认证信息: " << auth << std::endl;
}
// 发送初始元数据(在响应之前)
context->AddInitialMetadata("x-response-time", "100ms");
// 发送尾部元数据
context->AddTrailingMetadata("x-request-id", "req-123");
return Status::OK;
}
客户端发送元数据
void CallWithMetadata(const std::string& name) {
ClientContext context;
// 添加元数据
context.AddMetadata("authorization", "Bearer my-token");
context.AddMetadata("x-request-id", "req-123");
greeter::HelloRequest request;
request.set_name(name);
greeter::HelloReply reply;
Status status = stub_->SayHello(&context, request, &reply);
// 读取响应元数据
auto& metadata = context.GetServerInitialMetadata();
auto it = metadata.find("x-response-time");
if (it != metadata.end()) {
std::cout << "响应时间: " << std::string(it->second.data(), it->second.size()) << std::endl;
}
}
TLS/SSL 安全连接
服务端 TLS
#include <grpcpp/security/server_credentials.h>
#include <fstream>
std::string ReadFile(const std::string& path) {
std::ifstream file(path);
return std::string(std::istreambuf_iterator<char>(file),
std::istreambuf_iterator<char>());
}
void RunSecureServer() {
GreeterServiceImpl service;
// 加载证书
std::string cert = ReadFile("server.crt");
std::string key = ReadFile("server.key");
grpc::SslServerCredentialsOptions ssl_options;
ssl_options.pem_key_cert_pairs.push_back({key, cert});
auto credentials = grpc::SslServerCredentials(ssl_options);
ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:50051", credentials);
builder.RegisterService(&service);
auto server = builder.BuildAndStart();
std::cout << "安全服务器启动" << std::endl;
server->Wait();
}
客户端 TLS
void ConnectWithTls() {
// 加载 CA 证书
std::string ca_cert = ReadFile("ca.crt");
grpc::SslCredentialsOptions ssl_options;
ssl_options.pem_root_certs = ca_cert;
auto credentials = grpc::SslCredentials(ssl_options);
auto channel = grpc::CreateChannel("localhost:50051", credentials);
auto stub = greeter::Greeter::NewStub(channel);
// 正常调用
// ...
}
双向 TLS(mTLS)
void ConnectWithMTls() {
std::string ca_cert = ReadFile("ca.crt");
std::string client_cert = ReadFile("client.crt");
std::string client_key = ReadFile("client.key");
grpc::SslCredentialsOptions ssl_options;
ssl_options.pem_root_certs = ca_cert;
ssl_options.pem_cert_chain = client_cert;
ssl_options.pem_private_key = client_key;
auto credentials = grpc::SslCredentials(ssl_options);
auto channel = grpc::CreateChannel("localhost:50051", credentials);
}
性能优化
连接复用
// 推荐:使用单例模式复用连接
class GrpcClient {
public:
static GrpcClient& Instance() {
static GrpcClient instance;
return instance;
}
greeter::Greeter::Stub& GetStub() { return *stub_; }
private:
GrpcClient() {
channel_ = grpc::CreateChannel(
"localhost:50051",
grpc::InsecureChannelCredentials());
stub_ = greeter::Greeter::NewStub(channel_);
}
std::shared_ptr<grpc::Channel> channel_;
std::unique_ptr<greeter::Greeter::Stub> stub_;
};
线程池配置
// 服务端线程池配置
// 最佳扩展性:每个 completion queue 使用约 2 个线程
int num_cpus = std::thread::hardware_concurrency();
int num_cqs = num_cpus / 2;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> cqs;
for (int i = 0; i < num_cqs; i++) {
cqs.push_back(builder.AddCompletionQueue());
}
Completion Queue 优化
// 预注册足够的请求以实现并发处理
void PreRegisterRequests(greeter::Greeter::AsyncService* service,
grpc::ServerCompletionQueue* cq,
int count) {
for (int i = 0; i < count; i++) {
new SayHelloCallData(service, cq);
}
}
最佳实践
1. 资源管理
// 使用 RAII 管理资源
class ScopedContext {
public:
ScopedContext() : context_(new grpc::ClientContext()) {}
~ScopedContext() { delete context_; }
grpc::ClientContext* get() { return context_; }
private:
grpc::ClientContext* context_;
};
2. 超时设置
void CallWithTimeout(const std::string& name, int timeout_seconds) {
ClientContext context;
auto deadline = std::chrono::system_clock::now() +
std::chrono::seconds(timeout_seconds);
context.set_deadline(deadline);
// 调用...
}
3. 优雅关闭
#include <csignal>
#include <atomic>
std::atomic<bool> shutdown_requested(false);
void SignalHandler(int signal) {
shutdown_requested.store(true);
}
void RunServer() {
GreeterServiceImpl service;
ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:50051",
grpc::InsecureServerCredentials());
builder.RegisterService(&service);
auto server = builder.BuildAndStart();
// 注册信号处理
std::signal(SIGINT, SignalHandler);
std::signal(SIGTERM, SignalHandler);
// 等待关闭信号
while (!shutdown_requested.load()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "正在关闭服务器..." << std::endl;
server->Shutdown();
std::cout << "服务器已关闭" << std::endl;
}
小结
本章介绍了 C++ gRPC 开发的核心内容:
- 环境安装:包管理器和源码编译两种方式
- 同步服务端:简单的请求-响应处理
- 异步服务端:高性能的 CompletionQueue 模式
- 客户端开发:同步和异步调用方式
- 错误处理:状态码和错误消息
- 元数据处理:请求和响应元数据
- 安全连接:TLS 和 mTLS 配置
- 性能优化:连接复用、线程池、预注册请求
C++ 是 gRPC 性能最高的实现,适合对性能有极高要求的场景,如高频交易、实时通信、游戏服务器等。
[!TIP] 对于性能敏感场景,使用异步 API 并配合多线程 Completion Queue 可以获得最佳性能。建议每个 Completion Queue 配置约 2 个工作线程。