反反爬策略
网站通常会采取各种反爬措施来阻止自动化爬取。本章将介绍常见的反爬机制及应对策略。
法律提示
请只在合法合规的前提下使用这些技术,遵守网站的 robots.txt 和服务条款。
常见反爬机制
User-Agent 伪装
基础 User-Agent 轮换
import requests
import random
# Pool of common real-world User-Agent strings to rotate through.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
    # BUG FIX: "Gocko" -> "Gecko". A misspelled engine token never occurs in
    # real browsers and is an easy fingerprint for anti-bot systems.
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
]

def get_random_ua():
    """Return a uniformly random User-Agent string from USER_AGENTS."""
    return random.choice(USER_AGENTS)
# Rotate the User-Agent on every request so traffic looks less uniform.
headers = {}
headers['User-Agent'] = get_random_ua()
response = requests.get(url, headers=headers)
使用 fake-useragent 库
pip install fake-useragent
from fake_useragent import UserAgent
import requests
# UserAgent() draws from a database of real browser User-Agent strings.
ua = UserAgent()
# ua.random yields a fresh random UA string on each access.
headers = {'User-Agent': ua.random}
# NOTE(review): `url` must be defined by surrounding code before this runs.
response = requests.get(url, headers=headers)
IP 代理池
使用代理
import requests
import random

# --- Single fixed proxy ---------------------------------------------------
proxies = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890'
}
response = requests.get(url, proxies=proxies)

# --- Proxy pool -----------------------------------------------------------
proxy_list = [
    'http://user:pass@ip1:port1',
    'http://user:pass@ip2:port2',
]

def get_proxy():
    """Return a random proxy URL from the pool."""
    return random.choice(proxy_list)

# BUG FIX: cover both schemes. With only the 'http' key, HTTPS requests
# would bypass the proxy entirely and expose the real IP address.
chosen = get_proxy()
response = requests.get(url, proxies={'http': chosen, 'https': chosen})
免费代理获取
import requests
from bs4 import BeautifulSoup
def get_free_proxies():
    """Scrape free-proxy-list.net for HTTPS-capable free proxies.

    Returns:
        list[str]: proxy URLs in ``http://ip:port`` form.

    Raises:
        requests.HTTPError: if the listing page returns an error status.
    """
    url = 'https://www.free-proxy-list.net/'
    # Timeout prevents the scraper from hanging forever on a dead site.
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'lxml')
    proxies = []
    for row in soup.select('table.table tbody tr'):
        cols = row.find_all('td')
        # Guard against malformed/short rows before indexing column 4;
        # column 4 is the "Https" flag in the site's table layout.
        if len(cols) > 4 and cols[4].text.strip() == 'yes':
            ip = cols[0].text.strip()
            port = cols[1].text.strip()
            proxies.append(f'http://{ip}:{port}')
    return proxies
专业代理服务
常用的付费代理服务:
- 阿布云:提供 HTTP/HTTPS/SOCKS5 代理
- 讯代理:高质量代理池
- 青果代理:企业级代理服务
# Example: routing traffic through the Abuyun paid proxy service.
proxy_host = 'http-dyn.abuyun.com'
proxy_port = '9020'
proxy_user = 'YOUR_PROXY_USER'
proxy_pass = 'YOUR_PROXY_PASS'

# Both schemes tunnel through the same authenticated endpoint.
_proxy_url = f'http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}'
proxies = {
    'http': _proxy_url,
    'https': _proxy_url,
}
response = requests.get(url, proxies=proxies)
请求频率控制
基础延迟
import time
import random
# Fixed delay
time.sleep(2) # wait 2 seconds after every request
# Random delay (looks more natural to rate-limit detectors)
delay = random.uniform(1, 3) # random 1-3 second delay
time.sleep(delay)
智能延迟
import time
import random
from collections import defaultdict
class SmartDelayer:
    """Per-domain request throttle that enforces a minimum gap between hits.

    Tracks the timestamp of the last request per domain and sleeps just
    long enough (plus random jitter) before the next one.
    """

    def __init__(self):
        # domain -> [timestamp of last request]; the one-element list
        # storage format is kept for backward compatibility.
        self.delays = defaultdict(list)

    def wait(self, domain, min_delay=1, max_delay=5):
        """Block until at least ``min_delay`` seconds since the last request
        to *domain*, adding random jitter up to ``max_delay - min_delay``.
        """
        last_request = self.delays[domain]
        if last_request:
            elapsed = time.time() - last_request[0]
            if elapsed < min_delay:
                # BUG FIX: sleep only the *remaining* gap (plus jitter),
                # not a full min..max interval regardless of elapsed time.
                remaining = min_delay - elapsed
                time.sleep(remaining + random.uniform(0, max_delay - min_delay))
        # Record this request's timestamp for the next call.
        self.delays[domain] = [time.time()]

    def random_wait(self, min_delay=1, max_delay=5):
        """Sleep a uniformly random interval in [min_delay, max_delay]."""
        time.sleep(random.uniform(min_delay, max_delay))
