跳到主要内容

反反爬策略

网站通常会采取各种反爬措施来阻止自动化爬取。本章将介绍常见的反爬机制及应对策略。

法律提示

请只在合法合规的前提下使用这些技术,遵守网站的 robots.txt 和服务条款。

常见反爬机制

User-Agent 伪装

基础 User-Agent 轮换

import requests
import random

# Pool of common, real-world User-Agent strings.
# Fix: the Safari entry previously read "like Gocko" — a typo for "Gecko".
# Anti-bot systems match UA strings against known-browser patterns, so a
# misspelled UA is an instant fingerprint; every entry must be letter-perfect.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
]


def get_random_ua():
    """Return a randomly chosen User-Agent string from USER_AGENTS."""
    return random.choice(USER_AGENTS)

# Attach a randomly chosen User-Agent to an individual request.
# NOTE(review): `url` is assumed to be defined earlier on this page — confirm.
headers = {
'User-Agent': get_random_ua()
}
response = requests.get(url, headers=headers)

使用 fake-useragent 库

pip install fake-useragent
from fake_useragent import UserAgent
import requests

# UserAgent() wraps a database of real browser UA strings;
# the `.random` attribute yields a fresh one on every access.
# NOTE(review): `url` is assumed to be defined elsewhere on this page.
ua = UserAgent()
headers = {'User-Agent': ua.random}
response = requests.get(url, headers=headers)

IP 代理池

使用代理

import requests
import random  # fix: get_proxy() below uses random.choice, but this snippet never imported random

# Single proxy: requests routes both schemes through the same local proxy.
proxies = {
'http': 'http://127.0.0.1:7890',
'https': 'http://127.0.0.1:7890'
}
response = requests.get(url, proxies=proxies)

# Proxy pool: rotate between several authenticated proxies.
proxy_list = [
'http://user:pass@ip1:port1',
'http://user:pass@ip2:port2',
]


def get_proxy():
    """Return a random proxy URL from the pool."""
    return random.choice(proxy_list)


response = requests.get(url, proxies={'http': get_proxy()})

免费代理获取

import requests
from bs4 import BeautifulSoup


def get_free_proxies():
    """Scrape free-proxy-list.net and return HTTPS-capable proxies.

    Returns:
        list[str]: proxy URLs in ``http://ip:port`` form.

    Raises:
        requests.HTTPError: if the listing page returns an error status.
    """
    url = 'https://www.free-proxy-list.net/'
    # Fix: add a timeout so the call cannot hang forever, and fail loudly
    # on non-2xx responses instead of parsing an error page.
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'lxml')

    proxies = []
    for row in soup.select('table.table tbody tr'):
        cols = row.find_all('td')
        # Fix: guard against short/malformed rows before indexing col 4.
        # Column 4 == 'yes' means the proxy supports HTTPS.
        if len(cols) > 4 and cols[4].text == 'yes':
            ip = cols[0].text
            port = cols[1].text
            proxies.append(f'http://{ip}:{port}')

    return proxies

专业代理服务

常用的付费代理服务:

  • 阿布云:提供 HTTP/HTTPS/SOCKS5 代理
  • 讯代理:高质量代理池
  • 青果代理:企业级代理服务
# Example: Abuyun (阿布云) paid dynamic-proxy service.
proxy_host = 'http-dyn.abuyun.com'
proxy_port = '9020'
proxy_user = 'YOUR_PROXY_USER'
proxy_pass = 'YOUR_PROXY_PASS'

# requests accepts user:pass@host:port credentials embedded in the proxy URL.
proxies = {
'http': f'http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}',
'https': f'http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}',
}

response = requests.get(url, proxies=proxies)

请求频率控制

基础延迟

import time
import random

# Fixed delay
time.sleep(2) # wait 2 seconds after each request

# Random delay (looks more natural to rate-limit detectors)
delay = random.uniform(1, 3) # random 1-3 second delay
time.sleep(delay)

智能延迟

import time
import random
from collections import defaultdict

class SmartDelayer:
    """Per-domain request pacing: sleep before hitting a domain again too soon."""

    def __init__(self):
        # domain -> single-element list holding the last request timestamp
        self.delays = defaultdict(list)

    def wait(self, domain, min_delay=1, max_delay=5):
        """Sleep if the previous request to *domain* was under min_delay
        seconds ago, then record the current time for that domain."""
        previous = self.delays[domain]
        if previous and (time.time() - previous[0]) < min_delay:
            time.sleep(random.uniform(min_delay, max_delay))

        # stamp this request
        self.delays[domain] = [time.time()]

    def random_wait(self, min_delay=1, max_delay=5):
        """Sleep for a uniformly random interval in [min_delay, max_delay]."""
        time.sleep(random.uniform(min_delay, max_delay))

# Usage: pace every request against the same domain
delayer = SmartDelayer()

for url in urls:
    delayer.wait('example.com')
    response = requests.get(url)

指数退避

import time
import random


