异步爬虫
当需要爬取大量页面时,同步请求的效率会成为瓶颈。异步爬虫可以在等待网络响应的同时处理其他请求,大幅提升爬取效率。
同步与异步的区别
同步请求的问题
传统的同步请求(如 requests)在发送请求后会阻塞等待响应,一次只能处理一个请求:
import requests
import time


def fetch_sync(urls):
    """Fetch each URL sequentially and return a list of HTTP status codes.

    Every `requests.get` blocks until its response arrives, so the total
    time is roughly the sum of all individual response times.
    """
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.status_code)
    return results


if __name__ == '__main__':
    # Plain string literal (the original used an f-string with no placeholders).
    urls = ['https://httpbin.org/delay/1' for _ in range(10)]
    start = time.time()
    results = fetch_sync(urls)
    print(f'同步耗时: {time.time() - start:.2f}秒')
上面代码爬取 10 个页面,每个页面延迟 1 秒,总耗时约 10 秒。大部分时间都在等待网络响应。
异步请求的优势
异步请求可以在等待一个请求响应的同时,发送其他请求:
import asyncio
import aiohttp
import time


async def fetch_one(session, url):
    """Fetch a single URL on the shared session and return its status code."""
    async with session.get(url) as response:
        return response.status


async def fetch_async(urls):
    """Fetch all URLs concurrently; returns status codes in input order."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        # gather runs all tasks concurrently and preserves argument order.
        results = await asyncio.gather(*tasks)
        return results


if __name__ == '__main__':
    # Plain string literal (the original used an f-string with no placeholders).
    urls = ['https://httpbin.org/delay/1' for _ in range(10)]
    start = time.time()
    results = asyncio.run(fetch_async(urls))
    print(f'异步耗时: {time.time() - start:.2f}秒')
同样的 10 个页面,异步方式只需约 1 秒。
asyncio 基础
asyncio 是 Python 的异步编程框架,提供了事件循环、协程等核心功能。
async/await 语法
import asyncio


async def say_hello():
    """Print 'Hello', pause one second without blocking the loop, print 'World'."""
    print('Hello')
    # await suspends this coroutine and lets the event loop run other work.
    await asyncio.sleep(1)
    print('World')


if __name__ == '__main__':
    # Guarded so importing this module does not run the coroutine.
    asyncio.run(say_hello())
async def 定义的函数是协程函数,调用它返回协程对象。await 用于等待异步操作完成。
并发执行多个协程
import asyncio


async def task(name, delay):
    """Simulated unit of work: wait `delay` seconds, then return a result string."""
    print(f'{name} 开始')
    await asyncio.sleep(delay)
    print(f'{name} 完成')
    return f'{name} 结果'


async def main():
    """Run three tasks concurrently; results come back in argument order."""
    results = await asyncio.gather(
        task('任务A', 2),
        task('任务B', 1),
        task('任务C', 3),
    )
    print(results)


if __name__ == '__main__':
    asyncio.run(main())
asyncio.gather() 可以并发执行多个协程,等待全部完成后返回结果列表。
创建任务
import asyncio


async def say_hello():
    """Helper coroutine (defined here so the snippet is self-contained)."""
    print('Hello')
    await asyncio.sleep(1)
    print('World')


async def main():
    """Wrap two coroutines in tasks; both start running as soon as created."""
    task1 = asyncio.create_task(say_hello())
    task2 = asyncio.create_task(say_hello())
    # The two tasks execute concurrently; awaiting only collects the results.
    await task1
    await task2


if __name__ == '__main__':
    asyncio.run(main())
asyncio.create_task() 将协程包装为任务,立即调度执行。
aiohttp 入门
aiohttp 是基于 asyncio 的异步 HTTP 客户端/服务器框架,非常适合编写异步爬虫。
安装
pip install aiohttp
# 可选:加速 SSL
pip install "aiohttp[speedups]"
基本用法
import aiohttp
import asyncio


async def fetch_url(url):
    """Download `url` and return the response body as text."""
    # A throwaway session per call is fine for a demo; reuse one session
    # across many requests in real code to share the connection pool.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def main():
    html = await fetch_url('https://httpbin.org/get')
    print(html)


if __name__ == '__main__':
    asyncio.run(main())
ClientSession 的作用
ClientSession 是 aiohttp 的核心组件,它管理连接池和 Cookie:
import aiohttp
import asyncio


async def main():
    """Two sequential requests on one session share cookies and connections."""
    async with aiohttp.ClientSession() as session:
        # First request makes the server set a cookie; the session stores it.
        async with session.get('https://httpbin.org/cookies/set?name=value') as resp:
            print(await resp.text())
        # Second request automatically sends the stored cookie back.
        async with session.get('https://httpbin.org/cookies') as resp:
            print(await resp.text())


if __name__ == '__main__':
    asyncio.run(main())
同一个 Session 会自动管理 Cookie,多个请求共享连接池。
请求方法
import aiohttp
import asyncio


async def main():
    """Exercise the four common HTTP verbs against httpbin."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            print('GET:', resp.status)
        # json= sends an application/json body; data= sends form-encoded data.
        async with session.post('https://httpbin.org/post', json={'key': 'value'}) as resp:
            print('POST:', resp.status)
        async with session.put('https://httpbin.org/put', data={'key': 'value'}) as resp:
            print('PUT:', resp.status)
        async with session.delete('https://httpbin.org/delete') as resp:
            print('DELETE:', resp.status)


