反反爬策略
网站通常会采取各种反爬措施来阻止自动化爬取。本章将介绍常见的反爬机制及应对策略。
法律提示
请只在合法合规的前提下使用这些技术,遵守网站的 robots.txt 和服务条款。
常见反爬机制
User-Agent 伪装
基础 User-Agent 轮换
import requests
import random
# 常见 User-Agent 列表
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gocko) Version/17.2 Safari/605.1.15',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
]
def get_random_ua():
return random.choice(USER_AGENTS)
# 使用随机 User-Agent
headers = {
'User-Agent': get_random_ua()
}
response = requests.get(url, headers=headers)
使用 fake-useragent 库
pip install fake-useragent
from fake_useragent import UserAgent
import requests
ua = UserAgent()
headers = {'User-Agent': ua.random}
response = requests.get(url, headers=headers)
IP 代理池
使用代理
import requests
# 使用单个代理
proxies = {
'http': 'http://127.0.0.1:7890',
'https': 'http://127.0.0.1:7890'
}
response = requests.get(url, proxies=proxies)
# 使用代理池
proxy_list = [
'http://user:pass@ip1:port1',
'http://user:pass@ip2:port2',
]
def get_proxy():
return random.choice(proxy_list)
response = requests.get(url, proxies={'http': get_proxy()})
免费代理获取
import requests
from bs4 import BeautifulSoup
def get_free_proxies():
"""获取免费代理"""
url = 'https://www.free-proxy-list.net/'
response = requests.get(url)
soup = BeautifulSoup(response.text, 'lxml')
proxies = []
for row in soup.select('table.table tbody tr'):
cols = row.find_all('td')
if cols[4].text == 'yes': # 支持 HTTPS
ip = cols[0].text
port = cols[1].text
proxies.append(f'http://{ip}:{port}')
return proxies
专业代理服务
常用的付费代理服务:
- 阿布云:提供 HTTP/HTTPS/SOCKS5 代理
- 讯代理:高质量代理池
- 青果代理:企业级代理服务
# 使用阿布云代理示例
proxy_host = 'http-dyn.abuyun.com'
proxy_port = '9020'
proxy_user = 'YOUR_PROXY_USER'
proxy_pass = 'YOUR_PROXY_PASS'
proxies = {
'http': f'http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}',
'https': f'http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}',
}
response = requests.get(url, proxies=proxies)
请求频率控制
基础延迟
import time
import random
# 固定延迟
time.sleep(2) # 每次请求后等待2秒
# 随机延迟(更自然)
delay = random.uniform(1, 3) # 1-3秒随机延迟
time.sleep(delay)
智能延迟
import time
import random
from collections import defaultdict
class SmartDelayer:
def __init__(self):
self.delays = defaultdict(list)
def wait(self, domain, min_delay=1, max_delay=5):
"""智能等待"""
# 检查上次请求时间
last_request = self.delays[domain]
if last_request:
elapsed = time.time() - last_request[0]
if elapsed < min_delay:
sleep_time = random.uniform(min_delay, max_delay)
time.sleep(sleep_time)
# 记录本次请求时间
self.delays[domain] = [time.time()]
def random_wait(self, min_delay=1, max_delay=5):
"""随机等待"""
time.sleep(random.uniform(min_delay, max_delay))
# 使用
delayer = SmartDelayer()
for url in urls:
delayer.wait('example.com')
response = requests.get(url)
指数退避
import time
import random
def request_with_backoff(url, max_retries=5):
"""指数退避重试"""
for attempt in range(max_retries):
try:
response = requests.get(url)
if response.status_code == 200:
return response
except Exception as e:
print(f"请求失败: {e}")
# 指数退避
delay = 2 ** attempt + random.uniform(0, 1)
print(f"等待 {delay:.2f} 秒后重试...")
time.sleep(delay)
return None
Cookie 处理
自动处理 Cookie
import requests
# 使用 Session 自动维护 Cookie
session = requests.Session()
# 设置 Cookie
session.cookies.set('session_id', 'abc123')
# 访问网站(会自动处理 Cookie)
response = session.get('https://example.com')
# 查看 Cookie
print(session.cookies.get_dict())
保存和加载 Cookie
import requests
import json
# 保存 Cookie 到文件
def save_cookies(session, filename):
with open(filename, 'w') as f:
json.dump(dict(session.cookies), f)
# 从文件加载 Cookie
def load_cookies(session, filename):
with open(filename, 'r') as f:
cookies = json.load(f)
session.cookies.update(cookies)
验证码处理
验证码是网站防止自动化访问的常见手段。理解不同类型的验证码及其处理方法,对于爬虫开发至关重要。
验证码类型
| 类型 | 特点 | 难度 | 处理方式 |
|---|---|---|---|
| 图形验证码 | 简单的字母数字组合 | 低 | OCR识别、打码平台 |
| 算术验证码 | 需要计算结果 | 低 | 本地计算 |
| 滑块验证码 | 拖动滑块到指定位置 | 中 | 轨迹模拟 |
| 点选验证码 | 按顺序点击特定位置 | 中 | 图像识别+坐标定位 |
| 短信验证码 | 需要手机接收 | 高 | 接码平台 |
| reCAPTCHA | 谷歌智能验证 | 高 | 第三方服务 |
图形验证码识别
图形验证码是最基础的验证码类型,通常包含扭曲的字母和数字。
使用 OCR 识别
import pytesseract
from PIL import Image
import requests
from io import BytesIO
# 安装依赖:
# pip install pytesseract pillow
# 还需要安装 Tesseract-OCR 软件
def recognize_text_captcha(image_url):
"""
使用 OCR 识别简单的图形验证码
参数:
image_url: 验证码图片的 URL
返回:
str: 识别出的文本
"""
# 下载图片
response = requests.get(image_url)
image = Image.open(BytesIO(response.content))
# 预处理:转为灰度图
image = image.convert('L')
# 二值化处理(提高对比度)
threshold = 128
image = image.point(lambda x: 255 if x > threshold else 0)
# OCR 识别
text = pytesseract.image_to_string(image, config='--psm 7')
return text.strip()
# 使用示例
result = recognize_text_captcha('https://example.com/captcha.png')
print(f'识别结果: {result}')
图像预处理提高识别率
对于复杂的验证码,需要更多的预处理步骤:
from PIL import Image, ImageFilter, ImageEnhance
import numpy as np
def preprocess_captcha(image_path):
"""
验证码图像预处理,提高 OCR 识别率
步骤:
1. 灰度化
2. 增强对比度
3. 去噪
4. 二值化
5. 边缘增强
"""
# 打开图片
img = Image.open(image_path)
# 1. 转为灰度图
img = img.convert('L')
# 2. 增强对比度
enhancer = ImageEnhance.Contrast(img)
img = enhancer.enhance(2.0)
# 3. 去噪(中值滤波)
img = img.filter(ImageFilter.MedianFilter(size=3))
# 4. 二值化
img = img.point(lambda x: 255 if x > 140 else 0, '1')
# 5. 边缘增强
img = img.filter(ImageFilter.EDGE_ENHANCE)
return img
def advanced_ocr_recognition(image_path):
"""使用预处理 + OCR 识别验证码"""
# 预处理
processed_img = preprocess_captcha(image_path)
# OCR 配置
# --psm 7: 将图像视为单行文本
# --oem 3: 使用默认 OCR 引擎模式
config = '--psm 7 --oem 3 -c tessedit_char_whitelist=0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
text = pytesseract.image_to_string(processed_img, config=config)
return text.strip()
使用打码平台
对于 OCR 难以识别的验证码,可以使用专业的打码平台:
import requests
import base64
import time
class CaptchaSolver:
"""
打码平台接口封装(以超级鹰为例)
使用前需要:
1. 注册打码平台账号
2. 充值获取积分
3. 获取软件 ID
"""
def __init__(self, username, password, soft_id):
self.username = username
self.password = password
self.soft_id = soft_id
self.base_url = 'http://api.chaojiying.net/'
def recognize(self, image_path, codetype=1004):
"""
识别验证码
参数:
image_path: 图片路径
codetype: 验证码类型
- 1004: 4位字母数字混合
- 1005: 5位字母数字混合
- 1006: 6位字母数字混合
- 1101: 计算题
- 9001: 滑块验证码
返回:
dict: 识别结果
"""
# 读取并编码图片
with open(image_path, 'rb') as f:
image_data = base64.b64encode(f.read()).decode()
# 构建请求参数
data = {
'user': self.username,
'pass2': self.password,
'softid': self.soft_id,
'codetype': codetype,
'file_base64': image_data
}
# 发送请求
response = requests.post(
self.base_url + 'Upload/Processing.php',
data=data,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
result = response.json()
if result.get('err_no') == 0:
return {
'success': True,
'code': result.get('pic_str'),
'pic_id': result.get('pic_id')
}
else:
return {
'success': False,
'error': result.get('err_str')
}
def report_error(self, pic_id):
"""报告识别错误,返还积分"""
data = {
'user': self.username,
'pass2': self.password,
'softid': self.soft_id,
'pic_id': pic_id,
'action': 'reporterror'
}
response = requests.post(
self.base_url + 'Upload/ReportError.php',
data=data
)
return response.json()
# 使用示例
solver = CaptchaSolver('username', 'password', 'soft_id')
result = solver.recognize('captcha.png', codetype=1004)
if result['success']:
print(f'验证码: {result["code"]}')
算术验证码
算术验证码需要计算表达式的结果:
import re
from PIL import Image
import pytesseract
def solve_math_captcha(image_path):
"""
识别并计算算术验证码
示例:识别 "3 + 5 = ?",返回 8
"""
# OCR 识别
img = Image.open(image_path)
text = pytesseract.image_to_string(img)
# 提取数字和运算符
# 匹配模式:数字 运算符 数字
match = re.search(r'(\d+)\s*([\+\-\×\*\/÷])\s*(\d+)', text)
if match:
num1 = int(match.group(1))
operator = match.group(2)
num2 = int(match.group(3))
# 计算
if operator in ['+', '+']:
return num1 + num2
elif operator in ['-', '-']:
return num1 - num2
elif operator in ['×', '*', 'x', 'X']:
return num1 * num2
elif operator in ['÷', '/']:
return num1 // num2
return None
# 使用示例
result = solve_math_captcha('math_captcha.png')
print(f'计算结果: {result}')
滑块验证码
滑块验证码需要拖动滑块到缺口位置,关键在于模拟人类的滑动轨迹。
获取缺口位置
from selenium import webdriver
from selenium.webdriver.common.by import By
from PIL import Image
import numpy as np
def find_gap_position(driver, bg_image_path, slider_image_path):
"""
通过对比背景图和滑块图,找到缺口位置
原理:
1. 背景图是有缺口的完整图片
2. 滑块图是缺口位置的图片
3. 通过图像对比找到缺口在背景图中的位置
"""
# 打开两张图片
bg_img = Image.open(bg_image_path)
slider_img = Image.open(slider_image_path)
# 转为数组
bg_array = np.array(bg_img)
slider_array = np.array(slider_img)
# 获取缺口位置(简化方法:从左到右扫描)
width, height = bg_img.size
for x in range(width):
# 对比每一列的像素差异
diff = np.abs(bg_array[:, x] - slider_array[:, x])
if np.mean(diff) > 50: # 差异阈值
return x
return None
生成人类滑动轨迹
import random
import math
def generate_slide_track(distance):
"""
生成模拟人类滑动的轨迹
人类滑动特点:
1. 先加速后减速
2. 可能有轻微抖动
3. 最后可能有微调
参数:
distance: 需要滑动的总距离
返回:
list: 每一步的移动距离列表
"""
tracks = []
current = 0
# 减速阈值(到达这个距离后开始减速)
mid = distance * 0.7
# 当前速度
v = 0
while current < distance:
if current < mid:
# 加速阶段
a = random.uniform(2, 5)
else:
# 减速阶段
a = random.uniform(-5, -2)
# 计算移动距离
s = v * 0.1 + 0.5 * a * 0.01 # 假设每步间隔 0.1 秒
s = max(1, int(s)) # 最小移动 1 像素
# 更新当前位置和速度
current += s
v = v + a * 0.1
# 添加随机抖动
if random.random() < 0.1:
s += random.choice([-1, 1])
tracks.append(s)
# 最后可能需要微调
total = sum(tracks)
if total < distance:
tracks.append(distance - total)
elif total > distance:
# 移除超出部分
diff = total - distance
for i in range(len(tracks) - 1, -1, -1):
if tracks[i] > diff:
tracks[i] -= diff
break
else:
diff -= tracks[i]
tracks[i] = 0
return tracks
# 使用示例
distance = 200 # 缺口距离
tracks = generate_slide_track(distance)
print(f'滑动轨迹: {tracks}')
print(f'总距离: {sum(tracks)}')
完整的滑块验证码处理
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
import time
import random
def solve_slider_captcha(driver, slider_selector, distance):
"""
完整的滑块验证码处理
参数:
driver: Selenium WebDriver
slider_selector: 滑块元素的选择器
distance: 需要滑动的距离
"""
# 找到滑块元素
slider = driver.find_element(By.CSS_SELECTOR, slider_selector)
# 生成轨迹
tracks = generate_slide_track(distance)
# 创建动作链
actions = ActionChains(driver)
# 点击并按住滑块
actions.click_and_hold(slider).pause(0.2)
# 按轨迹移动
for track in tracks:
# 添加随机 y 轴抖动(模拟人手不稳)
y_offset = random.randint(-2, 2)
actions.move_by_offset(track, y_offset).pause(random.uniform(0.01, 0.05))
# 松开前短暂停留
actions.pause(0.2).release()
# 执行动作
actions.perform()
# 完整使用示例
def login_with_slider(driver, username, password):
"""带滑块验证的登录"""
# 访问登录页面
driver.get('https://example.com/login')
# 输入用户名密码
driver.find_element(By.ID, 'username').send_keys(username)
driver.find_element(By.ID, 'password').send_keys(password)
# 点击登录按钮触发滑块
driver.find_element(By.ID, 'login-btn').click()
time.sleep(1)
# 等待滑块出现
time.sleep(2)
# 计算缺口距离(实际项目中需要通过图像对比计算)
distance = calculate_gap_distance(driver)
# 执行滑动
solve_slider_captcha(driver, '.slider-btn', distance)
# 等待验证结果
time.sleep(2)
点选验证码
点选验证码需要按顺序点击特定位置:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from PIL import Image
import io
def solve_click_captcha(driver, captcha_image_selector, target_texts):
"""
处理点选验证码
参数:
driver: Selenium WebDriver
captcha_image_selector: 验证码图片选择器
target_texts: 需要点击的文字列表,如 ['春', '夏', '秋']
"""
# 获取验证码图片
captcha_element = driver.find_element(By.CSS_SELECTOR, captcha_image_selector)
# 截取验证码图片
screenshot = driver.get_screenshot_as_png()
full_img = Image.open(io.BytesIO(screenshot))
# 获取验证码图片位置和大小
location = captcha_element.location
size = captcha_element.size
# 裁剪出验证码图片
left = location['x']
top = location['y']
right = left + size['width']
bottom = top + size['height']
captcha_img = full_img.crop((left, top, right, bottom))
# 识别需要点击的文字位置
# 这一步通常需要使用图像识别或者打码平台
click_positions = identify_click_positions(captcha_img, target_texts)
# 执行点击
actions = ActionChains(driver)
for pos in click_positions:
# 移动到位置并点击
actions.move_to_element_with_offset(
captcha_element,
pos['x'],
pos['y']
).click().pause(0.3)
actions.perform()
def identify_click_positions(captcha_img, target_texts):
"""
识别需要点击的位置
实际项目中,这一步通常需要:
1. 使用 OCR 识别图片中的所有文字
2. 找到目标文字的位置
3. 或使用打码平台返回坐标
"""
# 这里使用打码平台的返回格式示例
# 打码平台通常返回类似 "123,45|234,56|345,67" 的坐标
# 格式:x1,y1|x2,y2|x3,y3
# 模拟打码平台返回
positions = []
coords_str = "120,85|230,90|340,88" # 示例坐标
for coord in coords_str.split('|'):
x, y = map(int, coord.split(','))
positions.append({'x': x, 'y': y})
return positions
短信验证码
短信验证码需要手机接收,可以使用接码平台:
import requests
import time
class SMSService:
"""
短信接码平台接口封装
使用流程:
1. 获取手机号
2. 在网站填写手机号
3. 获取验证码
4. 释放手机号(可选)
"""
def __init__(self, api_key, platform='sms'):
self.api_key = api_key
self.platform = platform
# 不同平台的 API 地址
self.api_urls = {
'sms': 'https://api.sms-activate.org/stubs/handler_api.php',
'sms_man': 'https://sms-man.com/control',
}
def get_phone_number(self, service='go', country=0):
"""
获取手机号
参数:
service: 服务代码(如 go=Google, fb=Facebook)
country: 国家代码(0=俄罗斯,1=乌克兰等)
返回:
dict: 包含手机号和任务 ID
"""
params = {
'api_key': self.api_key,
'action': 'getNumber',
'service': service,
'country': country
}
response = requests.get(self.api_urls['sms'], params=params)
result = response.text
if result.startswith('ACCESS_NUMBER'):
parts = result.split(':')
return {
'success': True,
'task_id': parts[1],
'phone_number': parts[2]
}
else:
return {'success': False, 'error': result}
def get_sms_code(self, task_id, timeout=180):
"""
获取短信验证码
参数:
task_id: 任务 ID
timeout: 超时时间(秒)
返回:
str: 验证码
"""
start_time = time.time()
while time.time() - start_time < timeout:
params = {
'api_key': self.api_key,
'action': 'getStatus',
'id': task_id
}
response = requests.get(self.api_urls['sms'], params=params)
result = response.text
if result.startswith('STATUS_OK'):
# 提取验证码(假设是数字)
code = result.split(':')[1]
return code
elif result == 'STATUS_WAIT_CODE':
# 等待短信
time.sleep(5)
else:
break
return None
def release_number(self, task_id):
"""释放手机号(取消任务)"""
params = {
'api_key': self.api_key,
'action': 'setStatus',
'id': task_id,
'status': 8 # 8 = 取消
}
response = requests.get(self.api_urls['sms'], params=params)
return response.text
# 使用示例
def register_with_sms(api_key):
"""使用短信验证码注册"""
sms = SMSService(api_key)
# 1. 获取手机号
phone_result = sms.get_phone_number(service='go')
if not phone_result['success']:
print(f"获取手机号失败: {phone_result['error']}")
return
phone = phone_result['phone_number']
task_id = phone_result['task_id']
print(f"手机号: {phone}")
# 2. 在网站填写手机号并发送验证码(这里需要根据具体网站实现)
# send_sms_to_website(phone)
# 3. 获取验证码
code = sms.get_sms_code(task_id, timeout=120)
if code:
print(f"验证码: {code}")
# 使用验证码完成注册
else:
print("获取验证码超时")
sms.release_number(task_id)
reCAPTCHA 处理
谷歌的 reCAPTCHA 是最复杂的验证码之一:
# reCAPTCHA v2(复选框)通常需要第三方服务
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
def solve_recaptcha_v2(driver, site_key, page_url, api_key):
"""
使用第三方服务处理 reCAPTCHA v2
流程:
1. 获取 site_key 和页面 URL
2. 调用第三方服务获取 token
3. 注入 token 到页面
"""
# 使用 2captcha 或类似服务
# 1. 提交任务
submit_url = 'http://2captcha.com/in.php'
params = {
'key': api_key,
'method': 'userrecaptcha',
'googlekey': site_key,
'pageurl': page_url
}
response = requests.get(submit_url, params=params)
if not response.text.startswith('OK'):
return None
task_id = response.text.split('|')[1]
# 2. 等待结果
result_url = 'http://2captcha.com/res.php'
for _ in range(60): # 最多等待 5 分钟
time.sleep(5)
response = requests.get(result_url, params={
'key': api_key,
'action': 'get',
'id': task_id
})
if response.text.startswith('OK'):
token = response.text.split('|')[1]
break
elif response.text != 'CAPCHA_NOT_READY':
return None
else:
return None
# 3. 注入 token
driver.execute_script(f'''
document.getElementById('g-recaptcha-response').innerHTML = '{token}';
''')
return token
JavaScript 渲染
使用 Selenium
pip install selenium webdriver-manager
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
# 配置 Chrome 选项
chrome_options = Options()
chrome_options.add_argument('--headless') # 无头模式
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('user-agent=Mozilla/5.0 ...')
# 创建驱动
driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=chrome_options
)
# 访问页面
driver.get('https://example.com')
# 等待页面加载
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, 'content'))
)
# 获取页面源码
html = driver.page_source
# 使用 BeautifulSoup 解析
soup = BeautifulSoup(html, 'lxml')
# 关闭驱动
driver.quit()
使用 Playwright
pip install playwright
playwright install chromium
import asyncio
from playwright.async_api import async_playwright
async def scrape_with_playwright():
async with async_playwright() as p:
# 启动浏览器
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
# 设置 User-Agent
await page.set_extra_http_headers({
'User-Agent': 'Mozilla/5.0 ...'
})
# 访问页面
await page.goto('https://example.com')
# 等待加载
await page.wait_for_selector('.content')
# 获取页面内容
html = await page.content()
await browser.close()
return html
# 运行
html = asyncio.run(scrape_with_playwright())
寻找 API 接口
很多动态加载的数据来自 API,可以通过浏览器开发者工具找到:
import requests
# 通过开发者工具找到的 API 地址
api_url = 'https://example.com/api/data?page=1&size=20'
# 直接请求 API(通常更高效)
headers = {
'User-Agent': 'Mozilla/5.0 ...',
'Referer': 'https://example.com/'
}
response = requests.get(api_url, headers=headers)
data = response.json()
行为检测规避
模拟人类行为
import random
import time
def human_like_delay():
"""模拟人类阅读页面的延迟"""
base_delay = random.uniform(0.5, 2.0)
variance = random.uniform(0, 0.5)
time.sleep(base_delay + variance)
def scroll_page(driver):
"""模拟滚动页面"""
total_height = driver.execute_script("return document.body.scrollHeight")
for i in range(0, total_height, random.randint(100, 300)):
driver.execute_script(f"window.scrollTo(0, {i});")
time.sleep(random.uniform(0.1, 0.3))
# 随机回滚一点
driver.execute_script(f"window.scrollTo(0, {random.randint(0, 100)});")
鼠标移动模拟
from selenium.webdriver.common.action_chains import ActionChains
def human_mouse_movements(driver, element):
"""模拟人类鼠标移动"""
actions = ActionChains(driver)
# 从元素外开始
actions.move_to_element_with_offset(element, -50, -50)
# 移动到元素附近(不直接点击)
for _ in range(random.randint(3, 7)):
x_offset = random.randint(-20, 20)
y_offset = random.randint(-20, 20)
actions.move_by_offset(x_offset, y_offset)
actions.pause(random.uniform(0.1, 0.3))
actions.perform()
综合案例
import requests
import time
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class StealthSpider:
def __init__(self):
self.session = requests.Session()
self.setup_session()
def setup_session(self):
"""配置 Session"""
# User-Agent
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ...',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 ...',
]
self.session.headers.update({
'User-Agent': random.choice(user_agents),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
# 重试配置
retry = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
def request(self, url, **kwargs):
"""发送请求(带防封策略)"""
# 随机延迟
time.sleep(random.uniform(1, 3))
# 默认配置
kwargs.setdefault('timeout', 10)
kwargs.setdefault('allow_redirects', True)
# 使用代理(可选)
if random.random() < 0.3: # 30%概率使用代理
kwargs['proxies'] = self.get_proxy()
response = self.session.get(url, **kwargs)
# 检查是否被封
if response.status_code == 429:
print('请求过于频繁,等待更长时间...')
time.sleep(random.uniform(30, 60))
response = self.session.get(url, **kwargs)
return response
def get_proxy(self):
"""获取代理"""
# 从代理池获取
proxy = random.choice(self.proxy_list)
return {
'http': f'http://{proxy}',
'https': f'http://{proxy}'
}
# 使用示例
spider = StealthSpider()
response = spider.request('https://example.com')
分布式爬虫基础
当需要大规模爬取数据时,单机爬虫会遇到性能瓶颈。分布式爬虫通过多台机器协同工作,可以大幅提升爬取效率。
为什么需要分布式爬虫?
单机爬虫的局限性:
| 问题 | 单机爬虫 | 分布式爬虫 |
|---|---|---|
| 爬取速度 | 受限于单机性能 | 可横向扩展 |
| IP 封禁 | 容易被封 | 多 IP 分散风险 |
| 稳定性 | 单点故障 | 高可用 |
| 数据量 | 内存/磁盘限制 | 分布式存储 |
| 维护成本 | 低 | 较高 |
分布式爬虫架构
一个典型的分布式爬虫系统包含以下组件:
核心组件详解
1. 任务队列
任务队列是分布式爬虫的核心,负责存储和分发待爬取的 URL。
使用 Redis 实现任务队列:
import redis
import json
from typing import Optional, Dict, Any
class RedisTaskQueue:
"""
基于 Redis 的任务队列
特点:
- 支持优先级队列
- 支持任务持久化
- 支持分布式访问
"""
def __init__(self, host='localhost', port=6379, db=0):
self.redis = redis.Redis(host=host, port=port, db=db)
self.queue_key = 'spider:tasks'
self.processing_key = 'spider:processing'
def push(self, url: str, priority: int = 0, meta: Dict = None):
"""
添加任务到队列
参数:
url: 待爬取的 URL
priority: 优先级(数字越大优先级越高)
meta: 附加元数据
"""
task = {
'url': url,
'meta': meta or {},
'priority': priority
}
# 使用有序集合实现优先级队列
self.redis.zadd(
self.queue_key,
{json.dumps(task): priority}
)
def pop(self, timeout: int = 0) -> Optional[Dict]:
"""
获取一个任务
参数:
timeout: 阻塞等待超时(0 表示不阻塞)
返回:
任务字典或 None
"""
# 获取优先级最高的任务
result = self.redis.zpopmax(self.queue_key)
if result:
task_json, _ = result[0]
task = json.loads(task_json)
# 将任务移到处理中队列(用于故障恢复)
self.redis.hset(
self.processing_key,
task['url'],
json.dumps(task)
)
return task
return None
def complete(self, url: str):
"""标记任务完成"""
self.redis.hdel(self.processing_key, url)
def retry(self, url: str, max_retries: int = 3):
"""重试失败的任务"""
task_json = self.redis.hget(self.processing_key, url)
if task_json:
task = json.loads(task_json)
retries = task.get('retries', 0)
if retries < max_retries:
task['retries'] = retries + 1
self.redis.hdel(self.processing_key, url)
self.push(task['url'], task.get('priority', 0), task.get('meta'))
else:
# 超过重试次数,移到失败队列
self.redis.hset('spider:failed', url, task_json)
self.redis.hdel(self.processing_key, url)
def size(self) -> int:
"""获取队列大小"""
return self.redis.zcard(self.queue_key)
# 使用示例
queue = RedisTaskQueue()
# 添加任务
queue.push('https://example.com/page1', priority=1)
queue.push('https://example.com/page2', priority=2, meta={'category': 'news'})
# 获取任务
task = queue.pop()
if task:
print(f"处理任务: {task['url']}")
# 处理完成后标记
queue.complete(task['url'])
2. URL 去重
在海量数据爬取中,避免重复爬取同一 URL 至关重要。
基于 Redis Set 的去重:
import redis
import hashlib
class URLFilter:
"""
URL 去重过滤器
使用 Redis Set 存储 URL 的指纹
"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.filter_key = 'spider:url_filter'
def _get_fingerprint(self, url: str) -> str:
"""计算 URL 指纹"""
# 使用 MD5 生成指纹(也可以用 SHA1)
return hashlib.md5(url.encode()).hexdigest()
def add(self, url: str) -> bool:
"""
添加 URL 到过滤器
返回:
True: URL 是新的
False: URL 已存在
"""
fingerprint = self._get_fingerprint(url)
# SADD 返回 1 表示新添加,0 表示已存在
return self.redis.sadd(self.filter_key, fingerprint) == 1
def exists(self, url: str) -> bool:
"""检查 URL 是否已存在"""
fingerprint = self._get_fingerprint(url)
return self.redis.sismember(self.filter_key, fingerprint)
def clear(self):
"""清空过滤器"""
self.redis.delete(self.filter_key)
# 使用示例
r = redis.Redis()
url_filter = URLFilter(r)
if url_filter.add('https://example.com/page1'):
print('新 URL,开始爬取')
else:
print('URL 已存在,跳过')
使用布隆过滤器优化内存:
from pybloom_live import ScalableBloomFilter
import pickle
class BloomURLFilter:
"""
基于 Bloom Filter 的 URL 去重
优点:
- 内存占用极低
- 查询速度极快
缺点:
- 有一定的误判率(可以接受的 URL 被误判为已存在)
"""
def __init__(self, error_rate=0.001, initial_capacity=100000):
self.bloom = ScalableBloomFilter(
initial_capacity=initial_capacity,
error_rate=error_rate
)
def add(self, url: str) -> bool:
"""添加 URL,返回是否为新的"""
if url in self.bloom:
return False
self.bloom.add(url)
return True
def exists(self, url: str) -> bool:
"""检查 URL 是否可能存在"""
return url in self.bloom
def save(self, filepath: str):
"""保存到文件"""
with open(filepath, 'wb') as f:
pickle.dump(self.bloom, f)
def load(self, filepath: str):
"""从文件加载"""
with open(filepath, 'rb') as f:
self.bloom = pickle.load(f)
# 使用示例
bloom_filter = BloomURLFilter(error_rate=0.001)
# 添加 URL
if bloom_filter.add('https://example.com/page1'):
print('新 URL')
# 保存到文件
bloom_filter.save('url_filter.bloom')
3. 代理池管理
import redis
import requests
import time
import random
from typing import Optional, List
class ProxyPool:
"""
分布式代理池
功能:
- 代理获取与存储
- 代理健康检查
- 代理自动切换
"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.proxy_key = 'spider:proxies'
self.invalid_key = 'spider:proxies:invalid'
self.check_url = 'http://httpbin.org/ip'
def add(self, proxy: str, source: str = 'manual'):
"""
添加代理
参数:
proxy: 代理地址,如 'http://ip:port'
source: 代理来源
"""
proxy_info = {
'proxy': proxy,
'source': source,
'added_at': time.time(),
'success_count': 0,
'fail_count': 0
}
self.redis.hset(self.proxy_key, proxy, json.dumps(proxy_info))
def get(self) -> Optional[str]:
"""
获取一个可用代理
策略:优先使用成功率高的代理
"""
all_proxies = self.redis.hgetall(self.proxy_key)
if not all_proxies:
return None
# 解析并排序(按成功率)
proxy_list = []
for proxy, info in all_proxies.items():
data = json.loads(info)
success = data.get('success_count', 0)
fail = data.get('fail_count', 0)
success_rate = success / (success + fail + 1)
proxy_list.append((proxy.decode(), success_rate))
# 按成功率降序排序
proxy_list.sort(key=lambda x: x[1], reverse=True)
# 从前 20% 的代理中随机选择(增加随机性)
top_count = max(1, len(proxy_list) // 5)
selected = random.choice(proxy_list[:top_count])
return selected[0]
def report_success(self, proxy: str):
"""报告代理使用成功"""
info = self.redis.hget(self.proxy_key, proxy)
if info:
data = json.loads(info)
data['success_count'] = data.get('success_count', 0) + 1
data['last_success'] = time.time()
self.redis.hset(self.proxy_key, proxy, json.dumps(data))
def report_fail(self, proxy: str):
"""报告代理使用失败"""
info = self.redis.hget(self.proxy_key, proxy)
if info:
data = json.loads(info)
data['fail_count'] = data.get('fail_count', 0) + 1
data['last_fail'] = time.time()
self.redis.hset(self.proxy_key, proxy, json.dumps(data))
# 如果失败次数过多,移到无效列表
if data['fail_count'] > 10:
self.redis.hdel(self.proxy_key, proxy)
self.redis.hset(self.invalid_key, proxy, json.dumps(data))
def check_health(self):
"""检查所有代理的健康状态"""
all_proxies = self.redis.hgetall(self.proxy_key)
for proxy, info in all_proxies.items():
proxy = proxy.decode()
try:
response = requests.get(
self.check_url,
proxies={'http': proxy, 'https': proxy},
timeout=10
)
if response.status_code == 200:
self.report_success(proxy)
else:
self.report_fail(proxy)
except Exception:
self.report_fail(proxy)
# 使用示例
r = redis.Redis()
proxy_pool = ProxyPool(r)
# 添加代理
proxy_pool.add('http://127.0.0.1:7890', source='local')
proxy_pool.add('http://user:[email protected]:8080', source='paid')
# 获取代理
proxy = proxy_pool.get()
if proxy:
try:
response = requests.get(url, proxies={'http': proxy, 'https': proxy})
proxy_pool.report_success(proxy)
except Exception:
proxy_pool.report_fail(proxy)
完整的分布式爬虫示例
import redis
import requests
import json
import time
import random
from bs4 import BeautifulSoup
from typing import Optional, Dict
class DistributedSpider:
"""
简单的分布式爬虫
特点:
- 基于 Redis 的任务队列
- 自动 URL 去重
- 代理自动切换
- 断点续爬
"""
def __init__(self, redis_host='localhost', redis_port=6379):
# 初始化 Redis 连接
self.redis = redis.Redis(host=redis_host, port=redis_port)
# 初始化组件
self.task_queue = RedisTaskQueue(redis_host, redis_port)
self.url_filter = URLFilter(self.redis)
self.proxy_pool = ProxyPool(self.redis)
# 请求配置
self.timeout = 30
self.max_retries = 3
def add_seed_urls(self, urls: list):
"""添加种子 URL"""
for url in urls:
if self.url_filter.add(url):
self.task_queue.push(url, priority=1)
def fetch(self, url: str) -> Optional[str]:
"""获取页面内容"""
proxy = self.proxy_pool.get()
for attempt in range(self.max_retries):
try:
proxies = None
if proxy:
proxies = {'http': proxy, 'https': proxy}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(
url,
headers=headers,
proxies=proxies,
timeout=self.timeout
)
if response.status_code == 200:
if proxy:
self.proxy_pool.report_success(proxy)
return response.text
elif response.status_code in [429, 503]:
# IP 被限制,切换代理
if proxy:
self.proxy_pool.report_fail(proxy)
proxy = self.proxy_pool.get()
time.sleep(2 ** attempt)
except Exception as e:
if proxy:
self.proxy_pool.report_fail(proxy)
proxy = self.proxy_pool.get()
time.sleep(2 ** attempt)
return None
def parse(self, html: str, url: str) -> Dict:
"""解析页面(子类重写)"""
soup = BeautifulSoup(html, 'lxml')
result = {
'url': url,
'title': soup.title.string if soup.title else '',
'data': {}
}
# 提取新的 URL
new_urls = []
for link in soup.find_all('a', href=True):
new_url = link['href']
if new_url.startswith('http'):
new_urls.append(new_url)
result['new_urls'] = new_urls
return result
def save(self, data: Dict):
"""保存数据(子类重写)"""
# 保存到 Redis 或数据库
self.redis.hset('spider:results', data['url'], json.dumps(data, ensure_ascii=False))
def run(self, idle_timeout: int = 60):
"""
运行爬虫
参数:
idle_timeout: 队列为空时的超时时间(秒)
"""
last_task_time = time.time()
while True:
# 获取任务
task = self.task_queue.pop()
if task is None:
# 队列为空,检查超时
if time.time() - last_task_time > idle_timeout:
print('队列为空超时,退出')
break
time.sleep(1)
continue
last_task_time = time.time()
url = task['url']
try:
# 获取页面
html = self.fetch(url)
if html:
# 解析页面
result = self.parse(html, url)
# 保存数据
self.save(result)
# 添加新 URL 到队列
for new_url in result.get('new_urls', []):
if self.url_filter.add(new_url):
self.task_queue.push(new_url)
# 标记任务完成
self.task_queue.complete(url)
print(f'完成: {url}')
except Exception as e:
print(f'处理失败: {url}, 错误: {e}')
self.task_queue.retry(url)
# 随机延迟
time.sleep(random.uniform(0.5, 2))
# 启动爬虫
if __name__ == '__main__':
spider = DistributedSpider()
# 添加种子 URL(只需执行一次)
# spider.add_seed_urls(['https://example.com/page1', 'https://example.com/page2'])
# 运行爬虫
spider.run()
分布式爬虫的最佳实践
1. 任务粒度控制
# 好:任务粒度适中,每个任务独立
task = {
'url': 'https://example.com/article/123',
'type': 'detail'
}
# 不好:任务粒度过大,失败代价高
task = {
'urls': ['url1', 'url2', ..., 'url100'], # 100 个 URL 作为一个任务
'type': 'batch'
}
2. 优雅退出
import signal
import sys
class GracefulSpider(DistributedSpider):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.running = True
# 注册信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""处理终止信号"""
print('收到终止信号,正在优雅退出...')
self.running = False
def run(self):
while self.running:
task = self.task_queue.pop()
if task is None:
time.sleep(1)
continue
# 处理任务
self.process(task)
# 检查是否需要退出
if not self.running:
# 将未完成的任务放回队列
self.task_queue.push(task['url'])
break
3. 监控与告警
class MonitoredSpider(DistributedSpider):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.stats = {
'total': 0,
'success': 0,
'failed': 0,
'start_time': time.time()
}
def report_stats(self):
"""报告统计信息"""
elapsed = time.time() - self.stats['start_time']
rate = self.stats['success'] / elapsed if elapsed > 0 else 0
stats_info = {
'total_tasks': self.stats['total'],
'success': self.stats['success'],
'failed': self.stats['failed'],
'success_rate': self.stats['success'] / max(1, self.stats['total']),
'tasks_per_second': rate,
'queue_size': self.task_queue.size()
}
# 存储到 Redis
self.redis.hset('spider:stats', 'latest', json.dumps(stats_info))
return stats_info
小结
本章我们学习了:
- User-Agent 伪装 - 使用随机 User-Agent
- IP 代理池 - 使用代理隐藏真实 IP
- 请求频率控制 - 延迟和指数退避
- Cookie 处理 - Session 自动管理
- 验证码处理 - OCR识别、滑块、点选、短信验证码
- JavaScript 渲染 - Selenium 和 Playwright
- 行为检测规避 - 模拟人类行为
- 分布式爬虫 - 任务队列、URL去重、代理池、完整实现
练习
- 实现一个带代理轮换的爬虫
- 使用 Playwright 爬取一个动态渲染的网页
- 实现指数退避重试机制
- 基于 Redis 构建一个简单的分布式爬虫