def request_with_backoff(url, max_retries=5, fetcher=None):
    """GET *url*, retrying with exponential backoff plus jitter.

    Args:
        url: target URL.
        max_retries: maximum number of attempts.
        fetcher: optional ``callable(url) -> response``; defaults to
            ``requests.get``. Injectable for testing or custom transports.

    Returns:
        The response object on HTTP 200, or None once all attempts fail.
    """
    get = fetcher if fetcher is not None else requests.get
    for attempt in range(max_retries):
        try:
            response = get(url)
            if response.status_code == 200:
                return response
        except Exception as e:  # broad on purpose: retry any transport error
            print(f"请求失败: {e}")

        # Exponential backoff with jitter. Fix: the original also slept after
        # the final attempt, wasting up to 2**(max_retries-1) seconds before
        # returning None — skip the sleep when no retry will follow.
        if attempt < max_retries - 1:
            delay = 2 ** attempt + random.uniform(0, 1)
            print(f"等待 {delay:.2f} 秒后重试...")
            time.sleep(delay)

    return None
# --- Cookie handling ---
import requests

# Use a Session so cookies are stored and re-sent automatically.
session = requests.Session()

# Seed a cookie by hand.
session.cookies.set('session_id', 'abc123')

# Subsequent requests through this session carry the cookie jar.
response = session.get('https://example.com')

# Inspect the current cookies as a plain dict.
print(session.cookies.get_dict())
import requests
import json

def save_cookies(session, filename):
    """Persist the session's cookies to *filename* as JSON."""
    with open(filename, 'w') as f:
        json.dump(dict(session.cookies), f)


def load_cookies(session, filename):
    """Restore previously saved cookies from *filename* into the session."""
    with open(filename, 'r') as f:
        session.cookies.update(json.load(f))

验证码处理

识别验证码

对于简单的图形验证码,可以使用第三方服务:

# 使用打码平台 API(以超级鹰为例)
def recognize_captcha(image_path):
    """Send a captcha image to a third-party solving service and return its answer.

    The actual API call depends on the provider (超级鹰-style); see vendor docs.
    """
    api_key = 'YOUR_API_KEY'
    soft_id = '123456'

    with open(image_path, 'rb') as f:
        captcha_bytes = f.read()

    # Delegate recognition to the provider's API.
    return call_captcha_api(api_key, soft_id, captcha_bytes)

处理滑块验证码

# 使用 selenium 处理滑块验证码
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains

def solve_slider(driver, slider_element, track_distance):
    """Drag a slider element along a human-looking track of small moves."""
    # grab and hold the slider handle
    ActionChains(driver).click_and_hold(slider_element).perform()

    # replay each horizontal offset from the generated track
    for offset in get_slider_tracks(track_distance):
        ActionChains(driver).move_by_offset(offset, 0).perform()

    # let go at the end of the drag
    ActionChains(driver).release().perform()

def get_slider_tracks(distance):
    """Generate step sizes for a human-looking slider drag.

    Fast steps (2-5 px) for the first ~70% of the distance, then slow
    steps (1-3 px). Fix: each step is clamped so the steps sum to exactly
    *distance* — the original could overshoot the target, which slider
    verifications reject.
    """
    import random  # fix: this snippet never imports random at module level

    tracks = []
    current = 0
    mid = distance * 0.7
    while current < distance:
        if current < mid:
            step = random.randint(2, 5)
        else:
            step = random.randint(1, 3)
        # never move past the target position
        step = min(step, distance - current)
        current += step
        tracks.append(step)
    return tracks

JavaScript 渲染

使用 Selenium

pip install selenium webdriver-manager
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

# Configure Chrome options
chrome_options = Options()
chrome_options.add_argument('--headless') # headless mode: no visible window
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('user-agent=Mozilla/5.0 ...')

# Create the driver (webdriver-manager downloads a matching chromedriver)
driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=chrome_options
)

# Load the page
driver.get('https://example.com')

# Block (up to 10 s) until the element with id="content" exists
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, 'content'))
)

# Grab the fully rendered HTML
html = driver.page_source

# Parse with BeautifulSoup
# NOTE(review): BeautifulSoup is not imported in this snippet — it needs
# `from bs4 import BeautifulSoup` to run standalone.
soup = BeautifulSoup(html, 'lxml')

# Shut down the browser
driver.quit()

使用 Playwright

pip install playwright
playwright install chromium
import asyncio
from playwright.async_api import async_playwright

async def scrape_with_playwright():
    """Render a page in headless Chromium and return its final HTML."""
    async with async_playwright() as pw:
        # launch the browser and open a fresh tab
        browser = await pw.chromium.launch(headless=True)
        page = await browser.new_page()

        # spoof the User-Agent before navigating
        await page.set_extra_http_headers({
            'User-Agent': 'Mozilla/5.0 ...'
        })

        await page.goto('https://example.com')

        # wait until the dynamic content has rendered
        await page.wait_for_selector('.content')

        rendered_html = await page.content()

        await browser.close()
        return rendered_html

