异步迭代器与生成器
异步迭代器允许我们在迭代过程中执行异步操作,例如从网络流中读取数据或从数据库分批获取记录。本章介绍异步迭代器的概念、实现方式和实际应用。
同步迭代器回顾
在讨论异步迭代器之前,先回顾同步迭代器的工作原理。
同步迭代器
同步迭代器实现了 __iter__ 和 __next__ 方法:
class Counter:
"""同步迭代器:生成数字序列"""
def __init__(self, limit):
self.limit = limit
self.count = 0
def __iter__(self):
return self
def __next__(self):
if self.count >= self.limit:
raise StopIteration
self.count += 1
return self.count
# 使用 for 循环迭代
for num in Counter(5):
print(num) # 1, 2, 3, 4, 5
同步生成器
生成器使用 yield 简化迭代器的创建:
def counter(limit):
"""同步生成器"""
count = 0
while count < limit:
count += 1
yield count
for num in counter(5):
print(num)
同步迭代器的限制:__next__ 方法不能是异步的,无法在其中执行 await 操作。
异步迭代器
当需要在迭代过程中执行异步操作时,就需要异步迭代器。
基本语法
异步迭代器实现 __aiter__ 和 __anext__ 方法:
import asyncio
class AsyncCounter:
"""异步迭代器:每秒返回一个数字"""
def __init__(self, limit):
self.limit = limit
self.count = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.count >= self.limit:
raise StopAsyncIteration # 注意:使用 StopAsyncIteration
# 可以在这里执行异步操作!
await asyncio.sleep(1)
self.count += 1
return self.count
async def main():
# 使用 async for 迭代
async for num in AsyncCounter(5):
print(num)
asyncio.run(main())
输出(每秒打印一个数字):
1
2
3
4
5
关键区别
| 特性 | 同步迭代器 | 异步迭代器 |
|---|---|---|
| 定义方法 | __iter__, __next__ | __aiter__, __anext__ |
| 方法类型 | 同步方法 | 异步方法(async def) |
| 结束信号 | StopIteration | StopAsyncIteration |
| 迭代语法 | for x in iter | async for x in iter |
为什么需要异步迭代器?
考虑从 API 分页获取数据的场景:
import asyncio
import aiohttp
class PaginatedAPI:
"""分页 API 异步迭代器"""
def __init__(self, base_url, per_page=10):
self.base_url = base_url
self.per_page = per_page
self.page = 1
self.items = []
self.index = 0
def __aiter__(self):
return self
async def __anext__(self):
# 如果当前页的项目已用完
if self.index >= len(self.items):
# 异步获取下一页
self.items = await self._fetch_page(self.page)
self.index = 0
self.page += 1
# 没有更多数据
if not self.items:
raise StopAsyncIteration
item = self.items[self.index]
self.index += 1
return item
async def _fetch_page(self, page):
"""模拟异步获取页面数据"""
# 实际中这里会发起 HTTP 请求
await asyncio.sleep(0.1)
if page > 3: # 模拟只有 3 页数据
return []
return [f"项目 {page}-{i}" for i in range(self.per_page)]
async def main():
async for item in PaginatedAPI("https://api.example.com"):
print(item)
asyncio.run(main())
如果没有异步迭代器,需要手动处理分页逻辑:
async def fetch_all_manual():
"""手动处理分页"""
page = 1
all_items = []
while True:
items = await fetch_page(page)
if not items:
break
all_items.extend(items)
page += 1
return all_items
异步迭代器让代码更优雅,而且支持流式处理——不需要等到所有数据加载完就可以开始处理。
异步生成器
异步生成器是创建异步迭代器的更简洁方式。
基本语法
使用 async def 定义函数,使用 yield 返回值:
import asyncio
async def async_counter(limit):
"""异步生成器"""
count = 0
while count < limit:
await asyncio.sleep(1) # 可以执行异步操作
count += 1
yield count
async def main():
# 使用 async for 迭代
async for num in async_counter(5):
print(num)
asyncio.run(main())
异步生成器自动实现了 __aiter__ 和 __anext__,比手动编写异步迭代器类更简洁。
异步生成器 vs 异步迭代器类
| 方式 | 优点 | 缺点 |
|---|---|---|
| 异步生成器 | 代码简洁,易于理解 | 状态管理不够灵活 |
| 异步迭代器类 | 完全控制迭代逻辑 | 代码量较多 |
大多数情况下,异步生成器已经足够。
实际示例:读取大文件
import asyncio
import aiofiles
async def read_lines(filepath):
"""异步逐行读取文件"""
async with aiofiles.open(filepath, mode='r') as file:
async for line in file:
yield line.strip()
async def process_large_file(filepath):
"""处理大文件"""
async for line in read_lines(filepath):
# 处理每一行
if line:
print(f"处理: {line[:50]}...")
asyncio.run(process_large_file("large_file.txt"))
实际示例:实时数据流
import asyncio
import random
async def sensor_data(sensor_id, interval=1):
"""模拟传感器数据流"""
while True:
# 模拟读取传感器数据
await asyncio.sleep(interval)
value = random.uniform(20, 30)
yield {
'sensor_id': sensor_id,
'value': round(value, 2),
'timestamp': asyncio.get_event_loop().time()
}
async def monitor_sensors():
"""监控多个传感器"""
# 创建多个数据流
sensors = [
sensor_data("温度传感器", 0.5),
sensor_data("湿度传感器", 0.8),
]
# 合并数据流
async for data in merge_async_iterators(*sensors):
print(f"收到数据: {data}")
async def merge_async_iterators(*iterators):
"""合并多个异步迭代器"""
tasks = {
asyncio.create_task(it.__anext__()): it
for it in iterators
}
while tasks:
done, _ = await asyncio.wait(
tasks.keys(),
return_when=asyncio.FIRST_COMPLETED
)
for task in done:
it = tasks.pop(task)
try:
yield task.result()
# 重新创建任务获取下一个值
tasks[asyncio.create_task(it.__anext__())] = it
except StopAsyncIteration:
pass # 这个迭代器结束了
# asyncio.run(monitor_sensors())
异步推导式
Python 支持异步推导式,用于从异步迭代器创建列表、集合或字典。
异步列表推导式
import asyncio
async def async_range(n):
"""异步生成器"""
for i in range(n):
await asyncio.sleep(0.1)
yield i
async def main():
# 异步列表推导式
squares = [x * x async for x in async_range(5)]
print(squares) # [0, 1, 4, 9, 16]
asyncio.run(main())
异步集合和字典推导式
import asyncio
async def async_items():
yield "apple", 1
yield "banana", 2
yield "cherry", 3
async def main():
# 异步集合推导式
fruits = {item async for item in async_range(5)}
print(fruits) # {0, 1, 2, 3, 4}
# 异步字典推导式
prices = {k: v async for k, v in async_items()}
print(prices) # {'apple': 1, 'banana': 2, 'cherry': 3}
asyncio.run(main())
带条件的推导式
import asyncio
async def main():
# 只保留偶数的平方
even_squares = [
x * x
async for x in async_range(10)
if x % 2 == 0
]
print(even_squares) # [0, 4, 16, 36, 64]
asyncio.run(main())
异步生成器表达式
类似于同步生成器表达式,异步生成器表达式创建一个异步迭代器:
import asyncio
async def async_range(n):
for i in range(n):
await asyncio.sleep(0.01)
yield i
async def main():
# 异步生成器表达式(注意括号)
async_gen = (x * x async for x in async_range(5))
# 它是一个异步迭代器
async for value in async_gen:
print(value)
asyncio.run(main())
关闭异步生成器
异步生成器需要正确关闭以释放资源。
使用 aclose()
import asyncio
async def generator_with_cleanup():
"""带清理逻辑的异步生成器"""
try:
for i in range(100):
await asyncio.sleep(0.1)
yield i
finally:
print("清理资源...")
async def main():
gen = generator_with_cleanup()
async for value in gen:
print(value)
if value >= 3:
await gen.aclose() # 提前关闭
break
print("结束")
asyncio.run(main())
输出:
0
1
2
3
清理资源...
结束
使用 async with
Python 3.10+ 可以使用 contextlib.aclosing:
import asyncio
from contextlib import aclosing
async def main():
async with aclosing(generator_with_cleanup()) as gen:
async for value in gen:
print(value)
if value >= 3:
break
# 自动调用 aclose()
异步内置函数
anext()
anext() 是 next() 的异步版本:
import asyncio
async def async_range(n):
for i in range(n):
await asyncio.sleep(0.01)
yield i
async def main():
gen = async_range(5)
# 获取下一个值
first = await anext(gen)
print(first) # 0
second = await anext(gen)
print(second) # 1
# 带默认值
for _ in range(10):
value = await anext(gen, "结束")
print(value)
asyncio.run(main())
aiter()
aiter() 是 iter() 的异步版本:
import asyncio
class AsyncSequence:
def __init__(self, data):
self.data = data
self.index = 0
def __aiter__(self):
return self
async def __anext__(self):
await asyncio.sleep(0.01)
if self.index >= len(self.data):
raise StopAsyncIteration
value = self.data[self.index]
self.index += 1
return value
async def main():
seq = AsyncSequence([1, 2, 3, 4, 5])
# 获取异步迭代器
it = aiter(seq)
# 手动迭代
while True:
try:
value = await anext(it)
print(value)
except StopAsyncIteration:
break
asyncio.run(main())
实践示例
示例 1:异步数据库游标
import asyncio
class AsyncCursor:
"""模拟异步数据库游标"""
def __init__(self, query, batch_size=100):
self.query = query
self.batch_size = batch_size
self._offset = 0
self._buffer = []
self._exhausted = False
def __aiter__(self):
return self
async def __anext__(self):
if self._buffer:
return self._buffer.pop(0)
if self._exhausted:
raise StopAsyncIteration
# 异步获取一批数据
self._buffer = await self._fetch_batch()
if not self._buffer:
self._exhausted = True
raise StopAsyncIteration
return self._buffer.pop(0)
async def _fetch_batch(self):
"""模拟数据库查询"""
await asyncio.sleep(0.1) # 模拟 I/O
if self._offset >= 500:
return []
batch = [
{'id': self._offset + i, 'data': f'记录 {self._offset + i}'}
for i in range(self.batch_size)
]
self._offset += self.batch_size
return batch
async def main():
async for row in AsyncCursor("SELECT * FROM users"):
if row['id'] % 100 == 0:
print(f"处理: {row}")
asyncio.run(main())
示例 2:异步队列消费者
import asyncio
import random
async def producer(queue):
"""生产者"""
for i in range(20):
await asyncio.sleep(random.uniform(0.1, 0.3))
await queue.put(f"消息 {i}")
await queue.put(None) # 结束信号
async def queue_consumer(queue):
"""队列消费者(异步生成器)"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
yield item
queue.task_done()
async def main():
queue = asyncio.Queue()
# 启动生产者
prod_task = asyncio.create_task(producer(queue))
# 消费队列
async for message in queue_consumer(queue):
print(f"消费: {message}")
await prod_task
asyncio.run(main())
示例 3:带速率限制的 API 客户端
import asyncio
class RateLimitedAPI:
"""带速率限制的 API 客户端"""
def __init__(self, requests_per_second=5):
self.interval = 1.0 / requests_per_second
self.last_request = 0
async def fetch_all(self, items):
"""获取所有项目(带速率限制)"""
for item in items:
# 速率限制
now = asyncio.get_event_loop().time()
wait_time = self.last_request + self.interval - now
if wait_time > 0:
await asyncio.sleep(wait_time)
self.last_request = asyncio.get_event_loop().time()
# 模拟 API 请求
result = await self._fetch(item)
yield result
async def _fetch(self, item):
"""模拟 API 请求"""
await asyncio.sleep(0.1)
return f"结果: {item}"
async def main():
api = RateLimitedAPI(requests_per_second=5)
items = [f"item-{i}" for i in range(20)]
async for result in api.fetch_all(items):
print(result)
asyncio.run(main())
小结
异步迭代器和生成器的核心概念:
| 概念 | 说明 |
|---|---|
| 异步迭代器 | 实现 __aiter__ 和 __anext__ 的类 |
| 异步生成器 | 使用 async def 和 yield 的函数 |
async for | 异步迭代语法 |
StopAsyncIteration | 异步迭代结束信号 |
| 异步推导式 | [x async for x in iter] |
最佳实践:
- 优先使用异步生成器而非异步迭代器类
- 使用
aclose()或aclosing()正确关闭异步生成器 - 在异步生成器中使用
try/finally确保资源清理 - 利用异步推导式简化代码
- 对于 I/O 密集型的数据流处理,异步迭代器非常合适
异步迭代器是处理流式数据、分页 API、实时数据等场景的强大工具,让异步代码更加优雅和高效。