跳到主要内容

Python gRPC 开发

本章详细介绍如何使用 Python 开发 gRPC 服务端和客户端,包括同步/异步 API、流式处理、错误处理等核心内容。

环境准备

安装依赖

# 安装 gRPC 核心库和工具
pip install grpcio grpcio-tools

# 验证安装
python -c "import grpc; print(grpc.__version__)"

项目结构

python-grpc-project/
├── proto/ # Protocol Buffers 定义
│ └── demo.proto
├── generated/ # 生成的代码
│ ├── demo_pb2.py # 消息定义
│ └── demo_pb2_grpc.py # 服务定义
├── server/ # 服务端代码
│ └── main.py
├── client/ # 客户端代码
│ └── main.py
└── requirements.txt

代码生成

Proto 文件定义

// proto/demo.proto
syntax = "proto3";

package demo;

// 服务定义
service Greeter {
// 一元 RPC
rpc SayHello (HelloRequest) returns (HelloReply);

// 服务端流
rpc StreamGreetings (HelloRequest) returns (stream HelloReply);

// 客户端流
rpc SendGreetings (stream HelloRequest) returns (HelloReply);

// 双向流
rpc Chat (stream HelloRequest) returns (stream HelloReply);
}

message HelloRequest {
string name = 1;
int32 count = 2;
}

message HelloReply {
string message = 1;
int64 timestamp = 2;
}

生成 Python 代码

# 基本生成命令
python -m grpc_tools.protoc \
-I./proto \
--python_out=./generated \
--grpc_python_out=./generated \
proto/demo.proto

# 生成文件说明:
# demo_pb2.py - 消息类的定义
# demo_pb2_grpc.py - 服务类和客户端存根的定义

生成的代码包含:

  • 消息类HelloRequestHelloReply
  • 服务基类GreeterServicer,需要继承实现
  • 客户端存根GreeterStub,用于调用服务
  • 注册函数add_GreeterServicer_to_server,将服务注册到服务器

服务端开发

基本服务实现

# server/main.py
import grpc
from concurrent import futures
import time

# 导入生成的代码
import sys
sys.path.append('../generated')
import demo_pb2
import demo_pb2_grpc


class GreeterServicer(demo_pb2_grpc.GreeterServicer):
"""Greeter 服务实现"""

def SayHello(self, request, context):
"""一元 RPC:返回单个问候"""
# request 是 HelloRequest 实例
# context 提供了 RPC 上下文信息

print(f"收到请求: name={request.name}, count={request.count}")

# 构造响应
return demo_pb2.HelloReply(
message=f"你好, {request.name}!",
timestamp=int(time.time())
)

def StreamGreetings(self, request, context):
"""服务端流:返回多个问候"""
count = request.count if request.count > 0 else 5

for i in range(count):
# 检查客户端是否已取消
if context.is_active():
yield demo_pb2.HelloReply(
message=f"问候 {i+1}: 你好, {request.name}!",
timestamp=int(time.time())
)
time.sleep(0.5) # 模拟处理延迟
else:
print("客户端已取消请求")
break

def SendGreetings(self, request_iterator, context):
"""客户端流:接收多个请求,返回单个响应"""
names = []
total_count = 0

# 遍历客户端发送的请求流
for request in request_iterator:
print(f"收到: name={request.name}")
names.append(request.name)
total_count += 1

# 返回汇总响应
return demo_pb2.HelloReply(
message=f"收到了 {total_count} 个问候,来自: {', '.join(names)}",
timestamp=int(time.time())
)

def Chat(self, request_iterator, context):
"""双向流:实时聊天"""
for request in request_iterator:
# 对每个请求立即返回响应
yield demo_pb2.HelloReply(
message=f"[服务器] 收到你的消息: {request.name}",
timestamp=int(time.time())
)


def serve():
"""启动 gRPC 服务器"""
# 创建线程池服务器
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# 注册服务
demo_pb2_grpc.add_GreeterServicer_to_server(
GreeterServicer(), server
)

# 绑定端口
server.add_insecure_port('[::]:50051')

