UDP 协议详解
UDP(User Datagram Protocol,用户数据报协议)是传输层的另一个重要协议,由 RFC 768 定义。与 TCP 不同,UDP 提供无连接、不可靠的数据报传输服务,以其简单高效著称。
UDP 概述
UDP 的设计理念是"做尽可能少的事情",将复杂的可靠性机制留给应用层实现。这种设计使得 UDP 具有极低的协议开销和快速的传输速度。
UDP 的核心特性
无连接:发送数据前不需要建立连接,直接发送即可。这省去了 TCP 三次握手的时间开销,适合快速发送少量数据的场景。
不可靠传输:不保证数据到达,不保证顺序,不处理重传。发送方发送数据后不知道是否到达,需要应用层自己处理可靠性。
面向报文:保留报文边界,应用层给什么就发什么。不会像 TCP 那样合并或拆分数据,接收方收到的就是发送方发送的完整报文。
无拥塞控制:网络拥塞时不会降低发送速率。这对实时应用很重要,但也可能导致网络拥塞加剧。
支持多播和广播:可以一次发送给多个接收者。TCP 只支持一对一通信,UDP 支持一对一、一对多、多对多。
首部开销小:UDP 首部只有 8 字节,而 TCP 首部至少 20 字节。
UDP 与 TCP 的对比
| 特性 | UDP | TCP |
|---|---|---|
| 连接 | 无连接 | 面向连接 |
| 可靠性 | 不可靠 | 可靠 |
| 顺序 | 无序 | 有序 |
| 流量控制 | 无 | 滑动窗口 |
| 拥塞控制 | 无 | 慢启动、拥塞避免等 |
| 首部大小 | 8 字节 | 20-60 字节 |
| 传输方式 | 数据报 | 字节流 |
| 多播/广播 | 支持 | 不支持 |
| 适用场景 | 实时通信、DNS、游戏 | 文件传输、邮件、Web |
UDP 数据报格式
UDP 数据报由首部和数据两部分组成,首部固定 8 字节:
0 16 31
+----------------+--------------------------------+
| 源端口 | 目的端口 |
+----------------+--------------------------------+
| 长度 | 校验和 |
+----------------+--------------------------------+
| 数据 |
+------------------------------------------------+
字段详解
源端口(16 位):发送方的端口号,可选字段。如果不需要回复,可以置 0。
目的端口(16 位):接收方的端口号,必填字段。数据到达目的主机后,根据端口号交付给对应的应用进程。
长度(16 位):UDP 数据报的总长度,包括首部和数据,单位是字节。最小值是 8(只有首部),最大值是 65535。由于 IP 首部占用 20 字节,UDP 数据的最大载荷是 65535 - 8 - 20 = 65507 字节。
校验和(16 位):用于检测 UDP 数据报在传输过程中是否出错。计算时包含伪首部、UDP 首部和数据。伪首部包含源 IP、目的 IP、协议号和 UDP 长度,用于验证数据报是否到达正确的目的地。
伪首部
伪首部不是 UDP 数据报的一部分,只在计算校验和时使用:
0 16 31
+----------------+--------------------------------+
| 源 IP 地址 |
+------------------------------------------------+
| 目的 IP 地址 |
+--------+-------+--------------------------------+
| 0 | 协议 | UDP 长度 |
+--------+-------+--------------------------------+
协议字段值为 17,表示 UDP。通过包含 IP 地址,校验和可以验证数据报是否到达正确的目的地。
校验和计算
校验和的计算步骤:
- 构造伪首部,添加到 UDP 数据报前面
- 如果数据长度不是 16 位的整数倍,填充一个全 0 字节(不发送)
- 将所有 16 位字按二进制反码求和
- 结果取反即为校验和
如果计算结果为全 0,校验和字段应设为全 1(因为全 0 表示不使用校验和)。
UDP 的工作过程
发送数据
UDP 发送数据的过程非常简单:
- 应用程序将数据传递给 UDP
- UDP 添加 8 字节首部
- 将 UDP 数据报传递给 IP 层
- IP 层封装后发送
没有连接建立,没有确认机制,发送即完成。
接收数据
UDP 接收数据的过程:
- IP 层收到数据报,检查目的 IP
- 如果是本机 IP,将数据传递给 UDP
- UDP 检查校验和,如果出错则丢弃
- 根据目的端口,将数据传递给对应的应用进程
- 如果没有进程监听该端口,返回 ICMP 端口不可达消息
端口复用和分用
UDP 使用端口号实现多路复用和分用:
复用:多个应用进程可以使用 UDP 发送数据,UDP 通过端口号区分不同的进程。
分用:UDP 收到数据后,根据目的端口号将数据分发给对应的应用进程。
端口号范围 0-65535,其中 0-1023 是知名端口,由 IANA 分配;1024-49151 是注册端口;49152-65535 是动态端口。
UDP 的应用场景
UDP 的特点决定了它适合特定的应用场景。
实时音视频
视频会议、直播、网络电话等实时音视频应用首选 UDP。
原因:
- 实时性比可靠性更重要,丢包可以接受,延迟不能接受
- 音视频编解码器可以处理一定的丢包
- TCP 的重传机制会增加不可预测的延迟
例子:WebRTC、Zoom、微信视频通话
在线游戏
多人在线游戏需要快速响应,UDP 更适合。
原因:
- 游戏状态更新频繁,旧数据很快过时,重传没有意义
- 延迟直接影响游戏体验
- 游戏可以在应用层实现必要的可靠性
例子:王者荣耀、英雄联盟、绝地求生
DNS 查询
DNS 查询通常使用 UDP,响应快速。
原因:
- 查询请求很小,通常一个数据包就够了
- 不需要建立连接的开销
- 快速响应对用户体验很重要
注意:当响应超过 512 字节(传统限制)或需要安全传输时,DNS 也会使用 TCP。
DHCP
DHCP(动态主机配置协议)使用 UDP 进行地址分配。
原因:
- DHCP 客户端还没有 IP 地址,无法建立 TCP 连接
- 需要广播通信,UDP 支持广播
- 简单的请求-响应模式,不需要连接
DHCP 服务器监听端口 67,客户端使用端口 68。
SNMP
SNMP(简单网络管理协议)使用 UDP 进行网络设备监控。
原因:
- 网络设备可能无法处理大量 TCP 连接
- 监控数据丢失可以接受,下一个周期会重新发送
- 减少网络管理流量
流媒体
视频流、音频流等流媒体传输常用 UDP。
原因:
- 流媒体可以容忍一定的丢包
- 实时性要求高
- 应用层可以实现自适应码率
IoT 物联网
物联网设备资源有限,UDP 更适合。
原因:
- 设备计算能力和内存有限,TCP 的复杂开销太大
- 很多场景只需要发送少量数据
- CoAP(受限应用协议)基于 UDP
UDP 的可靠性增强
虽然 UDP 本身不可靠,但应用层可以实现可靠性机制。
应用层确认机制
发送方发送数据后等待确认,超时重传:
import socket
import time
def reliable_send(sock, data, addr, max_retries=3, timeout=1.0):
sock.settimeout(timeout)
for i in range(max_retries):
try:
sock.sendto(data, addr)
ack, _ = sock.recvfrom(1024)
if ack == b'ACK':
return True
except socket.timeout:
continue
return False
序号机制
为每个数据包分配序号,接收方可以检测丢包和乱序:
import struct
def pack_with_seq(seq, data):
return struct.pack('!I', seq) + data
def unpack_with_seq(packet):
seq = struct.unpack('!I', packet[:4])[0]
data = packet[4:]
return seq, data
前向纠错(FEC)
发送冗余数据,接收方可以根据冗余信息恢复丢失的数据:
- 异或 FEC:每 N 个数据包发送一个异或包,丢失一个可以恢复
- Reed-Solomon 编码:可以恢复多个丢失的数据包
拥塞控制
应用层可以实现拥塞控制,避免网络过载:
- 基于丢包:检测到丢包时降低发送速率
- 基于延迟:延迟增加时降低发送速率
- BBR:基于带宽和 RTT 的拥塞控制
QUIC 协议
QUIC(Quick UDP Internet Connections)是 Google 开发的基于 UDP 的可靠传输协议,用于 HTTP/3。
QUIC 在 UDP 之上实现了:
- 可靠传输(类似 TCP 的确认和重传)
- 拥塞控制
- 多路复用(一个连接多个流)
- 前向纠错
- 连接迁移(网络切换不断开)
UDP 编程接口
基本 UDP 通信
服务器端:
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('0.0.0.0', 8080))
print('UDP 服务器启动,等待数据...')
while True:
data, client_addr = server_socket.recvfrom(1024)
print(f'收到来自 {client_addr} 的数据: {data.decode()}')
server_socket.sendto(b'Hello from server', client_addr)
客户端:
import socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_addr = ('127.0.0.1', 8080)
client_socket.sendto(b'Hello from client', server_addr)
data, _ = client_socket.recvfrom(1024)
print(f'收到响应: {data.decode()}')
client_socket.close()
UDP 广播
UDP 支持广播通信:
import socket
# 发送广播
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
client_socket.sendto(b'Broadcast message', ('<broadcast>', 8080))
client_socket.close()
import socket
# 接收广播
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
server_socket.bind(('0.0.0.0', 8080))
while True:
data, addr = server_socket.recvfrom(1024)
print(f'收到广播: {data.decode()} from {addr}')
UDP 组播
UDP 支持组播通信,比广播更高效:
import socket
import struct
# 发送组播
multicast_group = ('224.1.1.1', 8080)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ttl = struct.pack('b', 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)
sock.sendto(b'Multicast message', multicast_group)
sock.close()
import socket
import struct
# 接收组播
multicast_group = '224.1.1.1'
server_address = ('', 8080)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(server_address)
group = socket.inet_aton(multicast_group)
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
while True:
data, addr = sock.recvfrom(1024)
print(f'收到组播: {data.decode()} from {addr}')
UDP 常见问题
数据包丢失
UDP 不保证数据到达,丢包是常态。应对方法:
- 应用层实现确认和重传
- 使用前向纠错
- 接受一定程度的丢包
数据包乱序
UDP 不保证顺序,数据可能乱序到达。应对方法:
- 添加序号,接收方根据序号重排
- 接受乱序(某些场景可以)
数据包过大
UDP 数据报过大可能导致 IP 分片,增加丢包概率。应对方法:
- 限制 UDP 数据报大小,不超过 MTU(通常 1500 字节)
- 考虑 IP 首部(20 字节)和 UDP 首部(8 字节),数据不超过 1472 字节
安全问题
UDP 容易受到攻击:
- UDP 洪泛攻击:发送大量 UDP 数据包消耗带宽
- 放大攻击:利用 UDP 服务放大攻击流量
应对方法:
- 限制 UDP 服务的响应速率
- 验证请求来源
- 使用防火墙过滤
UDP 性能优化
在高性能场景下,UDP 的性能调优至关重要。以下从内核参数、Socket 选项、应用层优化等多个层面介绍 UDP 性能优化技术。
内核参数调优
Linux 系统提供了多个 UDP 相关的内核参数:
# 查看 UDP 相关参数
sysctl -a | grep udp
# 常用调优参数
# 增加接收缓冲区大小(字节)
# 默认值通常较小,高吞吐场景需要增大
net.core.rmem_max = 12582912 # 最大接收缓冲区
net.core.rmem_default = 262144 # 默认接收缓冲区
# 增加发送缓冲区大小
net.core.wmem_max = 12582912 # 最大发送缓冲区
net.core.wmem_default = 262144 # 默认发送缓冲区
# UDP 特有的内存限制(页面数)
net.ipv4.udp_mem = 379440 505920 758880 # min pressure max
# UDP 接收缓冲区的最小、默认、最大值
net.ipv4.udp_rmem_min = 16384
net.ipv4.udp_wmem_min = 16384
# 应用到系统
sysctl -p
参数说明:
rmem_max和wmem_max:Socket 缓冲区的最大值,应用层可以通过setsockopt请求更大的缓冲区udp_mem:系统范围 UDP 内存限制,单位是内存页。超过pressure值时开始回收,超过max时拒绝分配- 对于万兆网络或高速数据采集场景,建议将缓冲区设置为 10MB 以上
Socket 选项优化
应用层可以通过 Socket 选项精细控制 UDP 行为:
import socket
def create_optimized_udp_socket():
"""创建优化后的 UDP Socket"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 1. 设置缓冲区大小(需要在绑定前设置)
# 接收缓冲区:10MB
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 10 * 1024 * 1024)
# 发送缓冲区:10MB
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 10 * 1024 * 1024)
# 2. 允许地址重用(快速重启服务)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 3. 设置接收超时(避免无限阻塞)
sock.settimeout(5.0)
# 4. Linux 特定优化
# SO_BUSYBOX:当数据包到达但缓冲区满时,丢弃旧数据包而非新数据包
# 适用于实时数据流,保证接收最新数据
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BUSY_POLL, 50) # 微秒
except:
pass # 不是所有系统都支持
# IP_MTU_DISCOVER:控制 MTU 发现行为
# IP_PMTUDISC_DO:总是设置 DF 标志,禁止分片
# IP_PMTUDISC_DONT:不设置 DF,允许分片
# IP_PMTUDISC_PROBE:设置 DF 但不进行路径 MTU 发现
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MTU_DISCOVER,
socket.IP_PMTUDISC_DONT)
return sock
# 验证缓冲区设置
sock = create_optimized_udp_socket()
rcvbuf = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
sndbuf = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
print(f"接收缓冲区: {rcvbuf // 1024} KB")
print(f"发送缓冲区: {sndbuf // 1024} KB")
批量收发优化
对于高吞吐场景,逐个数据包收发会产生大量系统调用开销。现代 Linux 内核支持批量收发:
import socket
import struct
# Linux 3.14+ 支持 recvmmsg 和 sendmmsg
# 可以一次系统调用收发多个数据包
def send_batch(sock, messages_and_addrs):
"""批量发送 UDP 数据包"""
import ctypes
import array
# 使用 sendmmsg 系统调用
# 这里用循环模拟,实际应使用 sendmmsg
for data, addr in messages_and_addrs:
sock.sendto(data, addr)
# 对于接收端,可以使用 recvmmsg 批量接收
# Python 的 socket 库没有直接暴露 recvmmsg/sendmmsg
# 可以使用 ctypes 或 cython 包装
class MMsgHdr(ctypes.Structure):
"""mmsghdr 结构体(Linux 特有)"""
_fields_ = [
("msg_hdr", ctypes.c_void_p), # msghdr
("msg_len", ctypes.c_uint), # 实际发送/接收的字节数
]
def batch_receive(sock, batch_size=64, buffer_size=2048):
"""批量接收(需要 Linux 支持)"""
# 这里展示概念,实际需要更复杂的实现
messages = []
for _ in range(batch_size):
try:
data, addr = sock.recvfrom(buffer_size)
messages.append((data, addr))
except socket.timeout:
break
return messages
零拷贝优化
对于大流量数据传输,零拷贝技术可以显著减少 CPU 开销:
import socket
import array
def zero_copy_send(sock, data, addr):
"""
使用 sendmsg 实现零拷贝发送
避免数据在用户空间的额外拷贝
"""
# sendmsg 支持分散/聚集 I/O
# 可以发送多个不连续的缓冲区
# 构建多个缓冲区(避免拼接开销)
header = b'HEADER:'
trailer = b':END'
# 使用 iovec 风格的分散写入
ancdata = []
msg_flags = 0
# Python 的 sendmsg 接口
sock.sendmsg([header, data, trailer], ancdata, msg_flags, addr)
多线程/多进程处理
对于 CPU 密集型的 UDP 处理,可以利用 SO_REUSEPORT 实现多进程负载均衡:
import socket
import os
import multiprocessing
def udp_worker(port, worker_id):
"""UDP 工作进程"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# SO_REUSEPORT 允许多个 Socket 绑定同一端口
# 内核会自动进行负载均衡
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('0.0.0.0', port))
print(f"Worker {worker_id} started, PID: {os.getpid()}")
while True:
data, addr = sock.recvfrom(2048)
# 处理数据
response = process_data(data)
sock.sendto(response, addr)
def process_data(data):
"""数据处理函数"""
# 模拟处理
return data.upper()
def start_udp_server(port, num_workers=4):
"""启动多进程 UDP 服务器"""
processes = []
for i in range(num_workers):
p = multiprocessing.Process(target=udp_worker, args=(port, i))
p.start()
processes.append(p)
for p in processes:
p.join()
# Linux 3.9+ 支持 SO_REUSEPORT
# 内核会自动将数据包分发到不同的 Socket
CPU 亲和性设置
将 UDP 处理绑定到特定 CPU 核心,提高缓存命中率:
import os
import multiprocessing
def set_cpu_affinity(cpu_id):
"""设置进程的 CPU 亲和性"""
try:
os.sched_setaffinity(0, {cpu_id})
print(f"绑定到 CPU {cpu_id}")
except AttributeError:
# Windows 使用不同的方法
import ctypes
kernel32 = ctypes.windll.kernel32
# 设置线程亲和性
kernel32.SetThreadAffinityMask(
kernel32.GetCurrentThread(),
1 << cpu_id
)
def worker_with_affinity(port, cpu_id):
"""带 CPU 亲和性的工作进程"""
set_cpu_affinity(cpu_id)
# ... UDP 处理逻辑
性能监控
监控 UDP 性能指标,及时发现瓶颈:
def get_udp_statistics():
"""获取 UDP 统计信息"""
stats = {}
# Linux: /proc/net/snmp
with open('/proc/net/snmp', 'r') as f:
for line in f:
if line.startswith('Udp:'):
parts = line.split()
if len(parts) > 1:
# 解析 UDP 统计
# InDatagrams, NoPorts, InErrors, OutDatagrams, RcvbufErrors, SndbufErrors
stats['in_datagrams'] = int(parts[1]) if len(parts) > 1 else 0
stats['no_ports'] = int(parts[2]) if len(parts) > 2 else 0
stats['in_errors'] = int(parts[3]) if len(parts) > 3 else 0
stats['out_datagrams'] = int(parts[4]) if len(parts) > 4 else 0
stats['rcvbuf_errors'] = int(parts[5]) if len(parts) > 5 else 0
stats['sndbuf_errors'] = int(parts[6]) if len(parts) > 6 else 0
break
# Linux: /proc/net/sockstat
with open('/proc/net/sockstat', 'r') as f:
for line in f:
if 'UDP:' in line:
parts = line.split()
stats['udp_sockets'] = int(parts[2])
break
return stats
# 监控示例
stats = get_udp_statistics()
print(f"接收数据报: {stats.get('in_datagrams', 'N/A')}")
print(f"接收缓冲区溢出: {stats.get('rcvbuf_errors', 'N/A')}")
print(f"发送缓冲区溢出: {stats.get('sndbuf_errors', 'N/A')}")
print(f"UDP Socket 数量: {stats.get('udp_sockets', 'N/A')}")
关键指标解读:
| 指标 | 含义 | 优化方向 |
|---|---|---|
InDatagrams | 接收的数据报总数 | 基准指标 |
NoPorts | 目标端口无监听 | 检查服务配置 |
InErrors | 接收错误 | 检查校验和、缓冲区 |
RcvbufErrors | 接收缓冲区溢出 | 增大缓冲区或处理速度 |
SndbufErrors | 发送缓冲区溢出 | 增大缓冲区或降低发送速率 |
UDP 可靠性协议对比
当应用需要 UDP 的性能优势,同时需要一定程度的可靠性时,可以在应用层或传输层实现可靠性机制。
常见可靠 UDP 协议
| 协议 | 特点 | 适用场景 | 开源实现 |
|---|---|---|---|
| QUIC | HTTP/3 标准,内置 TLS,多路复用 | Web、HTTP | quiche、quinn |
| KCP | 低延迟,可配置可靠性 | 游戏、实时通信 | kcp-go、kcp.py |
| RUDP | 可靠 UDP,简单实现 | 需要可靠性的 UDP 应用 | 各种语言实现 |
| UDT | 面向大数据传输 | 高速数据传输 | UDT4 |
| SCTP | 多宿主、多流 | 电信、信令传输 | 内核支持 |
| WebRTC DataChannel | 浏览器 P2P | 实时通信 | libwebrtc |
KCP 协议详解
KCP 是一个快速可靠的 ARQ 协议,比 TCP 快 30%-40%,延迟降低三倍:
# KCP 核心参数
KCP_CONFIG = {
'nodelay': 1, # 0: 关闭,1: 开启(快速重传)
'interval': 10, # 内部更新间隔(毫秒)
'resend': 2, # 快速重传模式,0: 关闭,2: 开启
'nc': 1, # 不使用拥塞控制
'sndwnd': 128, # 发送窗口大小
'rcvwnd': 512, # 接收窗口大小
'mtu': 1400, # 最大传输单元
'timeout': 30, # 连接超时(秒)
}
KCP vs TCP 关键差异:
TCP 重传策略:
- 超时时间 = RTTvar * 4 + RTTs
- 丢包后等待时间较长
- 拥塞控制会大幅降低发送速率
KCP 重传策略:
- 可配置的快速重传
- 跳过 TCP 的慢启动和拥塞避免
- 以带宽换延迟
RUDP 简单实现
以下是一个简化版可靠 UDP 的实现示例:
import socket
import struct
import time
import threading
from collections import defaultdict
class ReliableUDPSender:
"""可靠 UDP 发送端"""
def __init__(self, sock, addr, window_size=32, timeout=0.1):
self.sock = sock
self.addr = addr
self.window_size = window_size
self.timeout = timeout
self.next_seq = 0
self.base_seq = 0 # 窗口起始
self.buffer = {} # 缓存已发送未确认的数据
self.acked = set() # 已确认的序列号
self.lock = threading.Lock()
self.running = True
# 启动重传线程
self.retransmit_thread = threading.Thread(target=self._retransmit_worker)
self.retransmit_thread.daemon = True
self.retransmit_thread.start()
def _retransmit_worker(self):
"""后台重传线程"""
while self.running:
time.sleep(self.timeout)
with self.lock:
current_time = time.time()
for seq, (data, send_time) in list(self.buffer.items()):
if current_time - send_time > self.timeout:
# 重传
self._send_packet(seq, data)
self.buffer[seq] = (data, current_time)
def _send_packet(self, seq, data):
"""发送带序列号的数据包"""
# 格式: seq(4B) | data
packet = struct.pack('!I', seq) + data
self.sock.sendto(packet, self.addr)
def send(self, data):
"""发送数据(阻塞直到窗口有空位)"""
while True:
with self.lock:
if self.next_seq - self.base_seq < self.window_size:
break
time.sleep(0.001)
with self.lock:
seq = self.next_seq
self._send_packet(seq, data)
self.buffer[seq] = (data, time.time())
self.next_seq += 1
def handle_ack(self, ack_packet):
"""处理确认包"""
ack_seq = struct.unpack('!I', ack_packet)[0]
with self.lock:
self.acked.add(ack_seq)
if ack_seq in self.buffer:
del self.buffer[ack_seq]
# 滑动窗口
while self.base_seq in self.acked:
self.base_seq += 1
def close(self):
"""关闭连接"""
self.running = False
self.retransmit_thread.join()
class ReliableUDPReceiver:
"""可靠 UDP 接收端"""
def __init__(self, sock):
self.sock = sock
self.expected_seq = 0
self.buffer = {} # 缓存乱序到达的数据
self.lock = threading.Lock()
def receive(self):
"""接收数据"""
while True:
packet, addr = self.sock.recvfrom(65535)
if len(packet) < 4:
continue
seq = struct.unpack('!I', packet[:4])[0]
data = packet[4:]
with self.lock:
# 发送 ACK
ack_packet = struct.pack('!I', seq)
self.sock.sendto(ack_packet, addr)
# 处理数据
if seq == self.expected_seq:
yield data
self.expected_seq += 1
# 检查缓冲区中是否有后续数据
while self.expected_seq in self.buffer:
yield self.buffer.pop(self.expected_seq)
self.expected_seq += 1
elif seq > self.expected_seq:
# 乱序到达,缓存
self.buffer[seq] = data
# seq < expected_seq 是重复包,忽略
QUIC 协议特点
QUIC 是 UDP 可靠传输的现代解决方案,已成为 HTTP/3 的基础:
QUIC 协议栈:
┌─────────────────────────────────┐
│ HTTP/3 │
├─────────────────────────────────┤
│ QUIC (可靠传输 + 加密) │
│ ┌─────────┬─────────────────┐ │
│ │ 流多路复用 │ 拥塞控制 │ │
│ │ 丢包恢复 │ 流量控制 │ │
│ └─────────┴─────────────────┘ │
│ TLS 1.3 (内置加密) │
├─────────────────────────────────┤
│ UDP │
└─────────────────────────────────┘
QUIC 相比 TCP + TLS 的优势:
| 特性 | TCP + TLS | QUIC |
|---|---|---|
| 连接建立 | 2-3 RTT | 0-1 RTT |
| 队头阻塞 | 整个连接 | 仅单个流 |
| 连接迁移 | 不支持 | 支持 |
| 拥塞控制 | 内核实现 | 用户态实现 |
| 协议演进 | 需要内核升级 | 用户态更新 |
UDP 实际应用案例
DNS 查询优化
DNS 是 UDP 最经典的应用,理解其优化策略对理解 UDP 应用很有帮助:
import socket
import struct
import time
def dns_query(domain, dns_server='8.8.8.8', timeout=2.0):
"""
高性能 DNS 查询实现
优化点:
1. 使用单个 Socket 复用
2. 合理的超时设置
3. 二进制协议解析
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
# 构建 DNS 查询报文
# ID (2B) + Flags (2B) + Questions (2B) + Answers (2B) + Auth (2B) + Additional (2B)
header = struct.pack('>HHHHHH',
0x1234, # Transaction ID
0x0100, # Flags: Standard query
1, 0, 0, 0 # Counts
)
# Question section
question = b''
for part in domain.split('.'):
question += bytes([len(part)]) + part.encode()
question += b'\x00' # End of name
question += struct.pack('>HH', 1, 1) # Type A, Class IN
query = header + question
start_time = time.time()
sock.sendto(query, (dns_server, 53))
response, _ = sock.recvfrom(512)
elapsed = time.time() - start_time
sock.close()
return parse_dns_response(response), elapsed
def parse_dns_response(data):
"""解析 DNS 响应"""
if len(data) < 12:
return None
# 解析头部
header = struct.unpack('>HHHHHH', data[:12])
trans_id, flags, qdcount, ancount, _, _ = header
results = []
offset = 12
# 跳过 Question section
for _ in range(qdcount):
while data[offset] != 0:
offset += data[offset] + 1
offset += 5 # 0x00 + Type(2) + Class(2)
# 解析 Answer section
for _ in range(ancount):
# Name (可能使用压缩指针)
if data[offset] & 0xC0 == 0xC0:
offset += 2 # 压缩指针
else:
while data[offset] != 0:
offset += data[offset] + 1
offset += 1
# Type, Class, TTL, RDLENGTH
rtype, rclass, ttl, rdlength = struct.unpack('>HHIH', data[offset:offset+10])
offset += 10
# RDATA
if rtype == 1: # A record
ip = '.'.join(str(b) for b in data[offset:offset+4])
results.append(ip)
offset += rdlength
return results
# 使用示例
# ips, time_ms = dns_query('www.google.com')
# print(f"解析结果: {ips}, 耗时: {time_ms*1000:.2f}ms")
游戏/实时通信场景
实时多人游戏对延迟极其敏感,UDP 是首选:
import socket
import struct
import time
import threading
from collections import deque
class GameServer:
"""
游戏服务器示例
关键设计:
1. 状态同步而非事件同步
2. 插值和平滑处理
3. 丢包不影响游戏进行
"""
def __init__(self, port=9999):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('0.0.0.0', port))
self.sock.settimeout(0.1)
self.clients = {} # {addr: {'x': 0, 'y': 0, 'last_time': 0}}
self.running = True
# 状态同步线程
self.sync_thread = threading.Thread(target=self._sync_worker)
self.sync_thread.daemon = True
def _sync_worker(self):
"""定时广播游戏状态"""
while self.running:
time.sleep(0.016) # ~60 FPS
if not self.clients:
continue
# 构建状态包
# 格式: client_count(1B) + [addr_id(4B) + x(2B) + y(2B)] * n
state = struct.pack('!B', len(self.clients))
for addr, data in self.clients.items():
addr_id = hash(addr) & 0xFFFFFFFF
state += struct.pack('!Ihh', addr_id,
int(data['x']), int(data['y']))
# 广播给所有客户端
for addr in self.clients:
self.sock.sendto(state, addr)
def run(self):
"""主循环"""
self.sync_thread.start()
while self.running:
try:
data, addr = self.sock.recvfrom(1024)
# 解析客户端输入
# 格式: x(2B) + y(2B) + action(1B)
if len(data) >= 5:
x, y, action = struct.unpack('!hhB', data[:5])
# 更新客户端状态
if addr not in self.clients:
self.clients[addr] = {'x': 0, 'y': 0}
# 应用移动
self.clients[addr]['x'] = x
self.clients[addr]['y'] = y
self.clients[addr]['last_time'] = time.time()
except socket.timeout:
continue
except Exception as e:
print(f"Error: {e}")
self.sock.close()
视频流传输
视频流对实时性要求高,可以容忍丢包:
import socket
import struct
import time
class VideoStreamer:
"""
视频流发送端
关键技术:
1. RTP/RTCP 协议设计
2. 时间戳同步
3. 分片和重组
4. 自适应码率
"""
def __init__(self, dest_addr, mtu=1400):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.dest_addr = dest_addr
self.mtu = mtu
self.sequence = 0
self.timestamp = 0
self.ssrc = 0x12345678 # 同步源标识
def send_frame(self, frame_data, frame_type='I'):
"""
发送一个视频帧
frame_type: I/P/B 帧
I 帧: 关键帧,独立解码
P 帧: 预测帧,依赖前一帧
B 帧: 双向预测帧,依赖前后帧
"""
# 计算分片数量
chunk_size = self.mtu - 12 # RTP 头部占用 12 字节
chunks = []
for i in range(0, len(frame_data), chunk_size):
chunks.append(frame_data[i:i+chunk_size])
timestamp = self.timestamp
self.timestamp += 3000 # 假设 30fps,每个时间单位 1/90000 秒
# 发送所有分片
for i, chunk in enumerate(chunks):
# RTP 头部
# V=2, P=0, X=0, CC=0, M=(last chunk), PT=96
marker = 1 if i == len(chunks) - 1 else 0
header = struct.pack('!BBHII',
0x80, # Version 2, no padding, no extension, CC=0
96 | (marker << 7), # Payload type 96, marker bit
self.sequence,
timestamp,
self.ssrc
)
self.sequence = (self.sequence + 1) & 0xFFFF
# 分片头部(简化)
frag_header = struct.pack('!BBI',
1 if i == 0 else 0, # 开始标记
1 if i == len(chunks) - 1 else 0, # 结束标记
len(frame_data) # 总长度
)
packet = header + frag_header + chunk
self.sock.sendto(packet, self.dest_addr)
def close(self):
self.sock.close()
class VideoReceiver:
"""视频流接收端"""
def __init__(self, port=5000):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(('0.0.0.0', port))
self.sock.settimeout(5.0)
self.frames = {} # {timestamp: {chunks}}
def receive_frame(self):
"""接收并重组一个完整的帧"""
while True:
data, _ = self.sock.recvfrom(2048)
if len(data) < 12:
continue
# 解析 RTP 头部
seq, timestamp = struct.unpack('!HI', data[2:8])
marker = (data[1] >> 7) & 1
# 解析分片头部
start, end, total_len = struct.unpack('!BBI', data[12:18])
chunk_data = data[18:]
# 存储分片
if timestamp not in self.frames:
self.frames[timestamp] = {'chunks': [], 'total': total_len}
self.frames[timestamp]['chunks'].append(chunk_data)
# 检查是否收到完整帧
if marker: # 最后一个分片
frame_data = b''.join(self.frames[timestamp]['chunks'])
del self.frames[timestamp]
if len(frame_data) >= total_len * 0.95: # 允许少量丢失
return frame_data
小结
UDP 是简单高效的传输协议:
- 特点:无连接、不可靠、面向报文、支持多播广播
- 格式:8 字节首部,包含端口、长度、校验和
- 应用:实时音视频、在线游戏、DNS、DHCP、IoT
- 可靠性:应用层可以实现确认、重传、FEC 等机制
- 编程:使用 SOCK_DGRAM 类型的 Socket
- 性能优化:缓冲区调优、批量收发、零拷贝、多进程处理
- 可靠传输:QUIC、KCP 等协议在 UDP 上实现可靠性
UDP 适合对实时性要求高、可以容忍一定丢包的场景。如果需要可靠性,可以在应用层实现或使用 QUIC、KCP 等协议。
[!TIP] 想深入了解 TCP 的可靠性机制?请看 TCP 协议详解。想了解 HTTP/3 如何基于 UDP 实现可靠传输?请看 HTTP/3 与 QUIC 协议详解。
练习
- 比较 UDP 和 TCP 的主要区别
- 说明 UDP 校验和的计算过程
- 列举 UDP 适合的应用场景及其原因
- 编写一个简单的 UDP 客户端和服务器程序
- 解释 UDP 缓冲区溢出的原因及解决方案
- 比较 QUIC、KCP 和 TCP 在延迟和可靠性方面的差异