
Async Crawlers

When you need to crawl a large number of pages, synchronous requests become the bottleneck. An async crawler can work on other requests while waiting for network responses, dramatically improving throughput.

Understanding the Essence of Asynchronous Programming

Before writing async crawlers, we need to understand the core concepts of asynchronous programming. Beginners often confuse "async" with "multithreading"; they are in fact two entirely different concurrency models.

Synchronous vs asynchronous vs multithreaded

**Synchronous** is the simplest programming model: code executes line by line, and on an I/O operation (a network request, a file read/write) the program blocks until the operation completes before moving on.

# Synchronous execution: each request must finish before the next starts
import requests
import time

def sync_crawl():
    start = time.time()
    for i in range(5):
        response = requests.get('https://httpbin.org/delay/1')
        print(f'Request {i+1}: {response.status_code}')
    print(f'Total time: {time.time() - start:.2f}s')
# Result: about 5 seconds total (serial execution)

**Multithreading** achieves concurrency by running multiple threads, each executing independently. Python threads are constrained by the GIL (Global Interpreter Lock): only one thread executes Python code at a time. For I/O-bound tasks, however, a thread releases the GIL while waiting on I/O, so other threads can run.

# Multithreaded execution: requests run in parallel
import threading
import requests
import time

def fetch(url, results, index):
    response = requests.get(url)
    results[index] = response.status_code

def threaded_crawl():
    start = time.time()
    results = [None] * 5
    threads = []

    for i in range(5):
        t = threading.Thread(
            target=fetch,
            args=('https://httpbin.org/delay/1', results, i)
        )
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f'Total time: {time.time() - start:.2f}s')
# Result: about 1 second total (parallel execution)

**Asynchronous** code achieves concurrency on a single thread; the core idea is non-blocking I/O. When an I/O operation starts, instead of blocking, the program registers a callback and moves on to other work. When the I/O completes, the event loop invokes the registered callback.

# Asynchronous execution: single-threaded concurrency
import asyncio
import aiohttp
import time

async def async_crawl():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get('https://httpbin.org/delay/1')
            for _ in range(5)
        ]
        responses = await asyncio.gather(*tasks)
        for r in responses:
            print(f'Status: {r.status}')
    print(f'Total time: {time.time() - start:.2f}s')
# Result: about 1 second total (single-threaded concurrency)

Why is an async crawler more efficient?

| Model | Threads | Memory | Context switches | Best for |
|---|---|---|---|---|
| Synchronous | 1 | lowest | none | simple, sequential tasks |
| Multithreaded | N | higher (~8 MB per thread) | yes (kernel-level) | I/O-bound tasks, reusing existing sync libraries |
| Asynchronous | 1 | low | yes (user-space) | I/O-bound, high-concurrency tasks |

The advantages of an async crawler:

  1. Low resource usage: the single-threaded model needs no thread per request
  2. No GIL contention: only one coroutine runs at a time, so there are no thread-safety issues
  3. Cheap context switches: switching happens in user space, far faster than kernel-level thread switches
  4. Scalability: it can easily handle tens of thousands of concurrent connections

asyncio Core Concepts

asyncio is Python's asynchronous programming framework; understanding its core concepts is essential to writing high-quality async crawlers.

Event loop

The event loop is the heart of asyncio: an endless loop that schedules and runs coroutines. Think of it as a dispatcher that:

  1. Maintains a task queue
  2. Picks a ready task off the queue and runs it
  3. When a task hits an I/O operation, suspends it and registers a callback
  4. When the I/O completes, puts the task back on the ready queue
  5. Moves on to the next ready task
import asyncio

async def task(name, delay):
    print(f'{name} started')
    await asyncio.sleep(delay)  # simulate an I/O operation
    print(f'{name} finished')

# A sketch of how the event loop schedules work
async def main():
    # Create three tasks
    t1 = asyncio.create_task(task('Task A', 2))
    t2 = asyncio.create_task(task('Task B', 1))
    t3 = asyncio.create_task(task('Task C', 3))

    # Wait for all of them to finish
    await asyncio.gather(t1, t2, t3)

# Execution timeline:
# 0s: Task A starts -> suspended, waiting 2s
# 0s: Task B starts -> suspended, waiting 1s
# 0s: Task C starts -> suspended, waiting 3s
# 1s: Task B finished
# 2s: Task A finished
# 3s: Task C finished

asyncio.run(main())

Key properties of the event loop:

  • Single-threaded: every coroutine runs on the same thread, avoiding thread-safety issues
  • Cooperative scheduling: a coroutine yields control voluntarily (via await) rather than being preempted
  • Non-blocking I/O: I/O is implemented on top of the operating system's asynchronous interfaces
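Cooperative scheduling has a practical consequence worth seeing once: a blocking call such as time.sleep never yields, so it stalls the whole loop, while awaiting asyncio.sleep hands control back and lets coroutines overlap. A small stdlib-only sketch:

```python
import asyncio
import time

async def blocking():
    time.sleep(0.3)   # blocking call: never yields, stalls the entire loop

async def cooperative():
    await asyncio.sleep(0.3)  # awaiting hands control back to the loop

async def measure(coro_factory):
    # Run two copies of the coroutine "concurrently" and time them
    start = time.perf_counter()
    await asyncio.gather(coro_factory(), coro_factory())
    return time.perf_counter() - start

async def main():
    return await measure(blocking), await measure(cooperative)

blocked, waited = asyncio.run(main())
# blocking runs back to back (~0.6s); cooperative overlaps (~0.3s)
print(f'blocking: {blocked:.2f}s, cooperative: {waited:.2f}s')
```

This is exactly why CPU-heavy work inside a coroutine hurts an async crawler: it starves every other pending request.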

Coroutine

A coroutine is a function defined with async def: a function that can be suspended and resumed. Calling a coroutine function does not run it; it returns a coroutine object:

import asyncio

async def my_coroutine():
    print('coroutine started')
    await asyncio.sleep(1)
    print('coroutine finished')

# Calling the coroutine function only returns a coroutine object; nothing runs yet
coro = my_coroutine()
print(type(coro))  # <class 'coroutine'>

# It must be driven by the event loop
asyncio.run(coro)

A coroutine object moves through several states (created, running, suspended, closed), which the inspect module can report:

import asyncio
import inspect

async def demo():
    await asyncio.sleep(0.1)

coro = demo()
print(inspect.iscoroutine(coro))        # True
print(inspect.getcoroutinestate(coro))  # CORO_CREATED

# Run the coroutine to completion
asyncio.run(coro)
print(inspect.getcoroutinestate(coro))  # CORO_CLOSED

Task

A Task wraps a coroutine, turning it into a schedulable unit and tracking its execution state:

import asyncio

async def work(name):
    print(f'{name} started')
    await asyncio.sleep(1)
    return f'{name} result'

async def main():
    # Option 1: create_task (recommended)
    task1 = asyncio.create_task(work('task 1'))

    # Option 2: ensure_future
    task2 = asyncio.ensure_future(work('task 2'))

    # Task state
    print(f'task 1 done: {task1.done()}')            # False
    print(f'task 1 cancelled: {task1.cancelled()}')  # False

    # Wait for the tasks to finish
    result1 = await task1
    result2 = await task2

    print(f'task 1 done: {task1.done()}')  # True
    print(f'results: {result1}, {result2}')

asyncio.run(main())

Coroutine vs Task

| Feature | Coroutine | Task |
|---|---|---|
| Creation | defined with async def | wraps a coroutine via create_task() |
| Scheduling | not scheduled automatically | automatically scheduled on the event loop |
| State | no state tracking | done() / cancelled() state |
| Result | obtained with await | also available via result() |
| Cancellation | cannot be cancelled | cancellable via cancel() |
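The cancellation row of the table can be sketched concretely: cancel() requests cancellation, which raises CancelledError inside the task at its next await point.

```python
import asyncio

async def long_job():
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        print('long_job was cancelled')
        raise  # re-raise so the task ends up marked as cancelled

async def main():
    task = asyncio.create_task(long_job())
    await asyncio.sleep(0.1)  # give the task a chance to start
    task.cancel()             # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

was_cancelled = asyncio.run(main())
print(was_cancelled)  # True
```

In a crawler this is how you stop in-flight requests on shutdown: cancel the tasks, then await them so cleanup code (closing connections) gets to run.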

Future

A Future is a low-level awaitable representing the eventual result of an asynchronous operation. Task is a subclass of Future:

import asyncio

async def main():
    # Create a Future
    future = asyncio.Future()

    # Set its result
    future.set_result('done')

    # Retrieve the result
    result = await future
    print(result)  # done

asyncio.run(main())

In practice we mostly work with Tasks and rarely manipulate Futures directly.

Awaitables

In Python, any object that can be used in an await expression is called an awaitable. There are three kinds:

  1. Coroutines: the objects returned by calling an async def function
  2. Tasks: created via create_task()
  3. Futures: low-level asynchronous result objects
import asyncio

async def coro():
    return 'coroutine result'

async def main():
    # Await a coroutine
    result1 = await coro()

    # Await a Task
    task = asyncio.create_task(coro())
    result2 = await task

    # Await a Future
    future = asyncio.Future()
    future.set_result('Future result')
    result3 = await future

    print(result1, result2, result3)

asyncio.run(main())

Concurrency primitives

asyncio ships several primitives for coordinating multiple coroutines:

Semaphore

Limits how many coroutines may access a resource at the same time:

import asyncio
import aiohttp

async def fetch_with_limit(semaphore, session, url):
    async with semaphore:  # acquire the semaphore
        async with session.get(url) as response:
            return await response.text()

async def main():
    # Allow at most 5 concurrent requests
    semaphore = asyncio.Semaphore(5)

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(semaphore, session, f'https://example.com/{i}')
            for i in range(20)
        ]
        results = await asyncio.gather(*tasks)

asyncio.run(main())

Lock

Ensures only one coroutine touches a shared resource at a time:

import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:  # acquire the lock
            current = self.value
            await asyncio.sleep(0.01)  # simulate a slow operation
            self.value = current + 1

async def main():
    counter = AsyncCounter()

    # Increment concurrently 10 times
    await asyncio.gather(*[counter.increment() for _ in range(10)])

    print(f'final value: {counter.value}')  # correct: 10

asyncio.run(main())

Event

A notification mechanism between coroutines:

import asyncio

async def waiter(event, name):
    print(f'{name} waiting for the event')
    await event.wait()  # block until the event is set
    print(f'{name} notified')

async def setter(event):
    print('about to set the event')
    await asyncio.sleep(2)
    print('setting the event')
    event.set()  # set the event, waking all waiters

async def main():
    event = asyncio.Event()

    await asyncio.gather(
        waiter(event, 'waiter 1'),
        waiter(event, 'waiter 2'),
        setter(event),
    )

asyncio.run(main())

Queue

Passes data between coroutines:

import asyncio
import random

async def producer(queue, name):
    for i in range(5):
        item = f'{name}-{i}'
        await queue.put(item)
        print(f'produced: {item}')
        await asyncio.sleep(random.random())

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f'{name} consumed: {item}')
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    # Start the producers and consumers
    producers = [producer(queue, f'producer {i}') for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, f'consumer {i}')) for i in range(3)]

    await asyncio.gather(*producers)
    await queue.join()  # wait until every item has been processed

    # Cancel the consumers
    for c in consumers:
        c.cancel()

asyncio.run(main())

Sync vs Async Performance

The problem with synchronous requests

A traditional synchronous client (such as requests) blocks after sending a request until the response arrives, handling only one request at a time:

import requests
import time

def fetch_sync(urls):
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.status_code)
    return results

urls = ['https://httpbin.org/delay/1' for _ in range(10)]

start = time.time()
results = fetch_sync(urls)
print(f'sync time: {time.time() - start:.2f}s')

This code fetches 10 pages, each with a 1-second delay, for a total of about 10 seconds, most of it spent waiting on the network.

The advantage of async requests

An async client can send further requests while waiting for earlier responses:

import asyncio
import aiohttp
import time

async def fetch_async(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def fetch_one(session, url):
    async with session.get(url) as response:
        return response.status

urls = ['https://httpbin.org/delay/1' for _ in range(10)]

start = time.time()
results = asyncio.run(fetch_async(urls))
print(f'async time: {time.time() - start:.2f}s')

The same 10 pages take only about 1 second asynchronously.

asyncio Basics

asyncio is Python's asynchronous programming framework, providing the event loop, coroutines, and the other building blocks described above.

async/await syntax

import asyncio

async def say_hello():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

asyncio.run(say_hello())

A function defined with async def is a coroutine function; calling it returns a coroutine object. await waits for an asynchronous operation to complete.

Running multiple coroutines concurrently

import asyncio

async def task(name, delay):
    print(f'{name} started')
    await asyncio.sleep(delay)
    print(f'{name} finished')
    return f'{name} result'

async def main():
    results = await asyncio.gather(
        task('Task A', 2),
        task('Task B', 1),
        task('Task C', 3),
    )
    print(results)

asyncio.run(main())

asyncio.gather() runs multiple coroutines concurrently and returns a list of their results once all have finished.
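One gather detail matters for crawlers: by default the first exception propagates and the remaining results are lost. With return_exceptions=True, exceptions come back as entries in the result list, so one failed page does not abort the batch. A minimal sketch:

```python
import asyncio

async def ok(i):
    return i

async def boom():
    raise ValueError('failed')

async def main():
    # Exceptions become list entries instead of propagating
    return await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)

results = asyncio.run(main())
print(results)  # [1, ValueError('failed'), 3]
```

After the batch you can partition results into successes and failures and retry only the failures.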

Creating tasks

import asyncio

async def say_hello():
    print('Hello')
    await asyncio.sleep(1)
    print('World')

async def main():
    task1 = asyncio.create_task(say_hello())
    task2 = asyncio.create_task(say_hello())

    await task1
    await task2

asyncio.run(main())

asyncio.create_task() wraps a coroutine in a Task and schedules it for execution immediately.
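"Scheduled immediately" is measurable: awaiting two coroutines one after another takes the sum of their sleeps, while two create_task tasks overlap. A stdlib-only sketch:

```python
import asyncio
import time

async def tick():
    await asyncio.sleep(0.2)

async def sequential():
    # Awaiting coroutines directly runs them one after another
    await tick()
    await tick()

async def concurrent():
    # create_task schedules both at once; their sleeps overlap
    t1 = asyncio.create_task(tick())
    t2 = asyncio.create_task(tick())
    await t1
    await t2

start = time.perf_counter()
asyncio.run(sequential())
seq = time.perf_counter() - start  # ~0.4s

start = time.perf_counter()
asyncio.run(concurrent())
conc = time.perf_counter() - start  # ~0.2s

print(f'sequential: {seq:.2f}s, concurrent: {conc:.2f}s')
```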

Getting Started with aiohttp

aiohttp is an asynchronous HTTP client/server framework built on asyncio, and a great fit for async crawlers.

Installation

pip install aiohttp

# Optional extras (faster DNS resolution, Brotli support)
pip install "aiohttp[speedups]"

Basic usage

import aiohttp
import asyncio

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    html = await fetch_url('https://httpbin.org/get')
    print(html)

asyncio.run(main())

What ClientSession does

ClientSession is aiohttp's central component; it manages the connection pool and cookies:

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/cookies/set?name=value') as resp:
            print(await resp.text())

        async with session.get('https://httpbin.org/cookies') as resp:
            print(await resp.text())

asyncio.run(main())

A single session manages cookies automatically, and all of its requests share one connection pool.

Request methods

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            print('GET:', resp.status)

        async with session.post('https://httpbin.org/post', json={'key': 'value'}) as resp:
            print('POST:', resp.status)

        async with session.put('https://httpbin.org/put', data={'key': 'value'}) as resp:
            print('PUT:', resp.status)

        async with session.delete('https://httpbin.org/delete') as resp:
            print('DELETE:', resp.status)

asyncio.run(main())

Query parameters

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        params = {'key': 'value', 'page': 1}
        async with session.get('https://httpbin.org/get', params=params) as resp:
            print(await resp.text())

asyncio.run(main())

Request headers

import aiohttp
import asyncio

async def main():
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
    }

    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('https://httpbin.org/headers') as resp:
            print(await resp.text())

asyncio.run(main())

Response handling

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            print('status:', resp.status)
            print('content type:', resp.headers['Content-Type'])
            print('text:', await resp.text())
            print('JSON:', await resp.json())

asyncio.run(main())

Advanced aiohttp

Connector configuration

aiohttp manages its connection pool through a TCPConnector; a custom connector gives fine-grained control over connection behavior:

import aiohttp
import asyncio

async def main():
    # Build a custom connector
    connector = aiohttp.TCPConnector(
        limit=30,            # max connections in the pool (default 100)
        limit_per_host=10,   # max connections per host (default: unlimited)
        ttl_dns_cache=300,   # DNS cache TTL in seconds
        use_dns_cache=True,  # enable the DNS cache
    )

    async with aiohttp.ClientSession(connector=connector) as session:
        # Requests now go through the configured connector
        async with session.get('https://httpbin.org/get') as resp:
            print(await resp.text())

asyncio.run(main())

Key connector parameters

| Parameter | Default | Description |
|---|---|---|
| limit | 100 | maximum number of connections in the pool |
| limit_per_host | 0 | maximum connections per host; 0 means unlimited |
| ttl_dns_cache | 10 | how long DNS results are cached, in seconds |
| use_dns_cache | True | whether to cache DNS lookups |
| force_close | False | whether to force-close the connection after each response |
| enable_cleanup_closed | False | whether to clean up abandoned closed connections |

Connection pool best practices

For a high-concurrency crawler, sensible pool configuration is essential:

import aiohttp
import asyncio
from typing import List, Optional

class OptimizedAsyncSpider:
    """An async spider with a tuned connection pool."""

    def __init__(
        self,
        max_connections: int = 50,  # total connections
        max_per_host: int = 10,     # connections per host
        dns_cache_time: int = 300,  # cache DNS for 5 minutes
    ):
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=max_per_host,
            ttl_dns_cache=dns_cache_time,
            use_dns_cache=True,
        )
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            connector_owner=False,  # don't let the session close the connector
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        await self.connector.close()

    async def fetch(self, url: str) -> Optional[str]:
        try:
            async with self.session.get(url) as response:
                return await response.text()
        except Exception as e:
            print(f'request failed: {url}, error: {e}')
            return None

async def main():
    async with OptimizedAsyncSpider(
        max_connections=50,
        max_per_host=10,
    ) as spider:
        urls = [f'https://httpbin.org/get?id={i}' for i in range(20)]
        tasks = [spider.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'succeeded: {sum(1 for r in results if r)}')

asyncio.run(main())

Graceful shutdown

After a ClientSession is closed, the underlying connections may not shut down immediately. To make sure resources are released cleanly, wait briefly before the event loop exits:

import aiohttp
import asyncio

async def fetch_data():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            return await resp.text()

async def main():
    result = await fetch_data()

    # For plain (non-SSL) connections a zero-delay sleep is enough:
    # await asyncio.sleep(0)
    # SSL connections need ~250 ms for the underlying transport to close:
    await asyncio.sleep(0.250)

    print(result[:100])

asyncio.run(main())

Without the delay you may see a ResourceWarning: unclosed transport warning.

Streaming large downloads

For large files or streaming data, don't read the whole body into memory at once:

import aiohttp
import asyncio

def process_line(line: bytes):
    """Placeholder: substitute your own per-line handling."""
    print(line)

async def download_large_file(url: str, output_path: str):
    """Stream a large file to disk."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Report the file size if the server sent Content-Length
            content_length = response.content_length
            if content_length:
                print(f'file size: {content_length / 1024 / 1024:.2f} MB')

            # Write the body chunk by chunk
            with open(output_path, 'wb') as f:
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)

    print(f'download finished: {output_path}')

async def stream_processing(url: str):
    """Process a response body as a stream."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Handle the body line by line
            async for line in response.content:
                process_line(line)

async def main():
    await download_large_file(
        'https://example.com/large-file.zip',
        'output.zip'
    )

asyncio.run(main())

Cookie management

aiohttp's ClientSession manages cookies automatically:

import aiohttp
import asyncio

async def main():
    # Start the session with some initial cookies
    cookies = {'session_id': 'abc123', 'user_token': 'xyz789'}

    async with aiohttp.ClientSession(cookies=cookies) as session:
        # Requests carry the cookies automatically
        async with session.get('https://httpbin.org/cookies') as resp:
            print(await resp.text())

        # Inspect the cookies currently in the jar
        print(session.cookie_jar.filter_cookies('https://httpbin.org'))

asyncio.run(main())

SSL/TLS configuration

Controlling SSL verification behavior:

import aiohttp
import asyncio
import ssl
import certifi

async def main():
    # Option 1: disable SSL verification (not recommended; testing only)
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com', ssl=False) as resp:
            print(await resp.text())

    # Option 2: use certifi's CA bundle
    ssl_context = ssl.create_default_context(cafile=certifi.where())
    connector = aiohttp.TCPConnector(ssl=ssl_context)

    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('https://example.com') as resp:
            print(await resp.text())

    # Option 3: pin a certificate fingerprint
    # to ensure you are talking to a specific server
    fingerprint = b'...'  # SHA256 fingerprint
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com', ssl=aiohttp.Fingerprint(fingerprint)) as resp:
            print(await resp.text())

asyncio.run(main())

Request tracing

TraceConfig hooks into the full lifecycle of a request:

import aiohttp
import asyncio
import time

async def on_request_start(session, trace_config_ctx, params):
    """Request started."""
    trace_config_ctx.start = time.time()
    print(f'request started: {params.url}')

async def on_request_end(session, trace_config_ctx, params):
    """Request finished."""
    elapsed = time.time() - trace_config_ctx.start
    print(f'request finished: {params.url}, took {elapsed:.3f}s, status {params.response.status}')

async def on_request_exception(session, trace_config_ctx, params):
    """Request failed."""
    print(f'request failed: {params.url}, error: {params.exception}')

async def main():
    # Build the trace configuration
    trace_config = aiohttp.TraceConfig()
    trace_config.on_request_start.append(on_request_start)
    trace_config.on_request_end.append(on_request_end)
    trace_config.on_request_exception.append(on_request_exception)

    async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
        async with session.get('https://httpbin.org/get') as resp:
            await resp.text()

asyncio.run(main())

Concurrency Control

Firing off a huge number of requests at once can overload the server or get you banned; the concurrency level must be bounded.

Using Semaphore

import asyncio
import aiohttp

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def main():
    semaphore = asyncio.Semaphore(5)

    async with aiohttp.ClientSession() as session:
        urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(20)]
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'completed {len(results)} requests')

asyncio.run(main())

The Semaphore caps the number of in-flight requests at 5.

Wrapping asyncio.Semaphore in a spider class

import asyncio
import aiohttp
from typing import List

class AsyncSpider:
    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(self, session: aiohttp.ClientSession, url: str, **kwargs):
        async with self.semaphore:
            try:
                async with session.get(url, **kwargs) as response:
                    return await response.text()
            except Exception as e:
                print(f'request failed {url}: {e}')
                return None

    async def fetch_all(self, urls: List[str], **kwargs):
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, **kwargs) for url in urls]
            return await asyncio.gather(*tasks)

async def main():
    spider = AsyncSpider(max_concurrent=5)
    urls = [f'https://httpbin.org/get?id={i}' for i in range(20)]
    results = await spider.fetch_all(urls)
    print(f'succeeded: {sum(1 for r in results if r)}')

asyncio.run(main())

Timeouts

import aiohttp
import asyncio
from aiohttp import ClientTimeout

async def main():
    timeout = ClientTimeout(total=30, connect=5)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://httpbin.org/delay/10') as resp:
                print(await resp.text())
        except asyncio.TimeoutError:
            print('request timed out')

asyncio.run(main())

Timeout parameters:

  • total: overall timeout for the whole request
  • connect: timeout for acquiring a connection
  • sock_read: timeout for reading from the socket
  • sock_connect: timeout for establishing the socket connection
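Beyond ClientTimeout, any single awaitable can be bounded with the stdlib's asyncio.wait_for, which cancels the operation and raises asyncio.TimeoutError when the deadline passes. A minimal stdlib-only sketch (slow_operation here stands in for a slow fetch):

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)  # stands in for a slow network call
    return 'done'

async def main():
    try:
        # Cancel slow_operation if it exceeds 0.1 seconds
        return await asyncio.wait_for(slow_operation(), timeout=0.1)
    except asyncio.TimeoutError:
        return 'timed out'

outcome = asyncio.run(main())
print(outcome)  # timed out
```

This is useful for wrapping a whole fetch-and-parse step, where ClientTimeout only covers the HTTP part.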

Error Handling

import aiohttp
import asyncio
from aiohttp import ClientError, ClientResponseError

async def safe_fetch(session, url):
    try:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()
    except ClientResponseError as e:
        print(f'HTTP error {url}: {e.status}')
    except ClientError as e:
        print(f'request error {url}: {e}')
    except asyncio.TimeoutError:
        print(f'request timed out {url}')
    except Exception as e:
        print(f'unexpected error {url}: {e}')
    return None

async def main():
    async with aiohttp.ClientSession() as session:
        urls = [
            'https://httpbin.org/get',
            'https://httpbin.org/status/404',
            'https://httpbin.org/status/500',
        ]
        tasks = [safe_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'succeeded: {sum(1 for r in results if r)}')

asyncio.run(main())

Retries

import aiohttp
import asyncio
import random

async def fetch_with_retry(session, url, max_retries=3, **kwargs):
    for attempt in range(max_retries):
        try:
            async with session.get(url, **kwargs) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status in [429, 500, 502, 503, 504]:
                    # Retryable status: exponential backoff with jitter
                    await asyncio.sleep(2 ** attempt + random.random())
                    continue
                else:
                    return None
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
    return None

async def main():
    async with aiohttp.ClientSession() as session:
        result = await fetch_with_retry(session, 'https://httpbin.org/get')
        print(result[:100] if result else 'failed')

asyncio.run(main())

Proxies

import aiohttp
import asyncio

async def main():
    proxy = 'http://127.0.0.1:7890'

    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/ip', proxy=proxy) as resp:
            print(await resp.text())

asyncio.run(main())

Rotating proxies from a pool:

import aiohttp
import asyncio
import random

class ProxyPool:
    def __init__(self, proxies):
        self.proxies = proxies

    def get_proxy(self):
        return random.choice(self.proxies)

async def fetch_with_proxy(session, url, proxy_pool):
    proxy = proxy_pool.get_proxy()
    async with session.get(url, proxy=proxy) as resp:
        return await resp.text()

async def main():
    proxy_pool = ProxyPool([
        'http://proxy1:8080',
        'http://proxy2:8080',
    ])

    async with aiohttp.ClientSession() as session:
        result = await fetch_with_proxy(session, 'https://httpbin.org/ip', proxy_pool)
        print(result)

asyncio.run(main())

A Complete Async Spider

import asyncio
import aiohttp
import random
import time
from typing import List, Optional
from bs4 import BeautifulSoup

class AsyncWebSpider:
    def __init__(
        self,
        max_concurrent: int = 10,
        request_delay: float = 0.5,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(max_concurrent)

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }

    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        **kwargs
    ) -> Optional[str]:
        async with self.semaphore:
            # Random delay to avoid hammering the server
            await asyncio.sleep(random.uniform(0, self.request_delay))

            for attempt in range(self.max_retries):
                try:
                    async with session.get(url, **kwargs) as response:
                        if response.status == 200:
                            return await response.text()
                        elif response.status in [429, 500, 502, 503, 504]:
                            # Retryable status: back off and try again
                            wait = 2 ** attempt + random.random()
                            await asyncio.sleep(wait)
                        else:
                            return None
                except asyncio.TimeoutError:
                    print(f'timeout: {url}')
                except Exception as e:
                    print(f'error: {url} - {e}')
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)

            return None

    async def fetch_all(self, urls: List[str]) -> List[Optional[str]]:
        async with aiohttp.ClientSession(
            headers=self.headers,
            timeout=self.timeout
        ) as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

    def parse_html(self, html: str, selector: str) -> List[str]:
        soup = BeautifulSoup(html, 'lxml')
        elements = soup.select(selector)
        return [e.get_text(strip=True) for e in elements]

async def main():
    spider = AsyncWebSpider(
        max_concurrent=5,
        request_delay=0.5,
        timeout=30,
        max_retries=3,
    )

    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(20)]

    start = time.time()
    results = await spider.fetch_all(urls)
    elapsed = time.time() - start

    success = sum(1 for r in results if r)
    print(f'completed: {success}/{len(urls)} requests')
    print(f'elapsed: {elapsed:.2f}s')

if __name__ == '__main__':
    asyncio.run(main())

Async File Writing

Writing large amounts of data should also be asynchronous:

import asyncio
import aiohttp
import aiofiles
import json

async def save_results(results, filename):
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(json.dumps(results, ensure_ascii=False, indent=2))

async def main():
    # Reuses the AsyncWebSpider class defined above
    spider = AsyncWebSpider(max_concurrent=5)
    urls = [f'https://httpbin.org/get?id={i}' for i in range(10)]
    results = await spider.fetch_all(urls)

    data = [r for r in results if r]
    await save_results(data, 'results.json')
    print(f'saved {len(data)} records')

asyncio.run(main())

Summary

In this chapter we covered:

  1. Sync vs async - why asynchronous requests are more efficient
  2. asyncio basics - async/await syntax and concurrent execution
  3. Getting started with aiohttp - the asynchronous HTTP client
  4. Concurrency control - bounding concurrency with Semaphore
  5. Timeouts and error handling - designing robust crawlers
  6. Retries - surviving transient failures
  7. Proxies - rotating IPs
  8. A complete example - a production-grade async spider

Exercises

  1. Use aiohttp to crawl 100 pages concurrently, with the concurrency capped at 10
  2. Add retry and timeout handling to an async crawler
  3. Save crawl results to a JSON file asynchronously