跳到主要内容

异步迭代器与生成器

异步迭代器允许我们在迭代过程中执行异步操作,例如从网络流中读取数据或从数据库分批获取记录。本章介绍异步迭代器的概念、实现方式和实际应用。

同步迭代器回顾

在讨论异步迭代器之前,先回顾同步迭代器的工作原理。

同步迭代器

同步迭代器实现了 __iter____next__ 方法:

class Counter:
"""同步迭代器:生成数字序列"""

def __init__(self, limit):
self.limit = limit
self.count = 0

def __iter__(self):
return self

def __next__(self):
if self.count >= self.limit:
raise StopIteration
self.count += 1
return self.count

# 使用 for 循环迭代
for num in Counter(5):
print(num) # 1, 2, 3, 4, 5

同步生成器

生成器使用 yield 简化迭代器的创建:

def counter(limit):
"""同步生成器"""
count = 0
while count < limit:
count += 1
yield count

for num in counter(5):
print(num)

同步迭代器的限制:__next__ 方法不能是异步的,无法在其中执行 await 操作。

异步迭代器

当需要在迭代过程中执行异步操作时,就需要异步迭代器。

基本语法

异步迭代器实现 __aiter____anext__ 方法:

import asyncio

class AsyncCounter:
"""异步迭代器:每秒返回一个数字"""

def __init__(self, limit):
self.limit = limit
self.count = 0

def __aiter__(self):
return self

async def __anext__(self):
if self.count >= self.limit:
raise StopAsyncIteration # 注意:使用 StopAsyncIteration

# 可以在这里执行异步操作!
await asyncio.sleep(1)

self.count += 1
return self.count

async def main():
# 使用 async for 迭代
async for num in AsyncCounter(5):
print(num)

asyncio.run(main())

输出(每秒打印一个数字):

1
2
3
4
5

关键区别

