Java gRPC 开发
本章详细介绍如何使用 Java 开发 gRPC 服务端和客户端,包括项目构建、服务实现、流式处理、拦截器等核心内容。
环境准备
系统要求
- JDK 8 或更高版本(推荐 JDK 11+)
- Maven 3.6+ 或 Gradle 6.0+
Maven 项目配置
<!-- pom.xml -->
<project>
<properties>
<grpc.version>1.71.0</grpc.version>
<protobuf.version>3.25.3</protobuf.version>
</properties>
<dependencies>
<!-- gRPC 核心库 -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- 必需:用于处理 javax.annotation.Generated -->
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<extensions>
<!-- OS 检测扩展 -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<!-- Protocol Buffers 编译插件 -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Gradle 项目配置
// build.gradle
plugins {
id 'java'
id 'com.google.protobuf' version '0.9.4'
}
def grpcVersion = '1.71.0'
def protobufVersion = '3.25.3'
dependencies {
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
compileOnly 'org.apache.tomcat:annotations-api:6.0.53'
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:${protobufVersion}"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
项目结构
grpc-java-project/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/example/grpc/
│ │ │ ├── server/ # 服务端代码
│ │ │ │ └── GreeterServer.java
│ │ │ ├── service/ # 服务实现
│ │ │ │ └── GreeterServiceImpl.java
│ │ │ └── interceptor/ # 拦截器
│ │ │ └── LoggingInterceptor.java
│ │ ├── proto/ # Proto 定义文件
│ │ │ └── greeter.proto
│ │ └── resources/
│ └── test/
│ └── java/
└── pom.xml
Proto 定义与代码生成
Proto 文件
// src/main/proto/greeter.proto
syntax = "proto3";
package com.example.grpc;
option java_multiple_files = true;
option java_package = "com.example.grpc";
option java_outer_classname = "GreeterProto";
// 问候服务
service Greeter {
// 一元 RPC:简单请求-响应
rpc SayHello (HelloRequest) returns (HelloReply);
// 服务端流:返回多个响应
rpc StreamGreetings (HelloRequest) returns (stream HelloReply);
// 客户端流:接收多个请求
rpc SendGreetings (stream HelloRequest) returns (HelloReply);
// 双向流:双方独立读写
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}
message HelloRequest {
string name = 1;
int32 count = 2;
}
message HelloReply {
string message = 1;
int64 timestamp = 2;
}
生成的代码
执行 mvn compile 或 gradle build 后,会在 target/generated-sources/protobuf 目录下生成:
- 消息类:
HelloRequest.java、HelloReply.java - 服务基类:
GreeterGrpc.GreeterImplBase(服务端继承) - 客户端存根:
GreeterGrpc.GreeterBlockingStub(同步)、GreeterGrpc.GreeterStub(异步)
服务端开发
基本服务实现
package com.example.grpc.service;
import com.example.grpc.GreeterGrpc;
import com.example.grpc.HelloRequest;
import com.example.grpc.HelloReply;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
/**
* Greeter 服务实现
*
* 继承 GreeterImplBase,重写需要的方法。
* 未重写的方法会抛出 UnimplementedOperationException。
*/
public class GreeterServiceImpl extends GreeterGrpc.GreeterImplBase {
/**
* 一元 RPC:简单请求-响应
*/
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// 1. 处理请求
String name = request.getName();
System.out.println("收到请求: name=" + name);
// 2. 构造响应
HelloReply reply = HelloReply.newBuilder()
.setMessage("你好, " + name + "!")
.setTimestamp(Instant.now().getEpochSecond())
.build();
// 3. 发送响应
responseObserver.onNext(reply);
// 4. 完成调用
responseObserver.onCompleted();
}
/**
* 服务端流:返回多个响应
*/
@Override
public void streamGreetings(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
int count = request.getCount() > 0 ? request.getCount() : 5;
for (int i = 0; i < count; i++) {
// 检查客户端是否已取消
if (responseObserver.isCancelled()) {
System.out.println("客户端已取消请求");
return;
}
HelloReply reply = HelloReply.newBuilder()
.setMessage(String.format("问候 %d: 你好, %s!", i + 1, request.getName()))
.setTimestamp(Instant.now().getEpochSecond())
.build();
responseObserver.onNext(reply);
// 模拟处理延迟
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
responseObserver.onError(e);
return;
}
}
responseObserver.onCompleted();
}
/**
* 客户端流:接收多个请求,返回单个响应
*/
@Override
public StreamObserver<HelloRequest> sendGreetings(StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<HelloRequest>() {
private final StringBuilder names = new StringBuilder();
private int count = 0;
@Override
public void onNext(HelloRequest request) {
// 每收到一个请求就处理
System.out.println("收到: name=" + request.getName());
if (names.length() > 0) {
names.append(", ");
}
names.append(request.getName());
count++;
}
@Override
public void onError(Throwable t) {
System.err.println("客户端流错误: " + t.getMessage());
}
@Override
public void onCompleted() {
// 客户端发送完成后,返回汇总响应
HelloReply reply = HelloReply.newBuilder()
.setMessage(String.format("收到了 %d 个问候,来自: %s", count, names))
.setTimestamp(Instant.now().getEpochSecond())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
};
}
/**
* 双向流:实时聊天
*/
@Override
public StreamObserver<HelloRequest> chat(StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest request) {
// 收到消息后立即回复
HelloReply reply = HelloReply.newBuilder()
.setMessage("[服务器] 收到你的消息: " + request.getName())
.setTimestamp(Instant.now().getEpochSecond())
.build();
responseObserver.onNext(reply);
}
@Override
public void onError(Throwable t) {
System.err.println("聊天错误: " + t.getMessage());
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
启动服务器
package com.example.grpc.server;
import com.example.grpc.service.GreeterServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* gRPC 服务器
*/
public class GreeterServer {
private final int port;
private Server server;
public GreeterServer(int port) {
this.port = port;
}
/**
* 启动服务器
*/
public void start() throws IOException {
server = ServerBuilder.forPort(port)
// 添加服务
.addService(new GreeterServiceImpl())
// 启用服务反射(用于 grpcurl 等工具)
.addService(ProtoReflectionService.newInstance())
// 构建
.build()
.start();
System.out.println("服务器启动,监听端口: " + port);
// 注册关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("正在关闭服务器...");
try {
GreeterServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("服务器已关闭");
}));
}
/**
* 停止服务器
*/
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* 阻塞等待服务器终止
*/
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final GreeterServer server = new GreeterServer(50051);
server.start();
server.blockUntilShutdown();
}
}
服务器配置选项
import io.grpc.ServerBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
// 基本配置
Server server = ServerBuilder.forPort(50051)
// 最大消息大小
.maxInboundMessageSize(10 * 1024 * 1024) // 10MB
// 最大元数据大小
.maxInboundMetadataSize(1 * 1024 * 1024) // 1MB
// 添加服务
.addService(new GreeterServiceImpl())
// 添加拦截器
.intercept(new LoggingInterceptor())
.build();
// 使用 Netty 的高级配置
Server server = NettyServerBuilder.forPort(50051)
// 工作线程数
.bossEventLoopGroup(new NioEventLoopGroup(1))
.workerEventLoopGroup(new NioEventLoopGroup(4))
// Keep-alive 配置
.keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS)
.permitKeepAliveWithoutCalls(true)
// 最大并发连接数
.maxConcurrentCallsPerConnection(100)
.build();
客户端开发
基本客户端
package com.example.grpc.client;
import com.example.grpc.GreeterGrpc;
import com.example.grpc.HelloRequest;
import com.example.grpc.HelloReply;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* gRPC 客户端
*/
public class GreeterClient {
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;
private final GreeterGrpc.GreeterStub asyncStub;
public GreeterClient(String host, int port) {
// 创建连接
this.channel = ManagedChannelBuilder.forAddress(host, port)
// 使用明文传输(开发环境)
.usePlaintext()
.build();
// 创建存根
this.blockingStub = GreeterGrpc.newBlockingStub(channel);
this.asyncStub = GreeterGrpc.newStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/**
* 一元 RPC:同步调用
*/
public void sayHello(String name) {
System.out.println("\n=== 一元 RPC ===");
HelloRequest request = HelloRequest.newBuilder()
.setName(name)
.build();
HelloReply reply = blockingStub.sayHello(request);
System.out.println("响应: " + reply.getMessage());
}
/**
* 服务端流:阻塞式迭代
*/
public void streamGreetings(String name, int count) {
System.out.println("\n=== 服务端流 ===");
HelloRequest request = HelloRequest.newBuilder()
.setName(name)
.setCount(count)
.build();
// 返回迭代器
Iterator<HelloReply> replies = blockingStub.streamGreetings(request);
while (replies.hasNext()) {
HelloReply reply = replies.next();
System.out.println("收到: " + reply.getMessage());
}
}
/**
* 客户端流:异步调用
*/
public void sendGreetings(String[] names) throws InterruptedException {
System.out.println("\n=== 客户端流 ===");
// 创建响应观察者
StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
System.out.println("响应: " + reply.getMessage());
}
@Override
public void onError(Throwable t) {
System.err.println("错误: " + t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("完成");
}
};
// 获取请求观察者
StreamObserver<HelloRequest> requestObserver = asyncStub.sendGreetings(responseObserver);
// 发送多个请求
for (String name : names) {
HelloRequest request = HelloRequest.newBuilder()
.setName(name)
.build();
requestObserver.onNext(request);
}
// 发送完成
requestObserver.onCompleted();
// 等待响应
Thread.sleep(1000);
}
/**
* 双向流:异步调用
*/
public void chat(String[] messages) throws InterruptedException {
System.out.println("\n=== 双向流 ===");
// 创建响应观察者
StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
System.out.println("收到: " + reply.getMessage());
}
@Override
public void onError(Throwable t) {
System.err.println("错误: " + t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("聊天结束");
}
};
// 获取请求观察者
StreamObserver<HelloRequest> requestObserver = asyncStub.chat(responseObserver);
// 发送消息
for (String message : messages) {
HelloRequest request = HelloRequest.newBuilder()
.setName(message)
.build();
requestObserver.onNext(request);
Thread.sleep(500);
}
// 发送完成
requestObserver.onCompleted();
// 等待响应
Thread.sleep(1000);
}
public static void main(String[] args) throws Exception {
GreeterClient client = new GreeterClient("localhost", 50051);
try {
client.sayHello("张三");
client.streamGreetings("李四", 3);
client.sendGreetings(new String[]{"王五", "赵六", "钱七"});
client.chat(new String[]{"你好", "今天天气不错", "再见"});
} finally {
client.shutdown();
}
}
}
客户端配置选项
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
// 基本配置
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
// 明文传输
.usePlaintext()
// 最大消息大小
.maxInboundMessageSize(10 * 1024 * 1024)
// 最大元数据大小
.maxInboundMetadataSize(1 * 1024 * 1024)
// 禁用 DNS 轮询
.disableServiceConfigLookUp()
// 默认负载均衡策略
.defaultLoadBalancingPolicy("round_robin")
.build();
// Keep-alive 配置
ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:50051")
.keepAliveTime(30, TimeUnit.SECONDS) // 发送 ping 间隔
.keepAliveTimeout(10, TimeUnit.SECONDS) // ping 超时
.keepAliveWithoutCalls(true) // 空闲时也发送 ping
.build();
// 使用 Netty 的高级配置
ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
// 连接池配置
.eventLoopGroup(new NioEventLoopGroup(4))
// 连接超时
.connectTimeout(10, TimeUnit.SECONDS)
.build();
拦截器
Java gRPC 支持两种拦截器:服务端拦截器和客户端拦截器。
服务端拦截器
package com.example.grpc.interceptor;
import io.grpc.*;
import io.grpc.Metadata.Key;
import java.time.Instant;
/**
* 日志拦截器
*/
public class LoggingInterceptor implements ServerInterceptor {
// 定义元数据键
private static final Key<String> AUTHORIZATION_KEY =
Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
long startTime = System.currentTimeMillis();
String methodName = call.getMethodDescriptor().getFullMethodName();
// 记录请求
System.out.println("[请求] 方法: " + methodName);
// 获取元数据
String authHeader = headers.get(AUTHORIZATION_KEY);
if (authHeader != null) {
System.out.println("[请求] 认证: " + authHeader);
}
// 包装 ServerCall 以记录响应
ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
long duration = System.currentTimeMillis() - startTime;
System.out.println("[响应] 方法: " + methodName +
", 状态: " + status.getCode() +
", 耗时: " + duration + "ms");
super.close(status, trailers);
}
};
return next.startCall(wrappedCall, headers);
}
}
/**
* 认证拦截器
*/
public class AuthInterceptor implements ServerInterceptor {
private static final Key<String> AUTHORIZATION_KEY =
Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// 获取认证信息
String authHeader = headers.get(AUTHORIZATION_KEY);
if (authHeader == null) {
// 拒绝未认证的请求
call.close(Status.UNAUTHENTICATED.withDescription("缺少认证信息"), new Metadata());
return new ServerCall.Listener<ReqT>() {};
}
// 验证 token
String userId = validateToken(authHeader);
if (userId == null) {
call.close(Status.UNAUTHENTICATED.withDescription("无效的认证信息"), new Metadata());
return new ServerCall.Listener<ReqT>() {};
}
// 将用户信息存入 Context
Context context = Context.current()
.withValue(Context.key("user_id"), userId);
return Contexts.interceptCall(context, call, headers, next);
}
private String validateToken(String token) {
// 实际的 token 验证逻辑
if (token.startsWith("Bearer valid-")) {
return token.substring(13);
}
return null;
}
}
/**
* 异常处理拦截器
*/
public class ExceptionInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(call, headers)) {
@Override
public void onHalfClose() {
try {
super.onHalfClose();
} catch (RuntimeException e) {
handleException(e, call);
}
}
@Override
public void onMessage(ReqT message) {
try {
super.onMessage(message);
} catch (RuntimeException e) {
handleException(e, call);
}
}
private void handleException(RuntimeException e, ServerCall<?, ?> call) {
if (e instanceof StatusRuntimeException) {
call.close(((StatusRuntimeException) e).getStatus(), new Metadata());
} else {
System.err.println("未处理的异常: " + e.getMessage());
call.close(Status.INTERNAL.withDescription("内部服务错误"), new Metadata());
}
}
};
}
}
使用拦截器
// 服务端
Server server = ServerBuilder.forPort(50051)
.addService(new GreeterServiceImpl())
// 全局拦截器(按添加顺序执行)
.intercept(new LoggingInterceptor())
.intercept(new AuthInterceptor())
.intercept(new ExceptionInterceptor())
.build();
// 客户端
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.intercept(new ClientLoggingInterceptor())
.build();
客户端拦截器
/**
* 客户端日志拦截器
*/
public class ClientLoggingInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
System.out.println("[客户端请求] 方法: " + method.getFullMethodName());
long startTime = System.currentTimeMillis();
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// 添加自定义元数据
headers.put(Metadata.Key.of("client-version", Metadata.ASCII_STRING_MARSHALLER), "1.0.0");
// 包装监听器以记录响应
Listener<RespT> wrappedListener = new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
long duration = System.currentTimeMillis() - startTime;
System.out.println("[客户端响应] 状态: " + status.getCode() + ", 耗时: " + duration + "ms");
super.onClose(status, trailers);
}
};
super.start(wrappedListener, headers);
}
};
}
}
/**
* 认证拦截器
*/
public class ClientAuthInterceptor implements ClientInterceptor {
private final String token;
public ClientAuthInterceptor(String token) {
this.token = token;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// 添加认证头
headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER),
"Bearer " + token);
super.start(responseListener, headers);
}
};
}
}
错误处理
服务端返回错误
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// 参数验证
if (request.getName().isEmpty()) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("name 不能为空")
.asRuntimeException());
return;
}
// 业务逻辑错误
if (request.getName().equals("error")) {
responseObserver.onError(Status.INTERNAL
.withDescription("内部处理错误")
.asRuntimeException());
return;
}
// 正常返回
HelloReply reply = HelloReply.newBuilder()
.setMessage("你好, " + request.getName() + "!")
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
// 使用增强的错误信息
import io.grpc.protobuf.StatusProto;
@Override
public void sayHelloWithError(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// 构造详细的错误信息
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE)
.setMessage("参数验证失败")
.addDetails(Any.pack(com.google.rpc.BadRequest.newBuilder()
.addFieldViolations(com.google.rpc.BadRequest.FieldViolation.newBuilder()
.setField("name")
.setDescription("name 不能为空")
.build())
.build()))
.build();
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
客户端处理错误
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
public void callWithExceptionHandling(String name) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
try {
HelloReply reply = blockingStub.sayHello(request);
System.out.println("响应: " + reply.getMessage());
} catch (StatusRuntimeException e) {
Status status = e.getStatus();
Status.Code code = status.getCode();
String description = status.getDescription();
System.err.println("错误码: " + code);
System.err.println("错误描述: " + description);
// 根据错误码处理
switch (code) {
case INVALID_ARGUMENT:
System.err.println("参数错误,请检查输入");
break;
case NOT_FOUND:
System.err.println("资源不存在");
break;
case PERMISSION_DENIED:
System.err.println("权限不足");
break;
case UNAUTHENTICATED:
System.err.println("未认证,请先登录");
break;
case DEADLINE_EXCEEDED:
System.err.println("请求超时");
break;
case UNAVAILABLE:
System.err.println("服务不可用,请稍后重试");
break;
case INTERNAL:
System.err.println("服务器内部错误");
break;
default:
System.err.println("未知错误: " + code);
}
}
}
超时与取消
import io.grpc.Deadline;
import java.util.concurrent.TimeUnit;
// 设置超时
HelloReply reply = blockingStub
.withDeadline(Deadline.after(3, TimeUnit.SECONDS))
.sayHello(request);
// 或使用 after 方法
HelloReply reply = blockingStub
.withDeadlineAfter(3, TimeUnit.SECONDS)
.sayHello(request);
// 异步调用取消
StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {
// ...
};
StreamObserver<HelloRequest> requestObserver = asyncStub
.withDeadlineAfter(5, TimeUnit.SECONDS)
.chat(responseObserver);
// 发送一些请求后取消
requestObserver.onNext(request);
// 如果需要取消
// requestObserver.onError(new CancellationException("用户取消"));
TLS/SSL 安全连接
服务端 TLS 配置
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
// 加载证书
InputStream certChain = new FileInputStream("server.crt");
InputStream privateKey = new FileInputStream("server.key");
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(certChain, privateKey);
Server server = NettyServerBuilder.forPort(50051)
.sslContext(GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL).build())
.addService(new GreeterServiceImpl())
.build();
客户端 TLS 配置
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
// 方式一:使用系统信任的证书
ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 50051)
.sslContext(GrpcSslContexts.forClient().build())
.build();
// 方式二:跳过证书验证(仅测试用)
ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 50051)
.sslContext(GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build())
.build();
// 方式三:使用自定义 CA 证书
InputStream caCert = new FileInputStream("ca.crt");
ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 50051)
.sslContext(GrpcSslContexts.forClient()
.trustManager(caCert)
.build())
.build();
最佳实践
1. 连接管理
// 推荐:使用单例模式复用连接
public class GrpcClientManager {
private static volatile ManagedChannel channel;
public static ManagedChannel getChannel() {
if (channel == null) {
synchronized (GrpcClientManager.class) {
if (channel == null) {
channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
}
}
}
return channel;
}
public static void shutdown() {
if (channel != null) {
channel.shutdown();
}
}
}
2. 资源清理
// 使用 try-with-resources 模式(ManagedChannel 实现了 Closeable)
try (ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build()) {
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
// 使用 stub...
}
// 或确保关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
3. 线程安全
// ManagedChannel 和 Stub 都是线程安全的,可以安全共享
// 但 StreamObserver 不是线程安全的
// 正确:多个请求使用同一个 channel
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
// 可以在多个线程中使用同一个 stub
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
final int id = i;
executor.submit(() -> {
HelloReply reply = stub.sayHello(HelloRequest.newBuilder()
.setName("User-" + id)
.build());
System.out.println(reply.getMessage());
});
}
小结
本章学习了 Java gRPC 开发的核心内容:
- 项目配置:Maven 和 Gradle 的依赖和插件配置
- 服务端开发:服务实现、服务器启动和配置
- 客户端开发:同步和异步调用、流式处理
- 拦截器:服务端和客户端拦截器的实现
- 错误处理:错误码和异常处理
- 安全连接:TLS/SSL 配置
- 最佳实践:连接管理、资源清理、线程安全
Java gRPC 库成熟稳定,是企业级微服务开发的优选方案。结合 Spring Boot,可以快速构建生产级的 gRPC 服务。
[!TIP] 完整示例代码可以在 gRPC Java 官方示例 找到。