if __name__ == '__main__':
    asyncio.run(main())
请求参数
import aiohttp
import asyncio


async def main():
    """Pass query parameters via `params` instead of hand-building the URL."""
    async with aiohttp.ClientSession() as session:
        # aiohttp URL-encodes these and appends them as ?key=value&page=1
        params = {'key': 'value', 'page': 1}
        async with session.get('https://httpbin.org/get', params=params) as resp:
            print(await resp.text())


if __name__ == '__main__':
    asyncio.run(main())
请求头
import aiohttp
import asyncio


async def main():
    """Session-level default headers are sent with every request on the session."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('https://httpbin.org/headers') as resp:
            print(await resp.text())


if __name__ == '__main__':
    asyncio.run(main())
响应处理
import aiohttp
import asyncio


async def main():
    """Inspect the common pieces of a response: status, headers, body."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/get') as resp:
            print('状态码:', resp.status)
            print('响应头:', resp.headers['Content-Type'])
            # aiohttp buffers the body after the first read, so calling
            # both .text() and .json() on the same response works.
            print('文本:', await resp.text())
            print('JSON:', await resp.json())


if __name__ == '__main__':
    asyncio.run(main())
并发控制
直接并发大量请求可能导致服务器压力过大或被封禁,需要控制并发数量。
使用 Semaphore
import asyncio
import aiohttp


async def fetch_with_limit(session, url, semaphore):
    """Fetch `url`, but only while holding a slot in `semaphore`."""
    # Acquiring the semaphore caps how many coroutines are inside the
    # request block at the same time.
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()


async def main():
    # At most 5 requests are in flight at any moment.
    semaphore = asyncio.Semaphore(5)
    async with aiohttp.ClientSession() as session:
        urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(20)]
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'完成 {len(results)} 个请求')


if __name__ == '__main__':
    asyncio.run(main())
Semaphore 限制同时进行的请求数量为 5 个。
使用 asyncio.Semaphore 封装
import asyncio
import aiohttp
from typing import List


class AsyncSpider:
    """Small reusable crawler that caps the number of concurrent requests."""

    def __init__(self, max_concurrent: int = 10):
        # Kept for introspection; the semaphore is what enforces the limit.
        self.max_concurrent = max_concurrent
        # One shared semaphore gates every fetch issued by this spider.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(self, session: aiohttp.ClientSession, url: str, **kwargs):
        """GET `url`; return the body text, or None on any error."""
        async with self.semaphore:
            try:
                async with session.get(url, **kwargs) as response:
                    return await response.text()
            except Exception as e:
                # Best-effort crawl: report and continue with other URLs.
                print(f'请求失败 {url}: {e}')
                return None

    async def fetch_all(self, urls: List[str], **kwargs):
        """Fetch every URL concurrently on one session; results keep input order."""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, **kwargs) for url in urls]
            return await asyncio.gather(*tasks)


async def main():
    spider = AsyncSpider(max_concurrent=5)
    urls = [f'https://httpbin.org/get?id={i}' for i in range(20)]
    results = await spider.fetch_all(urls)
    print(f'成功: {sum(1 for r in results if r)} 个')