# 启动服务器
server.start()
print("服务器启动,监听端口 50051")

# 等待终止
server.wait_for_termination()


if __name__ == '__main__':
serve()

服务端配置选项

def serve_with_options():
"""带配置选项的服务器"""

# 创建服务器时设置选项
options = [
# 最大接收消息大小(默认 4MB)
('grpc.max_receive_message_length', 10 * 1024 * 1024),
# 最大发送消息大小
('grpc.max_send_message_length', 10 * 1024 * 1024),
# Keep-alive 设置
('grpc.keepalive_time_ms', 10000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.keepalive_permit_without_calls', True),
# HTTP/2 设置
('grpc.max_concurrent_streams', 100),
]

server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
options=options
)

# ... 其余配置

优雅关闭

import signal

def serve_graceful():
"""支持优雅关闭的服务器"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
demo_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
print("服务器启动")

# 注册信号处理
def shutdown(signum, frame):
print("\n收到关闭信号,正在停止服务器...")
# 优雅关闭:等待现有请求完成
server.stop(grace=5) # 等待 5 秒
print("服务器已停止")

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

server.wait_for_termination()

拦截器

from grpc_interceptor import ServerInterceptor
from grpc_interceptor.exceptions import GrpcException

class LoggingInterceptor(ServerInterceptor):
"""日志拦截器"""

def intercept(self, method, request_or_iterator, context, method_name):
# 记录请求
print(f"[请求] 方法: {method_name}")

try:
# 调用实际方法
response = method(request_or_iterator, context)
print(f"[响应] 方法: {method_name} 成功")
return response
except Exception as e:
print(f"[错误] 方法: {method_name}, 错误: {e}")
raise

class AuthInterceptor(ServerInterceptor):
"""认证拦截器"""

def intercept(self, method, request_or_iterator, context, method_name):
# 从元数据获取 token
metadata = dict(context.invocation_metadata())
token = metadata.get('authorization')

if not token:
context.abort(grpc.StatusCode.UNAUTHENTICATED, "缺少认证信息")

# 验证 token
if not self._validate_token(token):
context.abort(grpc.StatusCode.UNAUTHENTICATED, "无效的 token")

return method(request_or_iterator, context)

def _validate_token(self, token):
# 实际的 token 验证逻辑
return token.startswith("Bearer ")


# 使用拦截器
# 需要安装: pip install grpc-interceptor
from grpc_interceptor.server import _add_all_interceptors

def serve_with_interceptors():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# 添加拦截器
interceptors = [
LoggingInterceptor(),
AuthInterceptor(),
]
_add_all_interceptors(server, interceptors)

demo_pb2_grpc.add_GreeterServicer_to_server(GreeterServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()

客户端开发

基本客户端实现

# client/main.py
import grpc
import time

import sys
sys.path.append('../generated')
import demo_pb2
import demo_pb2_grpc


def run_unary(stub):
"""一元 RPC 调用"""
print("\n=== 一元 RPC ===")

# 创建请求
request = demo_pb2.HelloRequest(name="张三", count=3)

# 同步调用
response = stub.SayHello(request)
print(f"响应: {response.message}, 时间戳: {response.timestamp}")

# 异步调用(使用 future)
# 注意:在 Python 同步栈中使用 future API 会创建额外线程,影响性能
# 推荐使用 asyncio 异步 API 替代,或者直接使用同步调用
future = stub.SayHello.future(request)
# 可以做其他事情...
response = future.result(timeout=5) # 等待结果
print(f"异步响应: {response.message}")


def run_server_stream(stub):
"""服务端流调用"""
print("\n=== 服务端流 ===")

request = demo_pb2.HelloRequest(name="李四", count=3)

# 调用返回迭代器
for response in stub.StreamGreetings(request):
print(f"收到: {response.message}")


def run_client_stream(stub):
"""客户端流调用"""
print("\n=== 客户端流 ===")

# 创建请求生成器
def generate_requests():
names = ["王五", "赵六", "钱七"]
for name in names:
yield demo_pb2.HelloRequest(name=name)
time.sleep(0.5)

# 发送流并获取单个响应
response = stub.SendGreetings(generate_requests())
print(f"响应: {response.message}")


def run_bidirectional_stream(stub):
"""双向流调用"""
print("\n=== 双向流 ===")

# 创建请求生成器
def generate_requests():
messages = ["你好", "今天天气不错", "再见"]
for msg in messages:
yield demo_pb2.HelloRequest(name=msg)
time.sleep(1)

# 发送流并接收流
responses = stub.Chat(generate_requests())

for response in responses:
print(f"收到: {response.message}")


def main():
# 创建连接
channel = grpc.insecure_channel('localhost:50051')

# 创建存根
stub = demo_pb2_grpc.GreeterStub(channel)

# 调用各种 RPC
run_unary(stub)
run_server_stream(stub)
run_client_stream(stub)
run_bidirectional_stream(stub)

# 关闭连接
channel.close()


if __name__ == '__main__':
main()

客户端配置选项

def create_channel_with_options():
"""创建带配置的连接"""

options = [
# 最大消息大小
('grpc.max_receive_message_length', 10 * 1024 * 1024),
('grpc.max_send_message_length', 10 * 1024 * 1024),
# Keep-alive
('grpc.keepalive_time_ms', 10000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.keepalive_permit_without_calls', True),
# 重试配置
('grpc.enable_retries', True),
# 服务配置(JSON 格式)
('grpc.service_config', json.dumps({
'methodConfig': [{
'name': [{'service': 'demo.Greeter'}],
'retryPolicy': {
'maxAttempts': 3,
'initialBackoff': '0.1s',
'maxBackoff': '1s',
'backoffMultiplier': 2,
'retryableStatusCodes': ['UNAVAILABLE']
}
}]
}))
]

return grpc.insecure_channel('localhost:50051', options=options)

元数据传递

def call_with_metadata(stub):
"""带元数据的调用"""

# 创建元数据
metadata = [
('authorization', 'Bearer my-token'),
('request-id', 'req-12345'),
('client-version', '1.0.0'),
]

request = demo_pb2.HelloRequest(name="测试用户")

# 发送带元数据的请求
response = stub.SayHello(
request,
metadata=metadata
)

print(f"响应: {response.message}")


def receive_metadata(stub):
"""接收响应元数据"""

request = demo_pb2.HelloRequest(name="测试用户")

# 调用并获取初始元数据和尾随元数据
response, call = stub.SayHello.with_call(request)

# 获取响应初始元数据
initial_metadata = dict(call.initial_metadata())
print(f"初始元数据: {initial_metadata}")

# 获取尾随元数据
trailing_metadata = dict(call.trailing_metadata())
print(f"尾随元数据: {trailing_metadata}")

超时和取消

def call_with_timeout(stub):
"""带超时的调用"""

request = demo_pb2.HelloRequest(name="测试用户")

try:
# 设置 3 秒超时
response = stub.SayHello(request, timeout=3)
print(f"响应: {response.message}")
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
print("请求超时")
else:
print(f"错误: {e.code()} - {e.details()}")


def call_with_cancel(stub):
"""可取消的调用"""

request = demo_pb2.HelloRequest(name="测试用户", count=10)

# 使用 future 进行异步调用
future = stub.StreamGreetings.future(request)

# 等待一段时间后取消
time.sleep(1)
future.cancel()

if future.cancelled():
print("请求已取消")
else:
try:
for response in future.result():
print(f"响应: {response.message}")
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.CANCELLED:
print("请求被取消")

错误处理

服务端返回错误

from grpc import StatusCode

class GreeterServicer(demo_pb2_grpc.GreeterServicer):

def SayHello(self, request, context):
# 参数验证
if not request.name:
# 使用 context.abort 返回错误
context.abort(
StatusCode.INVALID_ARGUMENT,
"name 不能为空"
)

# 模拟业务逻辑错误
if request.name == "error":
context.abort(
StatusCode.INTERNAL,
"内部处理错误"
)

# 正常返回
return demo_pb2.HelloReply(message=f"你好, {request.name}!")


# 使用 grpc-interceptor 抛出异常
from grpc_interceptor.exceptions import GrpcException, NotFound

class GreeterServicerV2(demo_pb2_grpc.GreeterServicer):

def SayHello(self, request, context):
if not request.name:
raise GrpcException(
status_code=StatusCode.INVALID_ARGUMENT,
details="name 不能为空"
)

return demo_pb2.HelloReply(message=f"你好, {request.name}!")

客户端处理错误

def handle_errors(stub):
"""处理各种错误"""

request = demo_pb2.HelloRequest(name="error")

try:
response = stub.SayHello(request)
print(f"响应: {response.message}")

except grpc.RpcError as e:
# e 是 grpc.RpcError 实例
code = e.code() # 错误码
details = e.details() # 错误详情

print(f"错误码: {code}")
print(f"错误详情: {details}")

# 根据错误码处理
if code == StatusCode.INVALID_ARGUMENT:
print("参数错误,请检查输入")
elif code == StatusCode.NOT_FOUND:
print("资源不存在")
elif code == StatusCode.PERMISSION_DENIED:
print("权限不足")
elif code == StatusCode.UNAUTHENTICATED:
print("未认证,请先登录")
elif code == StatusCode.DEADLINE_EXCEEDED:
print("请求超时")
elif code == StatusCode.UNAVAILABLE:
print("服务不可用,请稍后重试")
elif code == StatusCode.INTERNAL:
print("服务器内部错误")
else:
print(f"未知错误: {code}")

异步 gRPC(asyncio)

Python gRPC 从 1.32 版本开始支持 asyncio,可以构建高性能异步服务。

异步服务端

# server/async_server.py
import grpc
from grpc import aio
import asyncio

import demo_pb2
import demo_pb2_grpc


class AsyncGreeterServicer(demo_pb2_grpc.GreeterServicer):
"""异步服务实现"""

async def SayHello(self, request, context):
"""异步一元 RPC"""
# 可以使用 await 调用其他异步函数
await asyncio.sleep(0.1) # 模拟异步操作

return demo_pb2.HelloReply(
message=f"你好, {request.name}!"
)

async def StreamGreetings(self, request, context):
"""异步服务端流"""
for i in range(request.count):
# 异步等待
await asyncio.sleep(0.5)

yield demo_pb2.HelloReply(
message=f"问候 {i+1}: 你好, {request.name}!"
)

async def SendGreetings(self, request_iterator, context):
"""异步客户端流"""
names = []

# 异步遍历请求流
async for request in request_iterator:
names.append(request.name)

return demo_pb2.HelloReply(
message=f"收到来自 {len(names)} 人的问候"
)

async def Chat(self, request_iterator, context):
"""异步双向流"""
async for request in request_iterator:
yield demo_pb2.HelloReply(
message=f"收到: {request.name}"
)


async def serve():
"""启动异步服务器"""
server = aio.server()

demo_pb2_grpc.add_GreeterServicer_to_server(
AsyncGreeterServicer(), server
)

server.add_insecure_port('[::]:50051')

await server.start()
print("异步服务器启动,监听端口 50051")

await server.wait_for_termination()


if __name__ == '__main__':
asyncio.run(serve())

异步客户端

# client/async_client.py
import grpc
from grpc import aio
import asyncio

import demo_pb2
import demo_pb2_grpc


async def run_unary(stub):
"""异步一元 RPC"""
request = demo_pb2.HelloRequest(name="异步用户")

# 异步调用
response = await stub.SayHello(request)
print(f"响应: {response.message}")


async def run_server_stream(stub):
"""异步服务端流"""
request = demo_pb2.HelloRequest(name="异步用户", count=3)

# 异步迭代
async for response in stub.StreamGreetings(request):
print(f"收到: {response.message}")


async def run_client_stream(stub):
"""异步客户端流"""
# 异步生成器
async def generate_requests():
names = ["用户1", "用户2", "用户3"]
for name in names:
yield demo_pb2.HelloRequest(name=name)
await asyncio.sleep(0.5)

response = await stub.SendGreetings(generate_requests())
print(f"响应: {response.message}")


async def run_bidirectional(stub):
"""异步双向流"""
async def generate_requests():
for i in range(3):
yield demo_pb2.HelloRequest(name=f"消息{i}")
await asyncio.sleep(1)

async for response in stub.Chat(generate_requests()):
print(f"收到: {response.message}")


async def main():
# 创建异步通道
async with aio.insecure_channel('localhost:50051') as channel:
stub = demo_pb2_grpc.GreeterStub(channel)

await run_unary(stub)
await run_server_stream(stub)
await run_client_stream(stub)
await run_bidirectional(stub)


if __name__ == '__main__':
asyncio.run(main())

健康检查

服务端健康检查

from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from grpc_health.v1.health_servicer import HealthServicer

def serve_with_health():
"""带健康检查的服务器"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

