Python 爬虫速查表
本文是 Python 爬虫常用代码片段的快速参考。
requests 库
发送请求
import requests
# GET 请求
response = requests.get(url)
response = requests.get(url, params={'key': 'value'})
response = requests.get(url, headers={'User-Agent': '...'})
# POST 请求
response = requests.post(url, data={'key': 'value'})
response = requests.post(url, json={'key': 'value'})
# 带 Cookie
response = requests.get(url, cookies={'session': 'xxx'})
# 带代理
response = requests.get(url, proxies={'http': 'http://ip:port'})
# 带超时
response = requests.get(url, timeout=10)
响应处理
response.status_code # 状态码
response.text # 文本内容
response.json() # JSON 解析
response.content # 字节内容
response.headers # 响应头
response.cookies # Cookie
response.raise_for_status() # 检查错误
BeautifulSoup
创建对象
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'lxml')
soup = BeautifulSoup(file, 'lxml')
soup = BeautifulSoup(response.text, 'lxml')
元素查找
soup.find('div') # 第一个 div
soup.find_all('a') # 所有 a 标签
soup.select('div.content a') # CSS 选择器
# 常用选择器
soup.select('.class') # class 选择器
soup.select('#id') # id 选择器
soup.select('a[href]') # 属性选择器
soup.select('div > a') # 子选择器
内容提取
element.get_text(strip=True) # 获取文本
element['href'] # 获取属性
element.get('href', 'default') # 获取属性(带默认值)
element.attrs # 所有属性
XPath
基本语法
//div # 所有 div 元素
//div[@class="content"] # class 为 content 的 div
//a[@href] # 带有 href 属性的 a 元素
//div/p # div 下的直接子元素 p
//div//p # div 下的所有后代 p
谓语
//li[1] # 第一个 li
//li[last()] # 最后一个 li
//li[position() < 3] # 前两个 li
//div[@id="main"] # id 为 main 的 div
//a[contains(@href, "example")] # href 包含 example
//a[starts-with(@href, "http")] # href 以 http 开头
//a[text()="登录"] # 文本等于"登录"
//a[contains(text(), "登录")] # 文本包含"登录"
轴
//div/parent::* # 父元素
//div/ancestor::* # 所有祖先
//div/descendant::* # 所有后代
//li/following-sibling::* # 之后的所有兄弟
//li/preceding-sibling::* # 之前的所有兄弟
Python 使用
from lxml import etree
tree = etree.HTML(html)
elements = tree.xpath('//div[@class="content"]')
texts = tree.xpath('//p/text()')
attrs = tree.xpath('//a/@href')
正则表达式
常用模式
import re
re.findall(r'\d+', text) # 提取数字
re.search(r'\d+', text).group() # 提取第一个数字
re.sub(r'\d+', 'NUM', text) # 替换数字
re.match(r'^http', url) # 匹配开头
re.split(r'\s+', text) # 按空格分割
常用正则
| 模式 | 含义 |
|---|---|
\d | 数字 |
\w | 单词字符 |
\s | 空白字符 |
. | 任意字符 |
* | 0或多个 |
+ | 1或多个 |
? | 0或1个 |
^ | 开头 |
$ | 结尾 |
[] | 字符集 |
反爬策略
User-Agent
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
请求延迟
import time
import random
time.sleep(random.uniform(1, 3)) # 随机延迟
代理
proxies = {
'http': 'http://ip:port',
'https': 'http://ip:port'
}
Session
session = requests.Session()
session.headers.update({'User-Agent': '...'})
session.get(url)
数据存储
保存为 JSON
import json
# 写入
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# 读取
with open('data.json', 'r', encoding='utf-8') as f:
data = json.load(f)
保存为 CSV
import csv
with open('data.csv', 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=['title', 'price'])
writer.writeheader()
writer.writerows(items)
保存到 SQLite
import sqlite3
conn = sqlite3.connect('data.db')
cursor = conn.cursor()
cursor.execute('INSERT INTO table (col1, col2) VALUES (?, ?)', (val1, val2))
conn.commit()
conn.close()
httpx 库
当前版本: v0.28.x | Python 要求: 3.9+
同步请求
import httpx
# GET 请求
response = httpx.get(url)
response = httpx.get(url, params={'key': 'value'})
response = httpx.get(url, headers={'User-Agent': '...'})
# POST 请求
response = httpx.post(url, data={'key': 'value'})
response = httpx.post(url, json={'key': 'value'})
# 超时设置(默认5秒)
response = httpx.get(url, timeout=10.0)
response = httpx.get(url, timeout=None) # 禁用超时
异步请求
import httpx
import asyncio
async def fetch(url):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
asyncio.run(fetch(url))
Client 会话
# 同步 Client
with httpx.Client(base_url='https://api.example.com') as client:
response = client.get('/users') # 相对路径
# 异步 Client
async with httpx.AsyncClient(http2=True) as client:
responses = await asyncio.gather(*[
client.get(url) for url in urls
])
响应处理
response.status_code # 状态码
response.text # 文本内容
response.json() # JSON 解析
response.content # 字节内容
response.headers # 响应头
response.cookies # Cookie
response.url # 最终 URL
response.http_version # HTTP 版本
response.is_success # 2xx
response.raise_for_status() # 检查错误
流式下载
# 同步流式
with httpx.stream('GET', url) as response:
for chunk in response.iter_bytes(8192):
process(chunk)
# 异步流式
async with client.stream('GET', url) as response:
async for chunk in response.aiter_bytes(8192):
process(chunk)
代理设置
# HTTP 代理
proxies = {'http://': 'http://127.0.0.1:7890',
'https://': 'http://127.0.0.1:7890'}
# SOCKS 代理(需要 httpx[socks])
proxies = {'all://': 'socks5://127.0.0.1:1080'}
response = httpx.get(url, proxy='http://127.0.0.1:7890')
异步爬虫 (aiohttp)
当前版本: v3.13.5 | Python 要求: 3.9+
基本用法
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
html = asyncio.run(fetch(url))
并发请求
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch_one(session, url):
async with session.get(url) as response:
return await response.text()
并发控制
semaphore = asyncio.Semaphore(10)
async def fetch_with_limit(session, url):
async with semaphore:
async with session.get(url) as response:
return await response.text()
超时设置
from aiohttp import ClientTimeout
timeout = ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as response:
return await response.text()
常见任务
提取所有链接
for a in soup.select('a'):
print(a.get('href'))
提取表格数据
rows = soup.select('table tr')
data = []
for row in rows[1:]: # 跳过表头
cols = row.select('td')
data.append([col.get_text(strip=True) for col in cols])
分页爬取
for page in range(1, 11):
url = f'https://example.com?page={page}'
response = requests.get(url)
# 处理响应
下载图片
response = requests.get(img_url, stream=True)
with open('image.jpg', 'wb') as f:
for chunk in response.iter_content(8192):
f.write(chunk)
Scrapy 命令
# 创建项目
scrapy startproject myproject
# 创建爬虫
scrapy genspider myspider example.com
# 运行爬虫
scrapy crawl myspider
# 保存数据
scrapy crawl myspider -o items.json
# Shell 交互
scrapy shell https://example.com
Scrapy 新特性(2.12+)
JsonResponse
# 当响应 Content-Type 是 application/json 时自动使用
def parse(self, response):
# 直接获取 JSON 数据
data = response.json()
# 使用 jmespath 查询(需安装 jmespath)
names = response.jmespath('items[*].name').getall()
first = response.jmespath('items[0]').get()
start() 方法
class MySpider(scrapy.Spider):
name = 'my'
async def start(self):
# Scrapy 2.13+ 推荐使用异步 start 方法
for i in range(10):
yield scrapy.Request(f'https://example.com/page/{i}')
# 也可以直接 yield Item(2.12+)
yield {'type': 'initial', 'source': 'start'}
HTTP 状态码
| 状态码 | 含义 |
|---|---|
| 200 | 成功 |
| 301 | 永久重定向 |
| 302 | 临时重定向 |
| 400 | 请求错误 |
| 401 | 需要认证 |
| 403 | 禁止访问 |
| 404 | 未找到 |
| 429 | 请求过多 |
| 500 | 服务器错误 |
| 502 | 网关错误 |
| 503 | 服务不可用 |
Selenium
初始化
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
options = Options()
options.add_argument('--headless') # 无头模式
driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=options
)
元素定位
from selenium.webdriver.common.by import By
driver.find_element(By.ID, 'username')
driver.find_element(By.NAME, 'password')
driver.find_element(By.CLASS_NAME, 'btn')
driver.find_element(By.TAG_NAME, 'input')
driver.find_element(By.CSS_SELECTOR, 'div.content a')
driver.find_element(By.XPATH, '//div[@class="content"]//a')
driver.find_element(By.LINK_TEXT, '登录')
driver.find_element(By.PARTIAL_LINK_TEXT, '登')
等待机制
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# 隐式等待
driver.implicitly_wait(10)
# 显式等待
wait = WebDriverWait(driver, 10)
element = wait.until(EC.presence_of_element_located((By.ID, 'content')))
element = wait.until(EC.visibility_of_element_located((By.ID, 'content')))
element = wait.until(EC.element_to_be_clickable((By.ID, 'submit')))
元素操作
element.click() # 点击
element.send_keys('text') # 输入
element.clear() # 清空
element.text # 获取文本
element.get_attribute('href') # 获取属性
页面操作
driver.get(url) # 访问页面
driver.back() # 后退
driver.forward() # 前进
driver.refresh() # 刷新
driver.title # 页面标题
driver.current_url # 当前 URL
driver.page_source # 页面源码
driver.quit() # 关闭浏览器
多窗口和框架
# 多窗口
main_window = driver.current_window_handle
driver.switch_to.window(window_handle)
# iframe
driver.switch_to.frame('frame_id')
driver.switch_to.default_content()
Playwright
初始化
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto('https://example.com')
browser.close()
元素定位
# CSS 选择器
page.locator('div.content')
page.locator('#main')
page.locator('.item')
# 文本选择器
page.locator('text=登录')
page.locator('text=/登录/i') # 正则
# XPath
page.locator('xpath=//div[@class="content"]')
# 组合选择器
page.locator('div.list >> .item')
元素操作
page.click('button') # 点击
page.fill('input', 'text') # 填写
page.type('input', 'text', delay=100) # 逐字输入
page.select_option('select', 'value') # 下拉选择
page.check('input[type="checkbox"]') # 勾选
page.set_input_files('input', 'file.pdf') # 上传文件
等待机制
# 自动等待(推荐)
page.click('button') # 自动等待可点击
# 显式等待
page.wait_for_selector('div.content')
page.wait_for_selector('div.loading', state='hidden')
page.wait_for_load_state('networkidle')
page.wait_for_url('**/dashboard')
网络拦截
# 拦截请求
def handle_route(route):
if route.request.resource_type == 'image':
route.abort()
else:
route.continue_()
page.route('**', handle_route)
# Mock 响应
page.route('**/api/data', lambda route: route.fulfill(
status=200,
body='{"status": "ok"}'
))
截图和 PDF
page.screenshot(path='screenshot.png', full_page=True)
page.pdf(path='document.pdf', format='A4')
Playwright 新特性
WebSocket 路由(1.48+)
# 拦截 WebSocket 连接
def handle_ws(ws):
ws.on('message', lambda msg: print(f'收到: {msg}'))
ws.send('{"status": "ok"}')
page.route_web_socket('**/ws', handle_ws)
Clock API(1.45+)
import datetime
# 控制时间
page.clock.install(time=datetime.datetime(2024, 1, 1, 0, 0, 0))
page.clock.fast_forward('05:00') # 快进 5 分钟
page.clock.pause_at(datetime.datetime(2024, 1, 1, 12, 0, 0))
常用请求头
| 请求头 | 说明 | 示例 |
|---|---|---|
| User-Agent | 客户端标识 | Mozilla/5.0 ... |
| Accept | 接受的内容类型 | text/html,application/json |
| Accept-Language | 接受的语言 | zh-CN,zh;q=0.9 |
| Accept-Encoding | 接受的编码 | gzip, deflate, br |
| Referer | 来源页面 | https://google.com/ |
| Cookie | Cookie 信息 | session=abc123 |
| Authorization | 认证信息 | Bearer token123 |
| Content-Type | 内容类型 | application/json |
最佳实践
- 设置 User-Agent:模拟浏览器
- 添加延迟:避免请求过快
- 处理异常:网络可能不稳定
- 遵守规则:检查 robots.txt
- 保存进度:避免重复爬取
- 使用 Session:维护 Cookie
- 处理编码:注意中文编码
- 选择合适工具:静态页面用 requests,动态页面用 Playwright