特性同步迭代器异步迭代器
定义方法__iter__, __next____aiter__, __anext__
方法类型同步方法异步方法(async def
结束信号StopIterationStopAsyncIteration
迭代语法for x in iterasync for x in iter

为什么需要异步迭代器?

考虑从 API 分页获取数据的场景:

import asyncio
import aiohttp

class PaginatedAPI:
"""分页 API 异步迭代器"""

def __init__(self, base_url, per_page=10):
self.base_url = base_url
self.per_page = per_page
self.page = 1
self.items = []
self.index = 0

def __aiter__(self):
return self

async def __anext__(self):
# 如果当前页的项目已用完
if self.index >= len(self.items):
# 异步获取下一页
self.items = await self._fetch_page(self.page)
self.index = 0
self.page += 1

# 没有更多数据
if not self.items:
raise StopAsyncIteration

item = self.items[self.index]
self.index += 1
return item

async def _fetch_page(self, page):
"""模拟异步获取页面数据"""
# 实际中这里会发起 HTTP 请求
await asyncio.sleep(0.1)

if page > 3: # 模拟只有 3 页数据
return []

return [f"项目 {page}-{i}" for i in range(self.per_page)]

async def main():
async for item in PaginatedAPI("https://api.example.com"):
print(item)

asyncio.run(main())

如果没有异步迭代器,需要手动处理分页逻辑:

async def fetch_all_manual():
"""手动处理分页"""
page = 1
all_items = []

while True:
items = await fetch_page(page)
if not items:
break
all_items.extend(items)
page += 1

return all_items

异步迭代器让代码更优雅,而且支持流式处理——不需要等到所有数据加载完就可以开始处理。

异步生成器

异步生成器是创建异步迭代器的更简洁方式。

基本语法

使用 async def 定义函数,使用 yield 返回值:

import asyncio

async def async_counter(limit):
"""异步生成器"""
count = 0
while count < limit:
await asyncio.sleep(1) # 可以执行异步操作
count += 1
yield count

async def main():
# 使用 async for 迭代
async for num in async_counter(5):
print(num)

asyncio.run(main())

异步生成器自动实现了 __aiter____anext__,比手动编写异步迭代器类更简洁。

异步生成器 vs 异步迭代器类

方式优点缺点
异步生成器代码简洁,易于理解状态管理不够灵活
异步迭代器类完全控制迭代逻辑代码量较多

大多数情况下,异步生成器已经足够。

实际示例:读取大文件

import asyncio
import aiofiles

async def read_lines(filepath):
"""异步逐行读取文件"""
async with aiofiles.open(filepath, mode='r') as file:
async for line in file:
yield line.strip()

async def process_large_file(filepath):
"""处理大文件"""
async for line in read_lines(filepath):
# 处理每一行
if line:
print(f"处理: {line[:50]}...")

asyncio.run(process_large_file("large_file.txt"))

实际示例:实时数据流

import asyncio
import random

async def sensor_data(sensor_id, interval=1):
"""模拟传感器数据流"""
while True:
# 模拟读取传感器数据
await asyncio.sleep(interval)
value = random.uniform(20, 30)
yield {
'sensor_id': sensor_id,
'value': round(value, 2),
'timestamp': asyncio.get_event_loop().time()
}

async def monitor_sensors():
"""监控多个传感器"""
# 创建多个数据流
sensors = [
sensor_data("温度传感器", 0.5),
sensor_data("湿度传感器", 0.8),
]

# 合并数据流
async for data in merge_async_iterators(*sensors):
print(f"收到数据: {data}")

async def merge_async_iterators(*iterators):
"""合并多个异步迭代器"""
tasks = {
asyncio.create_task(it.__anext__()): it
for it in iterators
}

while tasks:
done, _ = await asyncio.wait(
tasks.keys(),
return_when=asyncio.FIRST_COMPLETED
)

for task in done:
it = tasks.pop(task)
try:
yield task.result()
# 重新创建任务获取下一个值
tasks[asyncio.create_task(it.__anext__())] = it
except StopAsyncIteration:
pass # 这个迭代器结束了

# asyncio.run(monitor_sensors())

异步推导式

Python 支持异步推导式,用于从异步迭代器创建列表、集合或字典。

异步列表推导式

import asyncio

async def async_range(n):
"""异步生成器"""
for i in range(n):
await asyncio.sleep(0.1)
yield i

async def main():
# 异步列表推导式
squares = [x * x async for x in async_range(5)]
print(squares) # [0, 1, 4, 9, 16]

asyncio.run(main())

异步集合和字典推导式

import asyncio

async def async_items():
yield "apple", 1
yield "banana", 2
yield "cherry", 3

async def main():
# 异步集合推导式
fruits = {item async for item in async_range(5)}
print(fruits) # {0, 1, 2, 3, 4}

# 异步字典推导式
prices = {k: v async for k, v in async_items()}
print(prices) # {'apple': 1, 'banana': 2, 'cherry': 3}

asyncio.run(main())

带条件的推导式

import asyncio

async def main():
# 只保留偶数的平方
even_squares = [
x * x
async for x in async_range(10)
if x % 2 == 0
]
print(even_squares) # [0, 4, 16, 36, 64]

asyncio.run(main())

异步生成器表达式

类似于同步生成器表达式,异步生成器表达式创建一个异步迭代器:

import asyncio

async def async_range(n):
for i in range(n):
await asyncio.sleep(0.01)
yield i

async def main():
# 异步生成器表达式(注意括号)
async_gen = (x * x async for x in async_range(5))

# 它是一个异步迭代器
async for value in async_gen:
print(value)

asyncio.run(main())

关闭异步生成器

异步生成器需要正确关闭以释放资源。

使用 aclose()

import asyncio

async def generator_with_cleanup():
"""带清理逻辑的异步生成器"""
try:
for i in range(100):
await asyncio.sleep(0.1)
yield i
finally:
print("清理资源...")

async def main():
gen = generator_with_cleanup()

async for value in gen:
print(value)
if value >= 3:
await gen.aclose() # 提前关闭
break

print("结束")

asyncio.run(main())

输出:

0
1
2
3
清理资源...
结束

使用 async with

Python 3.10+ 可以使用 contextlib.aclosing

import asyncio
from contextlib import aclosing

async def main():
async with aclosing(generator_with_cleanup()) as gen:
async for value in gen:
print(value)
if value >= 3:
break
# 自动调用 aclose()

异步内置函数

anext()

anext()next() 的异步版本:

import asyncio

async def async_range(n):
for i in range(n):
await asyncio.sleep(0.01)
yield i

async def main():
gen = async_range(5)

# 获取下一个值
first = await anext(gen)
print(first) # 0

second = await anext(gen)
print(second) # 1

# 带默认值
for _ in range(10):
value = await anext(gen, "结束")
print(value)

asyncio.run(main())

aiter()

aiter()iter() 的异步版本:

import asyncio

class AsyncSequence:
def __init__(self, data):
self.data = data
self.index = 0

def __aiter__(self):
return self

async def __anext__(self):
await asyncio.sleep(0.01)
if self.index >= len(self.data):
raise StopAsyncIteration
value = self.data[self.index]
self.index += 1
return value

async def main():
seq = AsyncSequence([1, 2, 3, 4, 5])

# 获取异步迭代器
it = aiter(seq)

# 手动迭代
while True:
try:
value = await anext(it)
print(value)
except StopAsyncIteration:
break

asyncio.run(main())

实践示例

示例 1:异步数据库游标

import asyncio

class AsyncCursor:
"""模拟异步数据库游标"""

def __init__(self, query, batch_size=100):
self.query = query
self.batch_size = batch_size
self._offset = 0
self._buffer = []
self._exhausted = False

def __aiter__(self):
return self

async def __anext__(self):
if self._buffer:
return self._buffer.pop(0)

if self._exhausted:
raise StopAsyncIteration

# 异步获取一批数据
self._buffer = await self._fetch_batch()

if not self._buffer:
self._exhausted = True
raise StopAsyncIteration

return self._buffer.pop(0)

async def _fetch_batch(self):
"""模拟数据库查询"""
await asyncio.sleep(0.1) # 模拟 I/O

if self._offset >= 500:
return []

batch = [
{'id': self._offset + i, 'data': f'记录 {self._offset + i}'}
for i in range(self.batch_size)
]
self._offset += self.batch_size
return batch

async def main():
async for row in AsyncCursor("SELECT * FROM users"):
if row['id'] % 100 == 0:
print(f"处理: {row}")

asyncio.run(main())

示例 2:异步队列消费者

import asyncio
import random

async def producer(queue):
"""生产者"""
for i in range(20):
await asyncio.sleep(random.uniform(0.1, 0.3))
await queue.put(f"消息 {i}")
await queue.put(None) # 结束信号

async def queue_consumer(queue):
"""队列消费者(异步生成器)"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
yield item
queue.task_done()

async def main():
queue = asyncio.Queue()

# 启动生产者
prod_task = asyncio.create_task(producer(queue))

# 消费队列
async for message in queue_consumer(queue):
print(f"消费: {message}")

await prod_task

asyncio.run(main())

示例 3:带速率限制的 API 客户端

import asyncio

class RateLimitedAPI:
"""带速率限制的 API 客户端"""

def __init__(self, requests_per_second=5):
self.interval = 1.0 / requests_per_second
self.last_request = 0

async def fetch_all(self, items):
"""获取所有项目(带速率限制)"""
for item in items:
# 速率限制
now = asyncio.get_event_loop().time()
wait_time = self.last_request + self.interval - now
if wait_time > 0:
await asyncio.sleep(wait_time)

self.last_request = asyncio.get_event_loop().time()

# 模拟 API 请求
result = await self._fetch(item)
yield result

async def _fetch(self, item):
"""模拟 API 请求"""
await asyncio.sleep(0.1)
return f"结果: {item}"

async def main():
api = RateLimitedAPI(requests_per_second=5)
items = [f"item-{i}" for i in range(20)]

async for result in api.fetch_all(items):
print(result)

asyncio.run(main())

小结

异步迭代器和生成器的核心概念:

概念说明
异步迭代器实现 __aiter____anext__ 的类
异步生成器使用 async defyield 的函数
async for异步迭代语法
StopAsyncIteration异步迭代结束信号
异步推导式[x async for x in iter]

最佳实践:

  1. 优先使用异步生成器而非异步迭代器类
  2. 使用 aclose()aclosing() 正确关闭异步生成器
  3. 在异步生成器中使用 try/finally 确保资源清理
  4. 利用异步推导式简化代码
  5. 对于 I/O 密集型的数据流处理,异步迭代器非常合适

异步迭代器是处理流式数据、分页 API、实时数据等场景的强大工具,让异步代码更加优雅和高效。