# Share one delayer so every request to the domain is throttled together.
delayer = SmartDelayer()
for target in urls:
    delayer.wait('example.com')
    response = requests.get(target)
指数退避
import time
import random
def request_with_backoff(url, max_retries=5, timeout=10):
    """Fetch *url*, retrying with exponential backoff plus random jitter.

    Args:
        url: target URL.
        max_retries: maximum number of attempts.
        timeout: per-request timeout in seconds (backward-compatible addition;
            previously requests could hang indefinitely).

    Returns:
        The response on HTTP 200, or None if every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code == 200:
                return response
        except Exception as e:  # any network error triggers a retry
            print(f"请求失败: {e}")
        # BUG FIX: don't sleep after the final attempt - no retry follows,
        # so the old code wasted up to 2**(max_retries-1)+1 seconds.
        if attempt < max_retries - 1:
            delay = 2 ** attempt + random.uniform(0, 1)
            print(f"等待 {delay:.2f} 秒后重试...")
            time.sleep(delay)
    return None
Cookie 处理
自动处理 Cookie
import requests

# A Session object persists cookies across every request made through it.
session = requests.Session()
session.cookies.set('session_id', 'abc123')  # pre-seed a cookie

# Cookies are sent and updated automatically on each request.
response = session.get('https://example.com')

# Inspect what the session currently holds.
print(session.cookies.get_dict())
保存和加载 Cookie
import requests
import json
def save_cookies(session, filename):
    """Persist the session's cookies to *filename* as JSON.

    Explicit UTF-8 encoding avoids platform-default-codec failures on
    non-ASCII cookie values (e.g. on Windows).
    """
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(dict(session.cookies), f)

def load_cookies(session, filename):
    """Restore cookies previously written by :func:`save_cookies`."""
    with open(filename, 'r', encoding='utf-8') as f:
        cookies = json.load(f)
    session.cookies.update(cookies)
验证码处理
识别验证码
对于简单的图形验证码,可以使用第三方服务:
# Captcha-solving platform API (Chaojiying shown as the example vendor).
def recognize_captcha(image_path):
    """Send a captcha image to a third-party recognition service."""
    api_key = 'YOUR_API_KEY'
    soft_id = '123456'
    with open(image_path, 'rb') as img:
        image_data = img.read()
    # The concrete request format depends on the vendor's API docs.
    return call_captcha_api(api_key, soft_id, image_data)
处理滑块验证码
# 使用 selenium 处理滑块验证码
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
def solve_slider(driver, slider_element, track_distance):
    """Drag a slider along a human-looking track to pass slider captchas."""
    # Grab the handle and hold it down.
    ActionChains(driver).click_and_hold(slider_element).perform()
    # Replay each small offset of the generated track one by one.
    for offset in get_slider_tracks(track_distance):
        ActionChains(driver).move_by_offset(offset, 0).perform()
    # Let go at the end position.
    ActionChains(driver).release().perform()
def get_slider_tracks(distance):
    """Generate human-like step sizes whose sum is exactly *distance*.

    Moves in larger steps for roughly the first 70% of the way, then
    smaller ones, mimicking how a person decelerates near the target.

    Returns:
        list[int]: positive step sizes summing to *distance*.
    """
    tracks = []
    current = 0
    mid = distance * 0.7
    while current < distance:
        if current < mid:
            step = random.randint(2, 5)
        else:
            step = random.randint(1, 3)
        # BUG FIX: clamp the final step so the track never overshoots
        # the target; previously sum(tracks) could exceed distance.
        step = min(step, distance - current)
        current += step
        tracks.append(step)
    return tracks
JavaScript 渲染
使用 Selenium
pip install selenium webdriver-manager
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
# --- Chrome configuration -------------------------------------------------
opts = Options()
for flag in ('--headless', '--disable-gpu', '--no-sandbox',
             '--disable-dev-shm-usage'):
    opts.add_argument(flag)
opts.add_argument('user-agent=Mozilla/5.0 ...')

# --- Driver creation (webdriver-manager fetches a matching chromedriver) --
driver = webdriver.Chrome(
    service=Service(ChromeDriverManager().install()),
    options=opts
)

driver.get('https://example.com')

# Block until the #content element appears (up to 10 seconds).
WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.ID, 'content'))
)

# Hand the fully rendered HTML to BeautifulSoup for parsing.
html = driver.page_source
soup = BeautifulSoup(html, 'lxml')

# Always release the browser process.
driver.quit()
使用 Playwright
pip install playwright
playwright install chromium
import asyncio
from playwright.async_api import async_playwright
async def scrape_with_playwright():
    """Render a JavaScript-heavy page with headless Chromium, return its HTML."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        # Spoof the browser identity before navigating.
        await page.set_extra_http_headers({
            'User-Agent': 'Mozilla/5.0 ...'
        })
        await page.goto('https://example.com')
        # Wait until the dynamic content has been injected into the DOM.
        await page.wait_for_selector('.content')
        rendered = await page.content()
        await browser.close()
        return rendered

