异步网络编程
asyncio 提供了完整的异步网络编程支持,包括 TCP 客户端/服务器、UDP 通信等。本章介绍如何使用 asyncio 进行网络编程。
Streams API 概述
asyncio 提供两种网络编程 API:
| API 层级 | 特点 | 适用场景 |
|---|---|---|
| Streams(高层) | 使用 StreamReader/StreamWriter,语法简洁 | 大多数应用开发 |
| Transports/Protocols(底层) | 基于回调,性能更高 | 框架开发、高性能场景 |
本教程主要介绍高层 Streams API,它使用 async/await 语法,更容易理解和使用。
TCP 客户端
基本连接
使用 asyncio.open_connection() 建立 TCP 连接:
import asyncio
async def tcp_client():
# 建立连接,返回 reader 和 writer
reader, writer = await asyncio.open_connection(
host='127.0.0.1',
port=8888
)
# 发送数据
message = "Hello Server"
writer.write(message.encode())
await writer.drain() # 等待数据发送完成
# 接收数据
data = await reader.read(100) # 最多读取 100 字节
print(f"收到: {data.decode()}")
# 关闭连接
writer.close()
await writer.wait_closed()
asyncio.run(tcp_client())
open_connection() 返回两个对象:
- StreamReader:用于读取数据
- StreamWriter:用于写入数据
StreamReader 常用方法
StreamReader 提供多种读取数据的方式:
import asyncio
async def read_examples():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
# 1. read(n) - 读取最多 n 字节
data = await reader.read(1024)
# 2. read() - 不指定大小,读取直到 EOF
all_data = await reader.read()
# 3. readline() - 读取一行(以 \n 结尾)
line = await reader.readline()
# 4. readuntil(separator) - 读取直到指定分隔符
data = await reader.readuntil(b'\n')
# 5. readexactly(n) - 精确读取 n 字节
# 如果不足 n 字节,抛出 IncompleteReadError
exact_data = await reader.readexactly(10)
# 6. at_eof() - 检查是否到达末尾
if reader.at_eof():
print("已到达数据末尾")
writer.close()
await writer.wait_closed()
各方法的区别:
| 方法 | 说明 | 适用场景 |
|---|---|---|
read(n) | 最多读取 n 字节 | 不确定数据量 |
read() | 读取直到 EOF | 读取完整响应 |
readline() | 读取一行 | 文本协议(如 HTTP) |
readuntil() | 读取到分隔符 | 自定义协议 |
readexactly() | 精确读取 n 字节 | 固定长度协议 |
StreamWriter 常用方法
StreamWriter 用于发送数据:
import asyncio
async def write_examples():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
# 1. write(data) - 写入数据
writer.write(b'Hello')
# 2. writelines(lines) - 写入多行
writer.writelines([b'Line1\n', b'Line2\n'])
# 3. drain() - 等待发送缓冲区清空
await writer.drain()
# 4. write_eof() - 发送 EOF
writer.write_eof()
# 5. close() - 关闭连接
writer.close()
# 6. wait_closed() - 等待连接完全关闭
await writer.wait_closed()
# 7. get_extra_info() - 获取额外信息
peername = writer.get_extra_info('peername') # 对端地址
socket = writer.get_extra_info('socket') # 底层 socket
重要:write() 方法不会阻塞,数据会被放入发送缓冲区。必须调用 await writer.drain() 等待数据实际发送出去。
完整的 TCP 客户端示例
import asyncio
class TCPClient:
"""TCP 客户端封装"""
def __init__(self, host, port):
self.host = host
self.port = port
self.reader = None
self.writer = None
async def connect(self):
"""建立连接"""
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port
)
print(f"已连接到 {self.host}:{self.port}")
async def send(self, message):
"""发送消息"""
if self.writer is None:
raise RuntimeError("未连接")
self.writer.write(message.encode())
await self.writer.drain()
print(f"发送: {message}")
async def receive(self, size=1024):
"""接收消息"""
if self.reader is None:
raise RuntimeError("未连接")
data = await self.reader.read(size)
return data.decode()
async def close(self):
"""关闭连接"""
if self.writer:
self.writer.close()
await self.writer.wait_closed()
print("连接已关闭")
async def main():
client = TCPClient('127.0.0.1', 8888)
try:
await client.connect()
# 发送多条消息
for i in range(3):
await client.send(f"消息 {i}")
response = await client.receive()
print(f"收到响应: {response}")
except ConnectionRefusedError:
print("无法连接到服务器")
finally:
await client.close()
asyncio.run(main())
TCP 服务器
基本服务器
使用 asyncio.start_server() 创建 TCP 服务器:
import asyncio
async def handle_client(reader, writer):
"""处理客户端连接"""
# 获取客户端地址
addr = writer.get_extra_info('peername')
print(f"新连接: {addr}")
try:
while True:
# 读取数据
data = await reader.read(1024)
if not data: # 连接关闭
break
message = data.decode()
print(f"收到 {addr}: {message}")
# 发送响应
response = f"Echo: {message}"
writer.write(response.encode())
await writer.drain()
except ConnectionError:
print(f"连接错误: {addr}")
finally:
writer.close()
await writer.wait_closed()
print(f"连接关闭: {addr}")
async def main():
# 创建服务器
server = await asyncio.start_server(
handle_client,
host='127.0.0.1',
port=8888
)
addr = server.sockets[0].getsockname()
print(f"服务器运行在 {addr}")
# 运行服务器
async with server:
await server.serve_forever()
asyncio.run(main())
服务器配置选项
start_server() 支持多种配置参数:
import asyncio
async def handle_client(reader, writer):
# 处理逻辑
pass
async def main():
server = await asyncio.start_server(
handle_client,
# 监听地址
host='0.0.0.0', # 所有网卡
port=8888,
# 连接队列
backlog=100, # 最大等待连接数
# 重用地址
reuse_address=True, # 允许重用处于 TIME_WAIT 的地址
# SSL/TLS
# ssl=ssl_context,
# 开始服务
start_serving=True, # 立即开始接受连接
)
async with server:
await server.serve_forever()
多客户端并发处理
asyncio 服务器自动并发处理多个客户端:
import asyncio
async def handle_client(reader, writer):
addr = writer.get_extra_info('peername')
print(f"客户端连接: {addr}")
# 模拟处理时间
await asyncio.sleep(2)
writer.write(b"处理完成")
await writer.drain()
writer.close()
await writer.wait_closed()
print(f"客户端断开: {addr}")
async def main():
server = await asyncio.start_server(
handle_client,
'127.0.0.1', 8888
)
print("服务器启动,可以同时处理多个客户端")
async with server:
await server.serve_forever()
# 同时连接多个客户端,它们会并发处理
asyncio.run(main())
服务器状态管理
import asyncio
class ChatServer:
"""简单的聊天服务器"""
def __init__(self):
self.clients = set() # 存储所有客户端
async def handle_client(self, reader, writer):
addr = writer.get_extra_info('peername')
self.clients.add(writer)
print(f"客户端加入: {addr}, 当前在线: {len(self.clients)}")
try:
# 发送欢迎消息
writer.write(f"欢迎! 当前在线 {len(self.clients)} 人\n".encode())
await writer.drain()
while True:
data = await reader.read(1024)
if not data:
break
message = data.decode().strip()
# 广播给所有客户端
broadcast = f"[{addr}] {message}\n"
for client in self.clients:
if client != writer:
client.write(broadcast.encode())
await client.drain()
except ConnectionError:
pass
finally:
self.clients.remove(writer)
writer.close()
await writer.wait_closed()
print(f"客户端离开: {addr}, 当前在线: {len(self.clients)}")
async def main():
server = ChatServer()
async with await asyncio.start_server(
server.handle_client,
'127.0.0.1', 8888
) as s:
print("聊天服务器运行在 127.0.0.1:8888")
await s.serve_forever()
asyncio.run(main())
UDP 编程
UDP 是无连接的协议,使用 asyncio.DatagramProtocol 处理。
UDP 服务器
import asyncio
class EchoServerProtocol(asyncio.DatagramProtocol):
"""UDP Echo 服务器协议"""
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
"""收到数据报"""
message = data.decode()
print(f"收到 {addr}: {message}")
# 发送响应
self.transport.sendto(data, addr)
async def main():
loop = asyncio.get_running_loop()
# 创建 UDP 端点
transport, protocol = await loop.create_datagram_endpoint(
EchoServerProtocol,
local_addr=('127.0.0.1', 9999)
)
print("UDP 服务器运行在 127.0.0.1:9999")
try:
await asyncio.sleep(3600) # 运行 1 小时
finally:
transport.close()
asyncio.run(main())
UDP 客户端
import asyncio
class UDPClientProtocol(asyncio.DatagramProtocol):
"""UDP 客户端协议"""
def __init__(self, message, on_response):
self.message = message
self.on_response = on_response
def connection_made(self, transport):
# 发送数据
transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
# 收到响应
self.on_response(data.decode())
def error_received(self, exc):
print(f"错误: {exc}")
async def send_udp_message(message, host, port):
"""发送 UDP 消息并等待响应"""
loop = asyncio.get_running_loop()
future = loop.create_future()
def on_response(data):
future.set_result(data)
transport, protocol = await loop.create_datagram_endpoint(
lambda: UDPClientProtocol(message, on_response),
remote_addr=(host, port)
)
try:
# 等待响应(带超时)
response = await asyncio.wait_for(future, timeout=5)
return response
finally:
transport.close()
async def main():
response = await send_udp_message("Hello UDP", '127.0.0.1', 9999)
print(f"响应: {response}")
asyncio.run(main())
SSL/TLS 加密连接
SSL 客户端
import asyncio
import ssl
async def ssl_client():
# 创建 SSL 上下文
ssl_context = ssl.create_default_context()
# 连接到 HTTPS 服务器
reader, writer = await asyncio.open_connection(
'www.example.com',
443,
ssl=ssl_context
)
# 发送 HTTP 请求
request = b"GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n"
writer.write(request)
await writer.drain()
# 读取响应
response = await reader.read(4096)
print(response.decode())
writer.close()
await writer.wait_closed()
asyncio.run(ssl_client())
SSL 服务器
import asyncio
import ssl
async def handle_client(reader, writer):
data = await reader.read(1024)
writer.write(b"SSL connection received")
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
# 创建 SSL 上下文
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain('server.crt', 'server.key')
server = await asyncio.start_server(
handle_client,
'127.0.0.1', 8888,
ssl=ssl_context
)
async with server:
await server.serve_forever()
asyncio.run(main())
Unix 域套接字
Unix 域套接字用于同一台机器上的进程间通信,比 TCP 更快。
Unix 套接字服务器
import asyncio
import os
SOCKET_PATH = '/tmp/mysocket.sock'
async def handle_client(reader, writer):
data = await reader.read(1024)
writer.write(b"Unix socket response")
await writer.drain()
writer.close()
await writer.wait_closed()
async def main():
# 删除已存在的套接字文件
if os.path.exists(SOCKET_PATH):
os.unlink(SOCKET_PATH)
server = await asyncio.start_unix_server(
handle_client,
path=SOCKET_PATH
)
print(f"Unix 套接字服务器: {SOCKET_PATH}")
async with server:
await server.serve_forever()
asyncio.run(main())
Unix 套接字客户端
import asyncio
SOCKET_PATH = '/tmp/mysocket.sock'
async def main():
reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)
writer.write(b"Hello Unix Socket")
await writer.drain()
data = await reader.read(1024)
print(f"收到: {data.decode()}")
writer.close()
await writer.wait_closed()
asyncio.run(main())
实践示例
示例 1:HTTP 客户端
手动实现一个简单的 HTTP 客户端:
import asyncio
async def http_get(host, port, path):
"""简单的 HTTP GET 请求"""
reader, writer = await asyncio.open_connection(host, port)
# 构造 HTTP 请求
request = (
f"GET {path} HTTP/1.1\r\n"
f"Host: {host}\r\n"
f"Connection: close\r\n"
f"\r\n"
)
writer.write(request.encode())
await writer.drain()
# 读取响应头
headers = []
while True:
line = await reader.readline()
if line == b'\r\n':
break
headers.append(line.decode().strip())
# 读取响应体
body = await reader.read()
writer.close()
await writer.wait_closed()
return headers, body
async def main():
headers, body = await http_get('www.example.com', 80, '/')
print("响应头:")
for h in headers[:5]:
print(f" {h}")
print(f"\n响应体长度: {len(body)} 字节")
asyncio.run(main())
示例 2:端口扫描器
import asyncio
async def check_port(host, port, timeout=1):
"""检查端口是否开放"""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=timeout
)
writer.close()
await writer.wait_closed()
return True
except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
return False
async def port_scanner(host, start_port, end_port):
"""扫描端口范围"""
print(f"扫描 {host} 端口 {start_port}-{end_port}")
# 并发检查所有端口
tasks = [
asyncio.create_task(check_port(host, port))
for port in range(start_port, end_port + 1)
]
results = await asyncio.gather(*tasks)
# 打印开放端口
for port, is_open in enumerate(results, start=start_port):
if is_open:
print(f"端口 {port} 开放")
async def main():
await port_scanner('127.0.0.1', 80, 100)
asyncio.run(main())
示例 3:简单代理服务器
import asyncio
async def handle_proxy(client_reader, client_writer, target_host, target_port):
"""代理客户端请求到目标服务器"""
try:
# 连接目标服务器
target_reader, target_writer = await asyncio.open_connection(
target_host, target_port
)
# 双向转发数据
async def forward(reader, writer, direction):
try:
while True:
data = await reader.read(4096)
if not data:
break
writer.write(data)
await writer.drain()
except ConnectionError:
pass
# 并发转发两个方向的数据
await asyncio.gather(
forward(client_reader, target_writer, "client->target"),
forward(target_reader, client_writer, "target->client"),
)
except ConnectionError as e:
print(f"连接错误: {e}")
finally:
client_writer.close()
await client_writer.wait_closed()
async def main():
target_host = 'www.example.com'
target_port = 80
server = await asyncio.start_server(
lambda r, w: handle_proxy(r, w, target_host, target_port),
'127.0.0.1', 8888
)
print(f"代理服务器运行在 127.0.0.1:8888 -> {target_host}:{target_port}")
async with server:
await server.serve_forever()
asyncio.run(main())
小结
asyncio 网络编程的核心 API:
| API | 用途 |
|---|---|
open_connection() | 创建 TCP 客户端连接 |
start_server() | 创建 TCP 服务器 |
StreamReader | 异步读取数据 |
StreamWriter | 异步写入数据 |
create_datagram_endpoint() | 创建 UDP 端点 |
最佳实践:
- 始终使用
async with确保资源正确释放 - 写入数据后调用
await writer.drain() - 使用超时避免无限等待
- 正确处理
ConnectionError等异常 - 关闭连接时调用
wait_closed()确保完全关闭
对于生产环境,建议使用成熟的异步网络库如 aiohttp 或 httpx,它们提供了更完善的功能和更好的错误处理。