# 注册业务服务
demo_pb2_grpc.add_GreeterServicer_to_server(
GreeterServicer(), server
)

# 注册健康检查服务
health_servicer = HealthServicer()
health_pb2_grpc.add_HealthServicer_to_server(
health_servicer, server
)

# 设置服务状态
# 参数: 服务名, 状态
# 服务名为空字符串表示整个服务器
health_servicer.set("", health_pb2.HealthCheckResponse.SERVING)
health_servicer.set("demo.Greeter", health_pb2.HealthCheckResponse.SERVING)

server.add_insecure_port('[::]:50051')
server.start()

# 模拟服务状态变化
def mark_unhealthy():
health_servicer.set("demo.Greeter", health_pb2.HealthCheckResponse.NOT_SERVING)

server.wait_for_termination()

客户端健康检查

from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

def check_health(channel):
"""手动健康检查"""
stub = health_pb2_grpc.HealthStub(channel)

try:
# 检查整个服务器
response = stub.Check(health_pb2.HealthCheckRequest())
print(f"服务器状态: {health_pb2.HealthCheckResponse.ServingStatus.Name(response.status)}")

# 检查特定服务
response = stub.Check(
health_pb2.HealthCheckRequest(service="demo.Greeter")
)
print(f"Greeter 服务状态: {health_pb2.HealthCheckResponse.ServingStatus.Name(response.status)}")