# Drive the coroutine to completion from synchronous code.
html = asyncio.run(scrape_with_playwright())
寻找 API 接口
很多动态加载的数据来自 API,可以通过浏览器开发者工具找到:
import requests
# API endpoint discovered via the browser DevTools network panel
api_url = 'https://example.com/api/data?page=1&size=20'
# Request the API directly (usually more efficient than scraping HTML);
# the Referer header makes the call look like it came from the site itself.
headers = {
    'User-Agent': 'Mozilla/5.0 ...',
    'Referer': 'https://example.com/'
}
response = requests.get(api_url, headers=headers)
data = response.json()
行为检测规避
模拟人类行为
import random
import time
def human_like_delay(min_delay=0.5, max_delay=2.0, jitter=0.5):
    """Sleep for a randomized, human-looking interval.

    Args:
        min_delay: lower bound of the base delay, seconds.
        max_delay: upper bound of the base delay, seconds.
        jitter: extra random variance added on top, seconds.

    The defaults reproduce the original hard-coded 0.5-2.0s (+0-0.5s)
    behaviour; the parameters just make the pacing tunable per site.
    """
    base_delay = random.uniform(min_delay, max_delay)
    variance = random.uniform(0, jitter)
    time.sleep(base_delay + variance)
def scroll_page(driver):
    """Scroll down the page in increments, pausing briefly like a reader."""
    page_height = driver.execute_script("return document.body.scrollHeight")
    # NOTE: one random step size is drawn per call (matches original range()).
    step = random.randint(100, 300)
    position = 0
    while position < page_height:
        driver.execute_script(f"window.scrollTo(0, {position});")
        time.sleep(random.uniform(0.1, 0.3))
        position += step
    # Scroll back up a little afterwards, as people often do.
    driver.execute_script(f"window.scrollTo(0, {random.randint(0, 100)});")
鼠标移动模拟
from selenium.webdriver.common.action_chains import ActionChains
def human_mouse_movements(driver, element):
    """Wander the cursor near *element* in small random hops before acting."""
    actions = ActionChains(driver)
    # Start offset from the element rather than landing directly on it.
    actions.move_to_element_with_offset(element, -50, -50)
    # A handful of small, pause-separated jitters around the target.
    for _ in range(random.randint(3, 7)):
        dx = random.randint(-20, 20)
        dy = random.randint(-20, 20)
        actions.move_by_offset(dx, dy)
        actions.pause(random.uniform(0.1, 0.3))
    actions.perform()
综合案例
import requests
import time
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class StealthSpider:
    """requests.Session wrapper with basic anti-ban measures.

    Features: browser-like headers with a rotated User-Agent, automatic
    retry on 5xx responses, a random inter-request delay, optional proxy
    rotation, and a cool-down/retry on HTTP 429.
    """

    def __init__(self):
        self.session = requests.Session()
        # BUG FIX: proxy_list was referenced by get_proxy() but never
        # initialized, raising AttributeError whenever the 30% proxy
        # branch fired. Populate this list to enable proxy rotation.
        self.proxy_list = []
        self.setup_session()

    def setup_session(self):
        """Install browser-like headers and a retry policy on the session."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ...',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 ...',
        ]
        self.session.headers.update({
            'User-Agent': random.choice(user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        # Retry transient server errors with exponential backoff.
        retry = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def request(self, url, **kwargs):
        """GET *url* with a random delay, optional proxy and 429 cool-down."""
        # Random pacing between requests.
        time.sleep(random.uniform(1, 3))
        kwargs.setdefault('timeout', 10)
        kwargs.setdefault('allow_redirects', True)
        # Use a proxy ~30% of the time - and only if a pool is configured.
        if self.proxy_list and random.random() < 0.3:
            kwargs['proxies'] = self.get_proxy()
        response = self.session.get(url, **kwargs)
        # Rate-limited: back off for a long random interval, then retry once.
        if response.status_code == 429:
            print('请求过于频繁,等待更长时间...')
            time.sleep(random.uniform(30, 60))
            response = self.session.get(url, **kwargs)
        return response

    def get_proxy(self):
        """Pick a random proxy from the pool, formatted for requests."""
        proxy = random.choice(self.proxy_list)
        return {
            'http': f'http://{proxy}',
            'https': f'http://{proxy}'
        }
# Usage example
spider = StealthSpider()
response = spider.request('https://example.com')
小结
本章我们学习了:
- User-Agent 伪装 - 使用随机 User-Agent
- IP 代理池 - 使用代理隐藏真实 IP
- 请求频率控制 - 延迟和指数退避
- Cookie 处理 - Session 自动管理
- 验证码处理 - 识别和滑块验证
- JavaScript 渲染 - Selenium 和 Playwright
- 行为检测规避 - 模拟人类行为
练习
- 实现一个带代理轮换的爬虫
- 使用 Playwright 爬取一个动态渲染的网页
- 实现指数退避重试机制