if __name__ == '__main__':
    asyncio.run(main())
超时设置
import aiohttp
import asyncio
from aiohttp import ClientTimeout


async def main():
    """Demonstrate timeout handling against a deliberately slow endpoint.

    The endpoint delays 10 seconds, so `total` must be below 10 for the
    TimeoutError branch to actually fire (the original used total=30,
    which never times out on /delay/10).
    """
    timeout = ClientTimeout(total=5, connect=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://httpbin.org/delay/10') as resp:
                print(await resp.text())
        except asyncio.TimeoutError:
            # aiohttp raises asyncio.TimeoutError when `total` is exceeded.
            print('请求超时')


if __name__ == '__main__':
    asyncio.run(main())
超时参数说明:
- total:整个请求的总超时时间
- connect:连接超时时间
- sock_read:读取超时时间
- sock_connect:Socket 连接超时时间
错误处理
import aiohttp
import asyncio
from aiohttp import ClientError, ClientResponseError


async def safe_fetch(session, url):
    """GET `url`; return the body text on success, None on any failure.

    Handlers are ordered most-specific first: ClientResponseError is a
    subclass of ClientError, so it must be caught before it.
    """
    try:
        async with session.get(url) as response:
            # Turn 4xx/5xx statuses into ClientResponseError.
            response.raise_for_status()
            return await response.text()
    except ClientResponseError as e:
        print(f'HTTP 错误 {url}: {e.status}')
    except ClientError as e:
        print(f'请求错误 {url}: {e}')
    except asyncio.TimeoutError:
        print(f'请求超时 {url}')
    except Exception as e:
        # Last-resort catch so one bad URL cannot kill the whole gather.
        print(f'未知错误 {url}: {e}')
    return None


async def main():
    async with aiohttp.ClientSession() as session:
        urls = [
            'https://httpbin.org/get',
            'https://httpbin.org/status/404',
            'https://httpbin.org/status/500',
        ]
        tasks = [safe_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f'成功: {sum(1 for r in results if r)} 个')


if __name__ == '__main__':
    asyncio.run(main())
重试机制
import aiohttp
import asyncio
import random


async def fetch_with_retry(session, url, max_retries=3, **kwargs):
    """GET `url` with up to `max_retries` attempts.

    Returns the body text on 200, None on a non-retryable status or when
    retries are exhausted; re-raises the last exception if every attempt
    raised. Backoff is exponential (2**attempt) plus random jitter.
    """
    for attempt in range(max_retries):
        try:
            async with session.get(url, **kwargs) as response:
                if response.status == 200:
                    return await response.text()
                elif response.status in [429, 500, 502, 503, 504]:
                    # Transient server-side failure: back off, then retry.
                    await asyncio.sleep(2 ** attempt + random.random())
                    continue
                else:
                    # e.g. 404 — retrying will not help.
                    return None
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
    return None


async def main():
    async with aiohttp.ClientSession() as session:
        result = await fetch_with_retry(session, 'https://httpbin.org/get')
        print(result[:100] if result else '失败')


if __name__ == '__main__':
    asyncio.run(main())
代理设置
import aiohttp
import asyncio


async def main():
    """Route a single request through an HTTP proxy via the per-request `proxy=`."""
    proxy = 'http://127.0.0.1:7890'
    async with aiohttp.ClientSession() as session:
        async with session.get('https://httpbin.org/ip', proxy=proxy) as resp:
            print(await resp.text())


if __name__ == '__main__':
    asyncio.run(main())
代理池轮换:
import aiohttp
import asyncio
import random


class ProxyPool:
    """Trivial proxy rotation: pick a random proxy for each request."""

    def __init__(self, proxies):
        # List of proxy URLs, e.g. 'http://host:port'.
        self.proxies = proxies

    def get_proxy(self):
        """Return one proxy URL chosen uniformly at random."""
        return random.choice(self.proxies)


async def fetch_with_proxy(session, url, proxy_pool):
    """Fetch `url` through a proxy drawn from `proxy_pool`."""
    proxy = proxy_pool.get_proxy()
    async with session.get(url, proxy=proxy) as resp:
        return await resp.text()


async def main():
    proxy_pool = ProxyPool([
        'http://proxy1:8080',
        'http://proxy2:8080',
    ])
    async with aiohttp.ClientSession() as session:
        result = await fetch_with_proxy(session, 'https://httpbin.org/ip', proxy_pool)
        print(result)


if __name__ == '__main__':
    asyncio.run(main())
完整异步爬虫示例
import asyncio
import aiohttp
import random
import time
from typing import List, Optional
from bs4 import BeautifulSoup


class AsyncWebSpider:
    """Production-style async crawler: bounded concurrency, per-request
    jitter, timeouts, and exponential-backoff retries."""

    def __init__(
        self,
        max_concurrent: int = 10,
        request_delay: float = 0.5,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        # Shared semaphore caps requests in flight across all fetches.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }

    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        **kwargs
    ) -> Optional[str]:
        """GET `url` with retries; return the body text or None.

        Retries on timeouts, transport errors, and retryable HTTP statuses
        (429/5xx) with exponential backoff plus jitter.
        """
        async with self.semaphore:
            # Random start delay spreads requests out (politeness/jitter).
            await asyncio.sleep(random.uniform(0, self.request_delay))
            for attempt in range(self.max_retries):
                try:
                    async with session.get(url, **kwargs) as response:
                        if response.status == 200:
                            return await response.text()
                        elif response.status in [429, 500, 502, 503, 504]:
                            # Retryable status: back off once, then retry.
                            # `continue` fixes the original, which slept here
                            # AND fell through to the sleep below (double backoff).
                            await asyncio.sleep(2 ** attempt + random.random())
                            continue
                        else:
                            # Non-retryable status (e.g. 404): give up.
                            return None
                except asyncio.TimeoutError:
                    print(f'超时: {url}')
                except Exception as e:
                    print(f'错误: {url} - {e}')
                # Exception path only: back off before the next attempt.
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
            return None

    async def fetch_all(self, urls: List[str]) -> List[Optional[str]]:
        """Fetch every URL concurrently on one shared session."""
        async with aiohttp.ClientSession(
            headers=self.headers,
            timeout=self.timeout
        ) as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

    def parse_html(self, html: str, selector: str) -> List[str]:
        """Return the stripped text of every element matching the CSS `selector`.

        NOTE(review): requires the third-party `lxml` parser to be installed;
        pass 'html.parser' instead to rely only on the standard library.
        """
        soup = BeautifulSoup(html, 'lxml')
        elements = soup.select(selector)
        return [e.get_text(strip=True) for e in elements]


async def main():
    spider = AsyncWebSpider(
        max_concurrent=5,
        request_delay=0.5,
        timeout=30,
        max_retries=3,
    )
    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(20)]
    start = time.time()
    results = await spider.fetch_all(urls)
    elapsed = time.time() - start
    success = sum(1 for r in results if r)
    print(f'完成: {success}/{len(urls)} 个请求')
    print(f'耗时: {elapsed:.2f}秒')