except grpc.RpcError as e:
print(f"健康检查失败: {e.code()}")

最佳实践

1. 连接复用

# 推荐:使用单例模式复用连接
class GRPCClient:
_instance = None
_channel = None
_stub = None

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._channel = grpc.insecure_channel('localhost:50051')
cls._stub = demo_pb2_grpc.GreeterStub(cls._channel)
return cls._instance

@property
def stub(self):
return self._stub

def close(self):
if self._channel:
self._channel.close()


# 使用
client = GRPCClient()
response = client.stub.SayHello(request)

2. 资源清理

# 使用上下文管理器
from contextlib import contextmanager

@contextmanager
def grpc_channel(address):
channel = grpc.insecure_channel(address)
try:
yield channel
finally:
channel.close()

# 使用
with grpc_channel('localhost:50051') as channel:
stub = demo_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(request)

3. 异常处理包装

def grpc_call(func):
"""gRPC 调用装饰器"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except grpc.RpcError as e:
# 统一的错误处理
if e.code() == grpc.StatusCode.UNAVAILABLE:
raise ServiceUnavailableError("服务不可用")
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
raise TimeoutError("请求超时")
else:
raise
return wrapper

@grpc_call
def call_service(stub, request):
return stub.SayHello(request)

4. 性能优化建议

Python gRPC 有一些特有的性能特点需要注意,这些特点与其他语言(如 Go、Java、C++)的 gRPC 实现不同。

核心性能特点

根据 gRPC 官方文档,Python gRPC 的流式 RPC 会为接收和发送消息创建额外的线程,这使得流式 RPC 在 Python 中比一元 RPC 慢得多。这与 Go、Java 等语言的 gRPC 实现有显著不同。

避免使用同步栈中的 future API

在 Python 同步栈中使用 stub.Method.future() 会创建额外的线程,增加开销。这是 Python gRPC 特有的性能陷阱:

# ❌ 不推荐:在同步栈中使用 future API
# 这会创建额外线程,增加性能开销
future = stub.SayHello.future(request)
response = future.result()

# ✅ 推荐:直接使用同步调用
response = stub.SayHello(request)

# ✅ 推荐:使用 asyncio 异步 API(Python 3.7+)
response = await stub.SayHello(request)

为什么 future API 会创建额外线程?

Python gRPC 的同步 API 基于 C 核心库,而 future API 需要一个独立线程来等待异步操作完成,这就是性能损失的原因。如果你需要在同步代码中进行并发调用,应该考虑迁移到 asyncio API。

流式 RPC 性能考虑

Python gRPC 的流式 RPC 会创建额外线程处理消息,性能不如一元 RPC:

RPC 类型线程开销性能特点建议
一元 RPC最小最优,无额外线程优先使用
服务端流接收线程较慢,每条消息有线程切换开销数据量小时可用
客户端流发送线程较慢,有发送线程开销使用 asyncio 改善
双向流两个线程最慢,双倍线程开销高性能场景用 Go/C++

使用 asyncio 提升性能

对于高性能场景,强烈推荐使用 asyncio。asyncio API 完全在单个线程中运行,避免了线程切换开销:

import grpc
from grpc import aio
import asyncio

async def high_performance_client():
# 使用异步通道
async with aio.insecure_channel('localhost:50051') as channel:
stub = demo_pb2_grpc.GreeterStub(channel)

# 并发发送多个请求(真正的并发,无额外线程)
tasks = [
stub.SayHello(demo_pb2.HelloRequest(name=f"User{i}"))
for i in range(100)
]
responses = await asyncio.gather(*tasks)
return responses

# 运行
asyncio.run(high_performance_client())

asyncio vs 同步 API 性能对比

# 同步 API:100 个请求会串行执行
def sync_client():
for i in range(100):
response = stub.SayHello(request) # 每次等待响应

# asyncio API:100 个请求并发执行
async def async_client():
tasks = [stub.SayHello(request) for _ in range(100)]
responses = await asyncio.gather(*tasks) # 并发执行

实验性单线程服务端流

对于服务端流 RPC,可以使用实验性的单线程实现来减少线程开销。这个选项可以让服务端流 RPC 的性能接近一元 RPC:

# 启用实验性的单线程服务端流实现
channel = grpc.insecure_channel(
'localhost:50051',
options=[
('grpc.experimental.single_threaded_unary_stream', True),
]
)
# 可以节省约 7% 的每消息延迟

[!WARNING] 此选项是实验性的,未来版本可能会更改或移除。在生产环境使用前请充分测试。

性能优化总结

  1. 优先使用一元 RPC:Python 中一元 RPC 性能最优
  2. 避免同步栈中的 future API:会创建额外线程
  3. 高性能场景使用 asyncio:真正的单线程并发
  4. 考虑实验性单线程流:服务端流场景可尝试
  5. 极端性能需求考虑 Go/C++:Python gRPC 性能上限有限

小结

本章我们学习了:

  1. 环境准备:安装 gRPC Python 库和代码生成工具
  2. 服务端开发:实现各种 RPC 类型、配置选项、拦截器
  3. 客户端开发:同步/异步调用、元数据传递、超时处理
  4. 错误处理:服务端返回错误、客户端处理错误
  5. 异步 gRPC:使用 asyncio 构建高性能服务
  6. 健康检查:服务端和客户端的健康检查实现
  7. 最佳实践:连接复用、资源清理、异常处理、性能优化

Python gRPC 开发相对简单,适合快速构建微服务。对于高性能场景,建议使用 asyncio 异步 API,并注意避免使用同步栈中的 future API。

[!TIP] Python gRPC 的流式 RPC 会创建额外的线程,性能不如一元 RPC。如果需要高性能流式处理,建议考虑 Go 或 C++ 实现。对于服务端流,可以尝试实验性的单线程实现选项来减少开销。