异步爬虫
当需要爬取大量页面时,同步请求的效率会成为瓶颈。异步爬虫可以在等待网络响应的同时处理其他请求,大幅提升爬取效率。
理解异步编程的本质
在学习异步爬虫之前,我们需要先理解异步编程的核心概念。很多初学者会混淆"异步"和"多线程",实际上它们是两种完全不同的并发模型。
同步 vs 异步 vs 多线程
**同步(Synchronous)**是最简单的编程模型:代码一行一行执行,遇到 I/O 操作(如网络请求、文件读写)时,程序会阻塞等待,直到操作完成后才继续执行下一行。
```python
# 同步执行:必须等待每个请求完成
import requests
import time

def sync_crawl():
    start = time.time()
    for i in range(5):
        response = requests.get('https://httpbin.org/delay/1')
        print(f'请求 {i+1}: {response.status_code}')
    print(f'总耗时: {time.time() - start:.2f}秒')

sync_crawl()
# 结果:总耗时约 5 秒(串行执行)
```
**多线程(Multithreading)**通过创建多个线程来实现并发,每个线程独立执行。Python 的多线程受 GIL(全局解释器锁)限制,同一时刻只有一个线程在执行 Python 代码,但对于 I/O 密集型任务,线程在等待 I/O 时会释放 GIL,其他线程可以执行。
```python
# 多线程执行:多个请求并行
import threading
import requests
import time

def fetch(url, results, index):
    response = requests.get(url)
    results[index] = response.status_code

def threaded_crawl():
    start = time.time()
    results = [None] * 5
    threads = []
    for i in range(5):
        t = threading.Thread(
            target=fetch,
            args=('https://httpbin.org/delay/1', results, i)
        )
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f'总耗时: {time.time() - start:.2f}秒')

threaded_crawl()
# 结果:总耗时约 1 秒(并行执行)
```
**异步(Asynchronous)**使用单线程实现并发,核心思想是"非阻塞 I/O"。当遇到 I/O 操作时,不阻塞等待,而是注册一个回调,继续执行其他任务。当 I/O 完成时,事件循环会调用之前注册的回调。
```python
# 异步执行:单线程并发
import asyncio
import aiohttp
import time

async def async_crawl():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get('https://httpbin.org/delay/1')
            for _ in range(5)
        ]
        responses = await asyncio.gather(*tasks)
        for r in responses:
            print(f'状态码: {r.status}')
    print(f'总耗时: {time.time() - start:.2f}秒')

asyncio.run(async_crawl())
# 结果:总耗时约 1 秒(单线程并发)
```
为什么异步爬虫更高效?
| 模型 | 线程数 | 内存占用 | 上下文切换 | 适用场景 |
|---|---|---|---|---|
| 同步 | 1 | 最低 | 无 | 简单任务、顺序执行 |
| 多线程 | N | 较高(每线程约 8MB) | 有(内核级) | I/O 密集型、复用已有同步(阻塞)库 |
| 异步 | 1 | 低 | 有(用户态) | I/O 密集型、高并发 |
异步爬虫的优势在于:
- 资源占用少:单线程模型,不需要为每个请求创建线程
- 无 GIL 竞争:同一时刻只有一个协程在执行,没有线程级的数据竞争(跨 await 共享状态时仍需配合锁等原语,见后文)
- 上下文切换开销小:在用户态切换,比内核态线程切换快得多
- 可扩展性强:可以轻松处理成千上万个并发连接
asyncio 核心概念
asyncio 是 Python 的异步编程框架,理解其核心概念对编写高质量的异步爬虫至关重要。
事件循环(Event Loop)
事件循环是 asyncio 的核心,它是一个无限循环,负责调度和执行协程。可以把事件循环想象成一个调度员:
- 维护一个任务队列
- 从队列中取出就绪的任务执行
- 当任务遇到 I/O 操作时,暂停任务,注册回调
- 当 I/O 完成时,将任务重新放入就绪队列
- 继续执行下一个就绪的任务
```python
import asyncio

async def task(name, delay):
    print(f'{name} 开始')
    await asyncio.sleep(delay)  # 模拟 I/O 操作
    print(f'{name} 完成')

# 事件循环的工作流程示意
async def main():
    # 创建三个任务
    t1 = asyncio.create_task(task('任务A', 2))
    t2 = asyncio.create_task(task('任务B', 1))
    t3 = asyncio.create_task(task('任务C', 3))
    # 等待所有任务完成
    await asyncio.gather(t1, t2, t3)

# 执行顺序(时间线):
# 0s: 任务A开始 -> 暂停等待2秒
# 0s: 任务B开始 -> 暂停等待1秒
# 0s: 任务C开始 -> 暂停等待3秒
# 1s: 任务B完成
# 2s: 任务A完成
# 3s: 任务C完成
asyncio.run(main())
```
事件循环的关键特点:
- 单线程:所有协程都在同一个线程中执行,避免了线程安全问题
- 协作式调度:协程主动让出控制权(通过
await),而不是被抢占 - 非阻塞 I/O:I/O 操作通过操作系统提供的异步接口实现
协程(Coroutine)
协程是使用 async def 定义的函数,它是一个可以被暂停和恢复的函数。调用协程函数不会立即执行,而是返回一个协程对象:
```python
import asyncio

async def my_coroutine():
    print('协程开始')
    await asyncio.sleep(1)
    print('协程结束')

# 调用协程函数只返回协程对象,不会执行
coro = my_coroutine()
print(type(coro))  # <class 'coroutine'>

# 必须通过事件循环执行
asyncio.run(coro)
```
协程对象有自己的生命周期状态(已创建、运行/挂起、已关闭),可以通过 inspect 模块查看:
```python
import asyncio
import inspect

async def demo():
    await asyncio.sleep(0.1)

coro = demo()
print(inspect.iscoroutine(coro))        # True
print(inspect.getcoroutinestate(coro))  # CORO_CREATED,尚未开始执行

# 执行协程
asyncio.run(coro)
```
Task(任务)
Task 是对协程的封装,它将协程包装成一个可调度的任务,并跟踪其执行状态:
```python
import asyncio

async def work(name):
    print(f'{name} 开始')
    await asyncio.sleep(1)
    return f'{name} 结果'

async def main():
    # 方式1:create_task 创建任务(推荐)
    task1 = asyncio.create_task(work('任务1'))
    # 方式2:ensure_future 创建任务
    task2 = asyncio.ensure_future(work('任务2'))

    # Task 的状态
    print(f'任务1 完成: {task1.done()}')       # False
    print(f'任务1 取消: {task1.cancelled()}')  # False

    # 等待任务完成
    result1 = await task1
    result2 = await task2
    print(f'任务1 完成: {task1.done()}')  # True
    print(f'结果: {result1}, {result2}')

asyncio.run(main())
```
协程与 Task 的区别:
| 特性 | 协程(Coroutine) | Task |
|---|---|---|
| 创建 | async def 定义 | create_task() 包装协程 |
| 调度 | 不自动调度 | 自动加入事件循环调度 |
| 状态 | 无状态跟踪 | 有 done()、cancelled() 状态 |
| 结果 | 需要 await 获取 | 可通过 result() 获取 |
| 取消 | 无法取消 | 可通过 cancel() 取消 |
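为了直观感受上表的差异,下面给出一个简单的演示(示意代码):协程对象本身不会被调度,包装成 Task 后会立即进入事件循环,并且可以取消。
```python
import asyncio

async def slow_job():
    await asyncio.sleep(10)
    return '完成'

async def main():
    # 协程对象不会自动执行;包装成 Task 后立即被事件循环调度
    task = asyncio.create_task(slow_job())
    await asyncio.sleep(0.1)  # 让出控制权,此时 task 已开始运行
    task.cancel()             # 协程对象无法取消,Task 可以
    try:
        await task
    except asyncio.CancelledError:
        print(f'任务已取消: cancelled={task.cancelled()}')

asyncio.run(main())
```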
Future
Future 是一个底层(low-level)的可等待对象,表示一个异步操作的最终结果。Task 是 Future 的子类:
```python
import asyncio

async def main():
    # 创建 Future
    future = asyncio.Future()
    # 设置结果
    future.set_result('完成')
    # 获取结果
    result = await future
    print(result)  # 完成

asyncio.run(main())
```
实际开发中,我们主要使用 Task,很少直接操作 Future。
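一个会间接接触到 Future 的常见场景,是用 run_in_executor 把阻塞函数(例如同步的解析或 requests 调用)丢进线程池执行,它返回的就是一个可以直接 await 的 Future。下面是一个示意代码,其中 blocking_parse 只是一个假设的阻塞函数:
```python
import asyncio
import time

def blocking_parse(html: str) -> int:
    """模拟一个阻塞的同步操作(假设的示例函数)"""
    time.sleep(1)
    return len(html)

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor 返回一个 Future,可以像协程一样 await
    future = loop.run_in_executor(None, blocking_parse, '<html>...</html>')
    result = await future
    print(f'解析结果: {result}')

asyncio.run(main())
```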
可等待对象(Awaitable)
在 Python 中,可以使用 await 表达式的对象称为可等待对象,包括:
- 协程:async def 定义的函数返回的对象
- Task:通过 create_task() 创建
- Future:底层的异步结果对象
```python
import asyncio

async def coro():
    return '协程结果'

async def main():
    # 等待协程
    result1 = await coro()

    # 等待 Task
    task = asyncio.create_task(coro())
    result2 = await task

    # 等待 Future
    future = asyncio.Future()
    future.set_result('Future结果')
    result3 = await future

    print(result1, result2, result3)

asyncio.run(main())
```
并发原语
asyncio 提供了多种并发控制原语,用于协调多个协程的执行:
Semaphore(信号量)
限制同时访问某个资源的协程数量:
```python
import asyncio
import aiohttp

async def fetch_with_limit(semaphore, session, url):
    async with semaphore:  # 获取信号量
        async with session.get(url) as response:
            return await response.text()

async def main():
    # 限制最多 5 个并发请求
    semaphore = asyncio.Semaphore(5)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(semaphore, session, f'https://example.com/{i}')
            for i in range(20)
        ]
        results = await asyncio.gather(*tasks)

asyncio.run(main())
```
Lock(锁)
确保同一时刻只有一个协程访问共享资源:
```python
import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:  # 获取锁
            current = self.value
            await asyncio.sleep(0.01)  # 模拟耗时操作
            self.value = current + 1

async def main():
    counter = AsyncCounter()
    # 并发增加 10 次
    await asyncio.gather(*[counter.increment() for _ in range(10)])
    print(f'最终值: {counter.value}')  # 正确: 10

asyncio.run(main())
```
Event(事件)
用于协程间的通知机制:
```python
import asyncio

async def waiter(event, name):
    print(f'{name} 等待事件')
    await event.wait()  # 等待事件被设置
    print(f'{name} 收到通知')

async def setter(event):
    print('准备设置事件')
    await asyncio.sleep(2)
    print('设置事件')
    event.set()  # 设置事件,唤醒所有等待者

async def main():
    event = asyncio.Event()
    await asyncio.gather(
        waiter(event, '等待者1'),
        waiter(event, '等待者2'),
        setter(event),
    )

asyncio.run(main())
```
Queue(队列)
用于协程间的数据传递:
```python
import asyncio
import random

async def producer(queue, name):
    for i in range(5):
        item = f'{name}-{i}'
        await queue.put(item)
        print(f'生产: {item}')
        await asyncio.sleep(random.random())

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f'{name} 消费: {item}')
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    # 启动生产者和消费者
    producers = [producer(queue, f'生产者{i}') for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, f'消费者{i}')) for i in range(3)]
    await asyncio.gather(*producers)
    await queue.join()  # 等待队列清空
    # 取消消费者
    for c in consumers:
        c.cancel()

asyncio.run(main())
```
同步与异步的性能对比
同步请求的问题
传统的同步请求(如 requests)在发送请求后会阻塞等待响应,一次只能处理一个请求:
```python
import requests
import time

def fetch_sync(urls):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.status_code)
    return results

urls = ['https://httpbin.org/delay/1' for _ in range(10)]
start = time.time()
results = fetch_sync(urls)
print(f'同步耗时: {time.time() - start:.2f}秒')
```
上面代码爬取 10 个页面,每个页面延迟 1 秒,总耗时约 10 秒。大部分时间都在等待网络响应。
异步请求的优势
异步请求可以在等待一个请求响应的同时,发送其他请求:
```python
import asyncio
import aiohttp
import time

async def fetch_async(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def fetch_one(session, url):
    async with session.get(url) as response:
        return response.status

urls = ['https://httpbin.org/delay/1' for _ in range(10)]
start = time.time()
results = asyncio.run(fetch_async(urls))
print(f'异步耗时: {time.time() - start:.2f}秒')
```
同样的 10 个页面,异步方式只需约 1 秒。
asyncio 基础
asyncio 是 Python 的异步编程框架,提供了事件循环、协程等核心功能。
async/await 语法
```python
import asyncio

async def say_hello():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

asyncio.run(say_hello())
```
async def 定义的函数是协程函数,调用它返回协程对象。await 用于等待异步操作完成。
并发执行多个协程
```python
import asyncio

async def task(name, delay):
    print(f'{name} 开始')
    await asyncio.sleep(delay)
    print(f'{name} 完成')
    return f'{name} 结果'

async def main():
    results = await asyncio.gather(
        task('任务A', 2),
        task('任务B', 1),
        task('任务C', 3),
    )
    print(results)

asyncio.run(main())
```
asyncio.gather() 可以并发执行多个协程,等待全部完成后返回结果列表。
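需要注意的是,默认情况下任何一个协程抛出的异常都会传播到 await gather 的地方;爬虫中常用 return_exceptions=True 把异常当作结果返回,避免单个失败影响整批结果。下面是一个示意代码:
```python
import asyncio

async def may_fail(i):
    if i == 2:
        raise ValueError(f'请求 {i} 失败')
    await asyncio.sleep(0.1)
    return f'结果 {i}'

async def main():
    # 异常不会中断 gather,而是作为元素出现在结果列表中
    results = await asyncio.gather(
        *[may_fail(i) for i in range(5)],
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f'失败: {r}')
        else:
            print(f'成功: {r}')

asyncio.run(main())
```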
创建任务
```python
import asyncio

async def say_hello():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

async def main():
    task1 = asyncio.create_task(say_hello())
    task2 = asyncio.create_task(say_hello())
    await task1
    await task2

asyncio.run(main())
```
asyncio.create_task() 将协程包装为任务,立即调度执行。
aiohttp 入门
aiohttp 是基于 asyncio 的异步 HTTP 客户端/服务器框架,非常适合编写异步爬虫。
安装
```bash
pip install aiohttp

# 可选:安装加速依赖(aiodns、Brotli 等)
pip install "aiohttp[speedups]"
```
基本用法
```python
import aiohttp
import asyncio

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    html = await fetch_url('https://httpbin.org/get')
    print(html)

asyncio.run(main())
```
ClientSession 的作用
ClientSession 是 aiohttp 的核心组件,它管理连接池和 Cookie:
```python
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/cookies/set?name=value') as resp:
            print(await resp.text())
        async with session.get('https://httpbin.org/cookies') as resp:
            print(await resp.text())

asyncio.run(main())
```
同一个 Session 会自动管理 Cookie,多个请求共享连接池。
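正因如此,推荐在整个爬虫生命周期内复用同一个 ClientSession,而不是为每个请求都新建会话(新建会话意味着每次都要重新建立连接、重新维护 Cookie)。下面是一个对比示意:
```python
import aiohttp
import asyncio

# 不推荐:每个请求都新建 ClientSession,无法复用连接池
async def fetch_bad(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return resp.status

# 推荐:整个爬虫共用一个 ClientSession,复用连接池和 Cookie
async def fetch_good(session, url):
    async with session.get(url) as resp:
        return resp.status

async def main():
    urls = ['https://httpbin.org/get' for _ in range(5)]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[fetch_good(session, url) for url in urls])
        print(results)

asyncio.run(main())
```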
请求方法
```python
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            print('GET:', resp.status)
        async with session.post('https://httpbin.org/post', json={'key': 'value'}) as resp:
            print('POST:', resp.status)
        async with session.put('https://httpbin.org/put', data={'key': 'value'}) as resp:
            print('PUT:', resp.status)
        async with session.delete('https://httpbin.org/delete') as resp:
            print('DELETE:', resp.status)

asyncio.run(main())
```
请求参数
```python
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        params = {'key': 'value', 'page': 1}
        async with session.get('https://httpbin.org/get', params=params) as resp:
            print(await resp.text())

asyncio.run(main())
```
请求头
```python
import aiohttp
import asyncio

async def main():
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('https://httpbin.org/headers') as resp:
            print(await resp.text())

asyncio.run(main())
```
响应处理
```python
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            print('状态码:', resp.status)
            print('响应头:', resp.headers['Content-Type'])
            print('文本:', await resp.text())
            print('JSON:', await resp.json())

asyncio.run(main())
```
aiohttp 高级用法
连接器配置
aiohttp 使用 TCPConnector 管理连接池,通过自定义连接器可以精细控制连接行为:
```python
import aiohttp
import asyncio

async def main():
    # 创建自定义连接器
    connector = aiohttp.TCPConnector(
        limit=30,             # 连接池最大连接数(默认 100)
        limit_per_host=10,    # 每个主机的最大连接数(默认无限制)
        ttl_dns_cache=300,    # DNS 缓存时间(秒)
        use_dns_cache=True,   # 是否启用 DNS 缓存
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        # 使用配置好的连接器
        async with session.get('https://httpbin.org/get') as resp:
            print(await resp.text())

asyncio.run(main())
```
连接器关键参数说明:
| 参数 | 默认值 | 说明 |
|---|---|---|
| limit | 100 | 连接池最大连接数 |
| limit_per_host | 0 | 每个主机的最大连接数,0 表示无限制 |
| ttl_dns_cache | 10 | DNS 解析结果的缓存时间(秒) |
| use_dns_cache | True | 是否启用 DNS 缓存 |
| force_close | False | 是否在响应后强制关闭连接 |
| enable_cleanup_closed | False | 是否启用已关闭连接的清理 |
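如果目标站点对长连接支持不好(例如频繁重置连接),可以参考下面的示意配置,用表中最后两个参数强制短连接并清理残留传输;具体取值需按实际情况调整:
```python
import aiohttp
import asyncio

async def main():
    # 对不适合复用连接的站点,响应后强制关闭连接
    connector = aiohttp.TCPConnector(
        limit=20,
        force_close=True,            # 不使用 keep-alive,每次响应后关闭连接
        enable_cleanup_closed=True,  # 清理未正常关闭的底层传输
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://httpbin.org/get') as resp:
            print(resp.status)

asyncio.run(main())
```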
连接池最佳实践
对于高并发爬虫,合理配置连接池至关重要:
```python
import aiohttp
import asyncio
from typing import Optional

class OptimizedAsyncSpider:
    """优化连接池配置的异步爬虫"""

    def __init__(
        self,
        max_connections: int = 50,   # 总连接数
        max_per_host: int = 10,      # 每主机连接数
        dns_cache_time: int = 300,   # DNS 缓存 5 分钟
    ):
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=max_per_host,
            ttl_dns_cache=dns_cache_time,
            use_dns_cache=True,
        )
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            connector_owner=False,  # 不让 session 关闭 connector
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        await self.connector.close()

    async def fetch(self, url: str) -> Optional[str]:
        try:
            async with self.session.get(url) as response:
                return await response.text()
        except Exception as e:
            print(f'请求失败: {url}, 错误: {e}')
            return None

async def main():
    async with OptimizedAsyncSpider(
        max_connections=50,
        max_per_host=10,
    ) as spider:
        urls = [f'https://httpbin.org/get?id={i}' for i in range(20)]
        tasks = [spider.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'成功: {sum(1 for r in results if r)} 个')

asyncio.run(main())
```
优雅关闭
aiohttp 的 ClientSession 关闭后,底层连接可能不会立即关闭。为了确保资源正确释放,需要在关闭事件循环前添加短暂等待:
```python
import aiohttp
import asyncio

async def fetch_data():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            return await resp.text()

# 对于非 SSL 连接,零延迟即可
# await asyncio.sleep(0)
# 对于 SSL 连接,需要等待底层 SSL 连接关闭
# await asyncio.sleep(0.250)

async def main():
    result = await fetch_data()
    # SSL 连接需要等待 250ms 让底层连接关闭
    await asyncio.sleep(0.250)
    print(result[:100])

asyncio.run(main())
```
如果不添加延迟,可能会看到 ResourceWarning: unclosed transport 警告。
流式读取大文件
对于大文件或流式数据,不要一次性读取全部内容:
```python
import aiohttp
import asyncio

async def download_large_file(url: str, output_path: str):
    """流式下载大文件"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # 检查文件大小(服务器可能不返回 Content-Length)
            content_length = response.content_length
            if content_length:
                print(f'文件大小: {content_length / 1024 / 1024:.2f} MB')
            # 流式写入
            with open(output_path, 'wb') as f:
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)
            print(f'下载完成: {output_path}')

def process_line(line: bytes):
    """占位函数:按需处理每一行数据"""
    print(line[:50])

async def stream_processing(url: str):
    """流式处理响应内容"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # 逐行处理
            async for line in response.content:
                process_line(line)

async def main():
    await download_large_file(
        'https://example.com/large-file.zip',
        'output.zip'
    )

asyncio.run(main())
```
Cookie 管理
aiohttp 的 ClientSession 自动管理 Cookie:
```python
import aiohttp
import asyncio
from yarl import URL

async def main():
    # 创建带有初始 Cookie 的会话
    cookies = {'session_id': 'abc123', 'user_token': 'xyz789'}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        # 发送带有 Cookie 的请求
        async with session.get('https://httpbin.org/cookies') as resp:
            print(await resp.text())
        # 查看当前 Cookie
        print(session.cookie_jar.filter_cookies(URL('https://httpbin.org')))

asyncio.run(main())
```
SSL/TLS 配置
控制 SSL 验证行为:
```python
import aiohttp
import asyncio
import ssl
import certifi

async def main():
    # 方式1:禁用 SSL 验证(不推荐,仅用于测试)
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com', ssl=False) as resp:
            print(await resp.text())

    # 方式2:使用 certifi 提供的证书
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    connector = aiohttp.TCPConnector(ssl=ssl_context)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://example.com') as resp:
            print(await resp.text())

    # 方式3:验证证书指纹,确保连接到特定服务器
    fingerprint = b'...'  # 证书的 SHA256 指纹
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com', ssl=aiohttp.Fingerprint(fingerprint)) as resp:
            print(await resp.text())

asyncio.run(main())
```
请求追踪
使用 TraceConfig 追踪请求的完整生命周期:
```python
import aiohttp
import asyncio
import time

async def on_request_start(session, trace_config_ctx, params):
    """请求开始"""
    trace_config_ctx.start = time.time()
    print(f'请求开始: {params.url}')

async def on_request_end(session, trace_config_ctx, params):
    """请求结束"""
    elapsed = time.time() - trace_config_ctx.start
    print(f'请求完成: {params.url}, 耗时: {elapsed:.3f}s, 状态: {params.response.status}')

async def on_request_exception(session, trace_config_ctx, params):
    """请求异常"""
    print(f'请求异常: {params.url}, 错误: {params.exception}')

async def main():
    # 创建追踪配置
    trace_config = aiohttp.TraceConfig()
    trace_config.on_request_start.append(on_request_start)
    trace_config.on_request_end.append(on_request_end)
    trace_config.on_request_exception.append(on_request_exception)

    async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
        async with session.get('https://httpbin.org/get') as resp:
            await resp.text()

asyncio.run(main())
```
并发控制
直接并发大量请求可能导致服务器压力过大或被封禁,需要控制并发数量。
使用 Semaphore
```python
import asyncio
import aiohttp

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)
    async with aiohttp.ClientSession() as session:
        urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(20)]
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'完成 {len(results)} 个请求')

asyncio.run(main())
```
Semaphore 限制同时进行的请求数量为 5 个。
使用 asyncio.Semaphore 封装
```python
import asyncio
import aiohttp
from typing import List

class AsyncSpider:
    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(self, session: aiohttp.ClientSession, url: str, **kwargs):
        async with self.semaphore:
            try:
                async with session.get(url, **kwargs) as response:
                    return await response.text()
            except Exception as e:
                print(f'请求失败 {url}: {e}')
                return None

    async def fetch_all(self, urls: List[str], **kwargs):
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, **kwargs) for url in urls]
            return await asyncio.gather(*tasks)

async def main():
    spider = AsyncSpider(max_concurrent=5)
    urls = [f'https://httpbin.org/get?id={i}' for i in range(20)]
    results = await spider.fetch_all(urls)
    print(f'成功: {sum(1 for r in results if r)} 个')

asyncio.run(main())
```
超时设置
```python
import aiohttp
import asyncio
from aiohttp import ClientTimeout

async def main():
    timeout = ClientTimeout(total=30, connect=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://httpbin.org/delay/10') as resp:
                print(await resp.text())
        except asyncio.TimeoutError:
            print('请求超时')

asyncio.run(main())
```
超时参数说明:
- total:整个请求的总超时时间
- connect:从连接池获取连接(含等待和建立连接)的超时时间
- sock_read:从对端读取一段数据的超时时间
- sock_connect:建立 Socket 连接的超时时间
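下面的示意代码演示了如何组合这些参数,为爬虫设置更细粒度的超时:
```python
import aiohttp
import asyncio

async def main():
    # 细粒度超时:总超时 30 秒,获取连接 5 秒,单次读取 10 秒
    timeout = aiohttp.ClientTimeout(
        total=30,
        connect=5,
        sock_connect=5,
        sock_read=10,
    )
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://httpbin.org/delay/3') as resp:
                print(resp.status)
        except asyncio.TimeoutError:
            print('请求超时')

asyncio.run(main())
```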
错误处理
```python
import aiohttp
import asyncio
from aiohttp import ClientError, ClientResponseError

async def safe_fetch(session, url):
    try:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()
    except ClientResponseError as e:
        print(f'HTTP 错误 {url}: {e.status}')
    except ClientError as e:
        print(f'请求错误 {url}: {e}')
    except asyncio.TimeoutError:
        print(f'请求超时 {url}')
    except Exception as e:
        print(f'未知错误 {url}: {e}')
    return None

async def main():
    async with aiohttp.ClientSession() as session:
        urls = [
            'https://httpbin.org/get',
            'https://httpbin.org/status/404',
            'https://httpbin.org/status/500',
        ]
        tasks = [safe_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'成功: {sum(1 for r in results if r)} 个')

asyncio.run(main())
```
重试机制
```python
import aiohttp
import asyncio
import random

async def fetch_with_retry(session, url, max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            async with session.get(url, **kwargs) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status in [429, 500, 502, 503, 504]:
                    # 指数退避 + 随机抖动后重试
                    await asyncio.sleep(2 ** attempt + random.random())
                    continue
                else:
                    return None
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
    return None

async def main():
    async with aiohttp.ClientSession() as session:
        result = await fetch_with_retry(session, 'https://httpbin.org/get')
        print(result[:100] if result else '失败')

asyncio.run(main())
```
代理设置
```python
import aiohttp
import asyncio

async def main():
    proxy = 'http://127.0.0.1:7890'
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/ip', proxy=proxy) as resp:
            print(await resp.text())

asyncio.run(main())
```
代理池轮换:
```python
import aiohttp
import asyncio
import random

class ProxyPool:
    def __init__(self, proxies):
        self.proxies = proxies

    def get_proxy(self):
        return random.choice(self.proxies)

async def fetch_with_proxy(session, url, proxy_pool):
    proxy = proxy_pool.get_proxy()
    async with session.get(url, proxy=proxy) as resp:
        return await resp.text()

async def main():
    proxy_pool = ProxyPool([
        'http://proxy1:8080',
        'http://proxy2:8080',
    ])
    async with aiohttp.ClientSession() as session:
        result = await fetch_with_proxy(session, 'https://httpbin.org/ip', proxy_pool)
        print(result)

asyncio.run(main())
```
完整异步爬虫示例
```python
import asyncio
import aiohttp
import random
import time
from typing import List, Optional
from bs4 import BeautifulSoup

class AsyncWebSpider:
    def __init__(
        self,
        max_concurrent: int = 10,
        request_delay: float = 0.5,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }

    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        **kwargs
    ) -> Optional[str]:
        async with self.semaphore:
            await asyncio.sleep(random.uniform(0, self.request_delay))
            for attempt in range(self.max_retries):
                try:
                    async with session.get(url, **kwargs) as response:
                        if response.status == 200:
                            return await response.text()
                        elif response.status in [429, 500, 502, 503, 504]:
                            wait = 2 ** attempt + random.random()
                            await asyncio.sleep(wait)
                        else:
                            return None
                except asyncio.TimeoutError:
                    print(f'超时: {url}')
                except Exception as e:
                    print(f'错误: {url} - {e}')
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
            return None

    async def fetch_all(self, urls: List[str]) -> List[Optional[str]]:
        async with aiohttp.ClientSession(
            headers=self.headers,
            timeout=self.timeout
        ) as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

    def parse_html(self, html: str, selector: str) -> List[str]:
        soup = BeautifulSoup(html, 'lxml')
        elements = soup.select(selector)
        return [e.get_text(strip=True) for e in elements]

async def main():
    spider = AsyncWebSpider(
        max_concurrent=5,
        request_delay=0.5,
        timeout=30,
        max_retries=3,
    )
    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(20)]
    start = time.time()
    results = await spider.fetch_all(urls)
    elapsed = time.time() - start
    success = sum(1 for r in results if r)
    print(f'完成: {success}/{len(urls)} 个请求')
    print(f'耗时: {elapsed:.2f}秒')

if __name__ == '__main__':
    asyncio.run(main())
```
异步文件写入
大量数据写入也可以用异步方式处理,这里使用 aiofiles 库(需先执行 pip install aiofiles):
```python
import asyncio
import aiohttp
import aiofiles
import json

# AsyncWebSpider 为上一节定义的爬虫类

async def save_results(results, filename):
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(json.dumps(results, ensure_ascii=False, indent=2))

async def main():
    spider = AsyncWebSpider(max_concurrent=5)
    urls = [f'https://httpbin.org/get?id={i}' for i in range(10)]
    results = await spider.fetch_all(urls)
    data = [r for r in results if r]
    await save_results(data, 'results.json')
    print(f'已保存 {len(data)} 条数据')

asyncio.run(main())
```
小结
本章我们学习了:
- 同步与异步的区别 - 异步请求的效率优势
- asyncio 基础 - async/await 语法和并发执行
- aiohttp 入门 - 异步 HTTP 客户端的使用
- 并发控制 - 使用 Semaphore 限制并发数
- 超时和错误处理 - 健壮的爬虫设计
- 重试机制 - 处理临时故障
- 代理设置 - IP 轮换
- 完整示例 - 生产级异步爬虫
练习
- 使用 aiohttp 实现一个并发爬取 100 个页面的爬虫,限制并发数为 10
- 为异步爬虫添加重试和超时机制
- 实现异步保存爬取结果到 JSON 文件