跳到主要内容

异步网络编程

asyncio 提供了完整的异步网络编程支持,包括 TCP 客户端/服务器、UDP 通信等。本章介绍如何使用 asyncio 进行网络编程。

Streams API 概述

asyncio 提供两种网络编程 API:

API 层级特点适用场景
Streams(高层)使用 StreamReader/StreamWriter,语法简洁大多数应用开发
Transports/Protocols(底层)基于回调,性能更高框架开发、高性能场景

本教程主要介绍高层 Streams API,它使用 async/await 语法,更容易理解和使用。

TCP 客户端

基本连接

使用 asyncio.open_connection() 建立 TCP 连接:

import asyncio

async def tcp_client():
# 建立连接,返回 reader 和 writer
reader, writer = await asyncio.open_connection(
host='127.0.0.1',
port=8888
)

# 发送数据
message = "Hello Server"
writer.write(message.encode())
await writer.drain() # 等待数据发送完成

# 接收数据
data = await reader.read(100) # 最多读取 100 字节
print(f"收到: {data.decode()}")

# 关闭连接
writer.close()
await writer.wait_closed()

asyncio.run(tcp_client())

open_connection() 返回两个对象:

  • StreamReader:用于读取数据
  • StreamWriter:用于写入数据

StreamReader 常用方法

StreamReader 提供多种读取数据的方式:

import asyncio

async def read_examples():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

# 1. read(n) - 读取最多 n 字节
data = await reader.read(1024)

# 2. read() - 不指定大小,读取直到 EOF
all_data = await reader.read()

# 3. readline() - 读取一行(以 \n 结尾)
line = await reader.readline()

# 4. readuntil(separator) - 读取直到指定分隔符
data = await reader.readuntil(b'\n')

# 5. readexactly(n) - 精确读取 n 字节
# 如果不足 n 字节,抛出 IncompleteReadError
exact_data = await reader.readexactly(10)

# 6. at_eof() - 检查是否到达末尾
if reader.at_eof():
print("已到达数据末尾")

writer.close()
await writer.wait_closed()

各方法的区别:

方法说明适用场景
read(n)最多读取 n 字节不确定数据量
read()读取直到 EOF读取完整响应
readline()读取一行文本协议(如 HTTP)
readuntil()读取到分隔符自定义协议
readexactly()精确读取 n 字节固定长度协议

StreamWriter 常用方法

StreamWriter 用于发送数据:

import asyncio

async def write_examples():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

# 1. write(data) - 写入数据
writer.write(b'Hello')

# 2. writelines(lines) - 写入多行
writer.writelines([b'Line1\n', b'Line2\n'])

# 3. drain() - 等待发送缓冲区清空
await writer.drain()

# 4. write_eof() - 发送 EOF
writer.write_eof()

# 5. close() - 关闭连接
writer.close()

# 6. wait_closed() - 等待连接完全关闭
await writer.wait_closed()

# 7. get_extra_info() - 获取额外信息
peername = writer.get_extra_info('peername') # 对端地址
socket = writer.get_extra_info('socket') # 底层 socket

重要write() 方法不会阻塞,数据会被放入发送缓冲区。必须调用 await writer.drain() 等待数据实际发送出去。

完整的 TCP 客户端示例

import asyncio

class TCPClient:
"""TCP 客户端封装"""

def __init__(self, host, port):
self.host = host
self.port = port
self.reader = None
self.writer = None

async def connect(self):
"""建立连接"""
self.reader, self.writer = await asyncio.open_connection(
self.host, self.port
)
print(f"已连接到 {self.host}:{self.port}")

async def send(self, message):
"""发送消息"""
if self.writer is None:
raise RuntimeError("未连接")

self.writer.write(message.encode())
await self.writer.drain()
print(f"发送: {message}")

async def receive(self, size=1024):
"""接收消息"""
if self.reader is None:
raise RuntimeError("未连接")

data = await self.reader.read(size)
return data.decode()

async def close(self):
"""关闭连接"""
if self.writer:
self.writer.close()
await self.writer.wait_closed()
print("连接已关闭")

async def main():
client = TCPClient('127.0.0.1', 8888)

try:
await client.connect()

# 发送多条消息
for i in range(3):
await client.send(f"消息 {i}")
response = await client.receive()
print(f"收到响应: {response}")

except ConnectionRefusedError:
print("无法连接到服务器")
finally:
await client.close()

asyncio.run(main())

TCP 服务器

基本服务器

使用 asyncio.start_server() 创建 TCP 服务器:

import asyncio

async def handle_client(reader, writer):
"""处理客户端连接"""
# 获取客户端地址
addr = writer.get_extra_info('peername')
print(f"新连接: {addr}")

try:
while True:
# 读取数据
data = await reader.read(1024)
if not data: # 连接关闭
break

message = data.decode()
print(f"收到 {addr}: {message}")

# 发送响应
response = f"Echo: {message}"
writer.write(response.encode())
await writer.drain()

except ConnectionError:
print(f"连接错误: {addr}")

finally:
writer.close()
await writer.wait_closed()
print(f"连接关闭: {addr}")