# Drive the coroutine to completion from synchronous code.
html = asyncio.run(scrape_with_playwright())

寻找 API 接口

很多动态加载的数据来自 API,可以通过浏览器开发者工具找到:

import requests

# API endpoint discovered via the browser dev-tools Network tab.
api_url = 'https://example.com/api/data?page=1&size=20'

# Hit the API directly — usually far more efficient than scraping HTML.
# A realistic Referer helps pass simple origin checks.
headers = {
'User-Agent': 'Mozilla/5.0 ...',
'Referer': 'https://example.com/'
}

response = requests.get(api_url, headers=headers)
data = response.json()

行为检测规避

模拟人类行为

import random
import time


def human_like_delay(base_min=0.5, base_max=2.0, jitter=0.5):
    """Sleep for a randomized, human-looking "reading" interval.

    Generalized: the bounds were hard-coded; the defaults reproduce the
    original 0.5-2.5 s pause exactly, while callers (and tests) can now
    tune the pacing.

    Args:
        base_min: lower bound of the base delay in seconds.
        base_max: upper bound of the base delay in seconds.
        jitter: extra uniform random variance added on top, in seconds.
    """
    base_delay = random.uniform(base_min, base_max)
    variance = random.uniform(0, jitter)
    time.sleep(base_delay + variance)

def scroll_page(driver):
    """Scroll down the page in random-sized hops to mimic a human reader."""
    total_height = driver.execute_script("return document.body.scrollHeight")

    # hop down the page, pausing briefly between hops
    hop = random.randint(100, 300)
    for i in range(0, total_height, hop):
        driver.execute_script(f"window.scrollTo(0, {i});")
        time.sleep(random.uniform(0.1, 0.3))

    # scroll back up a little, as a real user often does
    driver.execute_script(f"window.scrollTo(0, {random.randint(0, 100)});")

鼠标移动模拟

from selenium.webdriver.common.action_chains import ActionChains

def human_mouse_movements(driver, element):
    """Wander the mouse around *element* in small random steps before interacting."""
    import random  # fix: this snippet uses random but never imports it

    actions = ActionChains(driver)

    # start offset from the element rather than dead-center on it
    actions.move_to_element_with_offset(element, -50, -50)

    # drift near the element in a few small, paused steps (no click)
    for _ in range(random.randint(3, 7)):
        x_offset = random.randint(-20, 20)
        y_offset = random.randint(-20, 20)
        actions.move_by_offset(x_offset, y_offset)
        actions.pause(random.uniform(0.1, 0.3))

    actions.perform()

综合案例

import requests
import time
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class StealthSpider:
    """requests.Session wrapper bundling basic anti-blocking measures:
    random UA, retry adapter, random delays, optional proxies, 429 back-off."""

    def __init__(self):
        self.session = requests.Session()
        # Fix: get_proxy() reads self.proxy_list, but the original never
        # initialized it — the 30% proxy branch in request() crashed with
        # AttributeError. Start with an empty pool; callers may populate it.
        self.proxy_list = []
        self.setup_session()

    def setup_session(self):
        """Install default headers and a retry adapter on the session."""
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ...',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 ...',
        ]

        self.session.headers.update({
            'User-Agent': random.choice(user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })

        # Retry transient server errors with exponential backoff.
        retry = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def request(self, url, **kwargs):
        """GET *url* with a random pre-delay, optional proxy, and 429 back-off.

        Returns the (possibly retried) response object.
        """
        # random delay between requests
        time.sleep(random.uniform(1, 3))

        kwargs.setdefault('timeout', 10)
        kwargs.setdefault('allow_redirects', True)

        # ~30% of requests go through a proxy — only if the pool is non-empty
        if self.proxy_list and random.random() < 0.3:
            kwargs['proxies'] = self.get_proxy()

        response = self.session.get(url, **kwargs)

        # If rate-limited, wait a long random interval and retry once.
        if response.status_code == 429:
            print('请求过于频繁,等待更长时间...')
            time.sleep(random.uniform(30, 60))
            response = self.session.get(url, **kwargs)

        return response

    def get_proxy(self):
        """Pick a random proxy from the pool, formatted for requests."""
        proxy = random.choice(self.proxy_list)
        return {
            'http': f'http://{proxy}',
            'https': f'http://{proxy}'
        }

# Usage example: fetch a page through the hardened session.
spider = StealthSpider()
response = spider.request('https://example.com')

小结

本章我们学习了:

  1. User-Agent 伪装 - 使用随机 User-Agent
  2. IP 代理池 - 使用代理隐藏真实 IP
  3. 请求频率控制 - 延迟和指数退避
  4. Cookie 处理 - Session 自动管理
  5. 验证码处理 - 识别和滑块验证
  6. JavaScript 渲染 - Selenium 和 Playwright
  7. 行为检测规避 - 模拟人类行为

练习

  1. 实现一个带代理轮换的爬虫
  2. 使用 Playwright 爬取一个动态渲染的网页
  3. 实现指数退避重试机制