if __name__ == '__main__':
    asyncio.run(main())
异步文件写入
大量数据写入也需要异步处理:
import asyncio
import aiohttp
import aiofiles
import json


async def save_results(results, filename):
    """Write `results` to `filename` as pretty-printed UTF-8 JSON.

    aiofiles performs the file I/O off the event loop so other coroutines
    keep running while the write completes.
    """
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(json.dumps(results, ensure_ascii=False, indent=2))


async def main():
    # AsyncWebSpider comes from the complete-spider example above.
    spider = AsyncWebSpider(max_concurrent=5)
    urls = [f'https://httpbin.org/get?id={i}' for i in range(10)]
    results = await spider.fetch_all(urls)
    data = [r for r in results if r]
    await save_results(data, 'results.json')
    print(f'已保存 {len(data)} 条数据')


if __name__ == '__main__':
    asyncio.run(main())
小结
本章我们学习了:
- 同步与异步的区别 - 异步请求的效率优势
- asyncio 基础 - async/await 语法和并发执行
- aiohttp 入门 - 异步 HTTP 客户端的使用
- 并发控制 - 使用 Semaphore 限制并发数
- 超时和错误处理 - 健壮的爬虫设计
- 重试机制 - 处理临时故障
- 代理设置 - IP 轮换
- 完整示例 - 生产级异步爬虫
练习
- 使用 aiohttp 实现一个并发爬取 100 个页面的爬虫,限制并发数为 10
- 为异步爬虫添加重试和超时机制
- 实现异步保存爬取结果到 JSON 文件