async def main():
# 创建服务器
server = await asyncio.start_server(
handle_client,
host='127.0.0.1',
port=8888
)

addr = server.sockets[0].getsockname()
print(f"服务器运行在 {addr}")

# 运行服务器
async with server:
await server.serve_forever()

asyncio.run(main())

服务器配置选项

start_server() 支持多种配置参数:

import asyncio

async def handle_client(reader, writer):
# 处理逻辑
pass

async def main():
server = await asyncio.start_server(
handle_client,

# 监听地址
host='0.0.0.0', # 所有网卡
port=8888,

# 连接队列
backlog=100, # 最大等待连接数

# 重用地址
reuse_address=True, # 允许重用处于 TIME_WAIT 的地址

# SSL/TLS
# ssl=ssl_context,

# 开始服务
start_serving=True, # 立即开始接受连接
)

async with server:
await server.serve_forever()

多客户端并发处理

asyncio 服务器自动并发处理多个客户端:

import asyncio

async def handle_client(reader, writer):
addr = writer.get_extra_info('peername')
print(f"客户端连接: {addr}")

# 模拟处理时间
await asyncio.sleep(2)

writer.write(b"处理完成")
await writer.drain()

writer.close()
await writer.wait_closed()
print(f"客户端断开: {addr}")

async def main():
server = await asyncio.start_server(
handle_client,
'127.0.0.1', 8888
)

print("服务器启动,可以同时处理多个客户端")

async with server:
await server.serve_forever()

# 同时连接多个客户端,它们会并发处理
asyncio.run(main())

服务器状态管理

import asyncio

class ChatServer:
"""简单的聊天服务器"""

def __init__(self):
self.clients = set() # 存储所有客户端

async def handle_client(self, reader, writer):
addr = writer.get_extra_info('peername')
self.clients.add(writer)

print(f"客户端加入: {addr}, 当前在线: {len(self.clients)}")

try:
# 发送欢迎消息
writer.write(f"欢迎! 当前在线 {len(self.clients)} 人\n".encode())
await writer.drain()

while True:
data = await reader.read(1024)
if not data:
break

message = data.decode().strip()

# 广播给所有客户端
broadcast = f"[{addr}] {message}\n"
for client in self.clients:
if client != writer:
client.write(broadcast.encode())
await client.drain()

except ConnectionError:
pass

finally:
self.clients.remove(writer)
writer.close()
await writer.wait_closed()
print(f"客户端离开: {addr}, 当前在线: {len(self.clients)}")

async def main():
server = ChatServer()

async with await asyncio.start_server(
server.handle_client,
'127.0.0.1', 8888
) as s:
print("聊天服务器运行在 127.0.0.1:8888")
await s.serve_forever()

asyncio.run(main())

UDP 编程

UDP 是无连接的协议,使用 asyncio.DatagramProtocol 处理。

UDP 服务器

import asyncio

class EchoServerProtocol(asyncio.DatagramProtocol):
"""UDP Echo 服务器协议"""

def connection_made(self, transport):
self.transport = transport

def datagram_received(self, data, addr):
"""收到数据报"""
message = data.decode()
print(f"收到 {addr}: {message}")

# 发送响应
self.transport.sendto(data, addr)

async def main():
loop = asyncio.get_running_loop()

# 创建 UDP 端点
transport, protocol = await loop.create_datagram_endpoint(
EchoServerProtocol,
local_addr=('127.0.0.1', 9999)
)

print("UDP 服务器运行在 127.0.0.1:9999")

try:
await asyncio.sleep(3600) # 运行 1 小时
finally:
transport.close()

asyncio.run(main())

UDP 客户端

import asyncio

class UDPClientProtocol(asyncio.DatagramProtocol):
"""UDP 客户端协议"""

def __init__(self, message, on_response):
self.message = message
self.on_response = on_response

def connection_made(self, transport):
# 发送数据
transport.sendto(self.message.encode())

def datagram_received(self, data, addr):
# 收到响应
self.on_response(data.decode())

def error_received(self, exc):
print(f"错误: {exc}")

async def send_udp_message(message, host, port):
"""发送 UDP 消息并等待响应"""
loop = asyncio.get_running_loop()
future = loop.create_future()

def on_response(data):
future.set_result(data)

transport, protocol = await loop.create_datagram_endpoint(
lambda: UDPClientProtocol(message, on_response),
remote_addr=(host, port)
)

try:
# 等待响应(带超时)
response = await asyncio.wait_for(future, timeout=5)
return response
finally:
transport.close()

async def main():
response = await send_udp_message("Hello UDP", '127.0.0.1', 9999)
print(f"响应: {response}")

asyncio.run(main())

SSL/TLS 加密连接

SSL 客户端

import asyncio
import ssl

async def ssl_client():
# 创建 SSL 上下文
ssl_context = ssl.create_default_context()

# 连接到 HTTPS 服务器
reader, writer = await asyncio.open_connection(
'www.example.com',
443,
ssl=ssl_context
)

# 发送 HTTP 请求
request = b"GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n"
writer.write(request)
await writer.drain()

# 读取响应
response = await reader.read(4096)
print(response.decode())

writer.close()
await writer.wait_closed()

asyncio.run(ssl_client())

SSL 服务器

import asyncio
import ssl

async def handle_client(reader, writer):
data = await reader.read(1024)
writer.write(b"SSL connection received")
await writer.drain()
writer.close()
await writer.wait_closed()

async def main():
# 创建 SSL 上下文
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain('server.crt', 'server.key')

server = await asyncio.start_server(
handle_client,
'127.0.0.1', 8888,
ssl=ssl_context
)

async with server:
await server.serve_forever()

asyncio.run(main())

Unix 域套接字

Unix 域套接字用于同一台机器上的进程间通信,比 TCP 更快。

Unix 套接字服务器

import asyncio
import os

SOCKET_PATH = '/tmp/mysocket.sock'

async def handle_client(reader, writer):
data = await reader.read(1024)
writer.write(b"Unix socket response")
await writer.drain()
writer.close()
await writer.wait_closed()

async def main():
# 删除已存在的套接字文件
if os.path.exists(SOCKET_PATH):
os.unlink(SOCKET_PATH)

server = await asyncio.start_unix_server(
handle_client,
path=SOCKET_PATH
)

print(f"Unix 套接字服务器: {SOCKET_PATH}")

async with server:
await server.serve_forever()

asyncio.run(main())

Unix 套接字客户端

import asyncio

SOCKET_PATH = '/tmp/mysocket.sock'

async def main():
reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)

writer.write(b"Hello Unix Socket")
await writer.drain()

data = await reader.read(1024)
print(f"收到: {data.decode()}")

writer.close()
await writer.wait_closed()

asyncio.run(main())

实践示例

示例 1:HTTP 客户端

手动实现一个简单的 HTTP 客户端:

import asyncio

async def http_get(host, port, path):
"""简单的 HTTP GET 请求"""
reader, writer = await asyncio.open_connection(host, port)

# 构造 HTTP 请求
request = (
f"GET {path} HTTP/1.1\r\n"
f"Host: {host}\r\n"
f"Connection: close\r\n"
f"\r\n"
)

writer.write(request.encode())
await writer.drain()

# 读取响应头
headers = []
while True:
line = await reader.readline()
if line == b'\r\n':
break
headers.append(line.decode().strip())

# 读取响应体
body = await reader.read()

writer.close()
await writer.wait_closed()

return headers, body

async def main():
headers, body = await http_get('www.example.com', 80, '/')

print("响应头:")
for h in headers[:5]:
print(f" {h}")

print(f"\n响应体长度: {len(body)} 字节")

asyncio.run(main())

示例 2:端口扫描器

import asyncio

async def check_port(host, port, timeout=1):
"""检查端口是否开放"""
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=timeout
)
writer.close()
await writer.wait_closed()
return True
except (asyncio.TimeoutError, ConnectionRefusedError, OSError):
return False

async def port_scanner(host, start_port, end_port):
"""扫描端口范围"""
print(f"扫描 {host} 端口 {start_port}-{end_port}")

# 并发检查所有端口
tasks = [
asyncio.create_task(check_port(host, port))
for port in range(start_port, end_port + 1)
]

results = await asyncio.gather(*tasks)

# 打印开放端口
for port, is_open in enumerate(results, start=start_port):
if is_open:
print(f"端口 {port} 开放")

async def main():
await port_scanner('127.0.0.1', 80, 100)

asyncio.run(main())

示例 3:简单代理服务器

import asyncio

async def handle_proxy(client_reader, client_writer, target_host, target_port):
"""代理客户端请求到目标服务器"""
try:
# 连接目标服务器
target_reader, target_writer = await asyncio.open_connection(
target_host, target_port
)

# 双向转发数据
async def forward(reader, writer, direction):
try:
while True:
data = await reader.read(4096)
if not data:
break
writer.write(data)
await writer.drain()
except ConnectionError:
pass

# 并发转发两个方向的数据
await asyncio.gather(
forward(client_reader, target_writer, "client->target"),
forward(target_reader, client_writer, "target->client"),
)

except ConnectionError as e:
print(f"连接错误: {e}")

finally:
client_writer.close()
await client_writer.wait_closed()

async def main():
target_host = 'www.example.com'
target_port = 80

server = await asyncio.start_server(
lambda r, w: handle_proxy(r, w, target_host, target_port),
'127.0.0.1', 8888
)

print(f"代理服务器运行在 127.0.0.1:8888 -> {target_host}:{target_port}")

async with server:
await server.serve_forever()

asyncio.run(main())

小结

asyncio 网络编程的核心 API:

API用途
open_connection()创建 TCP 客户端连接
start_server()创建 TCP 服务器
StreamReader异步读取数据
StreamWriter异步写入数据
create_datagram_endpoint()创建 UDP 端点

最佳实践:

  1. 始终使用 async with 确保资源正确释放
  2. 写入数据后调用 await writer.drain()
  3. 使用超时避免无限等待
  4. 正确处理 ConnectionError 等异常
  5. 关闭连接时调用 wait_closed() 确保完全关闭

对于生产环境,建议使用成熟的异步网络库如 aiohttphttpx,它们提供了更完善的功能和更好的错误处理。