
Scrapy Framework in Depth

Scrapy is a mature, full-featured Python crawling framework that provides a complete scraping solution, including request scheduling, data processing, and storage. Compared with hand-rolling a crawler with requests + BeautifulSoup, Scrapy offers a more complete architecture and higher efficiency.

Official documentation

This tutorial is based on the official Scrapy documentation.

Current version: v2.14.2 | Python requirement: 3.10+

Why Scrapy?

Scrapy vs. manual implementation

| Feature | Scrapy | requests + BeautifulSoup |
| --- | --- | --- |
| Concurrent requests | Built-in concurrency with automatic scheduling | Manual threading/async required |
| Request deduplication | Automatic | Manual |
| Data pipelines | Built-in Pipeline mechanism | Manual |
| Middleware | Full middleware system | Manual |
| Logging and stats | Built in | Manual |
| Resumable crawls | Built-in support | Manual |
| Distributed crawling | Supported via scrapy-redis | Roll your own |

Typical use cases

  • Large-scale crawling: many pages to fetch, with efficiency requirements
  • Long-term maintenance: the project needs to run and be maintained over time
  • Complex data processing: data needs multi-step processing and validation
  • Team collaboration: a clear project structure makes it easy for several people to work together

Installing Scrapy

pip install scrapy

# Verify the installation
scrapy version

# Install IPython (optional, for a nicer shell experience)
pip install ipython

Project Structure

Creating a project

# Create a new project
scrapy startproject myspider

# Project layout
myspider/
├── myspider/              # Python package
│   ├── __init__.py
│   ├── items.py           # Item (data structure) definitions
│   ├── middlewares.py     # Middlewares
│   ├── pipelines.py       # Item processing pipelines
│   ├── settings.py        # Project settings
│   └── spiders/           # Spider directory
│       └── __init__.py
└── scrapy.cfg             # Deploy configuration file

Creating a spider

# Enter the project directory
cd myspider

# Create a spider (basic template)
scrapy genspider example example.com

# Create a spider from a specific template
scrapy genspider -t crawl example example.com   # CrawlSpider template

Architecture

Understanding Scrapy's architecture is essential for writing high-quality crawlers.

Core components

                ┌─────────────────────────────────┐
                │          Scrapy Engine          │
                │ (controls data flow and events) │
                └────────────────┬────────────────┘
         ┌───────────────────────┼───────────────────────┐
         ▼                       ▼                       ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Scheduler    │     │   Downloader    │     │     Spiders     │
│                 │     │                 │     │                 │
│  request queue, │     │  sends HTTP     │     │  parse          │
│  deduplication  │     │  requests,      │     │  responses,     │
│                 │     │  returns        │     │  extract data   │
│                 │     │  responses      │     │                 │
└─────────────────┘     └────────┬────────┘     └────────┬────────┘
                                 │                       │
                        ┌────────▼────────┐     ┌────────▼────────┐
                        │   Downloader    │     │  Item Pipeline  │
                        │   Middleware    │     │                 │
                        │                 │     │  data cleaning, │
                        │ request/response│     │  data storage   │
                        │ handling,       │     └─────────────────┘
                        │ proxies, retries│
                        └─────────────────┘

                        ┌─────────────────┐
                        │     Spider      │
                        │   Middleware    │
                        │                 │
                        │ spider input /  │
                        │ output, depth   │
                        │ filtering       │
                        └─────────────────┘

Data flow

The data flow in Scrapy is driven by the Engine and proceeds as follows (a minimal spider sketch follows the list):

  1. The Engine gets the initial requests from the Spider: the Spider's start() method yields the initial Requests
  2. Requests are scheduled: each Request enters the Scheduler queue and is deduplicated
  3. Requests are sent: the Engine asks the Scheduler for the next Request and hands it to the Downloader
  4. Responses are downloaded: the Downloader fetches the page and returns a Response to the Engine
  5. Callbacks run: the Engine passes the Response to the Spider's callback for processing
  6. Results are returned: the Spider yields Items and/or new Requests
  7. Items are processed: Items go through the Item Pipeline for processing and storage
  8. The loop continues: new Requests go back to the Scheduler, and steps 2-7 repeat
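
From your code's point of view, most of these steps happen inside the framework: a Spider only yields Requests and Items from its callbacks, and the Engine drives the rest. A minimal sketch (the URL and CSS classes below are made up for illustration):

import scrapy

class FlowSpider(scrapy.Spider):
    name = 'flow'

    async def start(self):
        # Step 1: the initial Request handed to the Engine
        yield scrapy.Request('https://example.com/list', callback=self.parse)

    def parse(self, response):
        # Step 5: the Engine calls this callback with the downloaded Response
        for row in response.css('div.row'):
            # Steps 6-7: yielded Items are routed to the Item Pipeline
            yield {'title': row.css('h2::text').get()}

        # Steps 6 and 8: yielded Requests go back to the Scheduler and the cycle repeats
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)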

Component responsibilities

| Component | Responsibility |
| --- | --- |
| Engine | Controls the data flow between all components and fires events |
| Scheduler | Receives Requests from the Engine, queues them, and deduplicates |
| Downloader | Fetches web pages and returns Responses |
| Spiders | Parse Responses, extract data, and generate new Requests |
| Item Pipeline | Processes the Items extracted by Spiders |
| Downloader Middleware | Hooks into Requests/Responses between the Engine and the Downloader |
| Spider Middleware | Hooks into Requests/Responses/exceptions between the Engine and Spiders |

Spider Types

Scrapy ships several Spider classes for different crawling scenarios.

scrapy.Spider (base class)

The base class of all spiders, providing the most basic crawling functionality:

import scrapy
from myproject.items import BookItem

class BasicSpider(scrapy.Spider):
    name = 'basic'                        # Spider name, must be unique
    allowed_domains = ['example.com']     # Allowed domains
    start_urls = [
        'https://example.com/books/page1',
        'https://example.com/books/page2',
    ]

    # Per-spider settings (override the project settings)
    custom_settings = {
        'DOWNLOAD_DELAY': 1,
        'CONCURRENT_REQUESTS': 8,
    }

    def parse(self, response):
        """Default callback, handles the responses for start_urls"""
        # Extract the book list
        for book in response.css('div.book-item'):
            item = BookItem()
            item['title'] = book.css('h3.title::text').get()
            item['author'] = book.css('span.author::text').get()
            item['price'] = book.css('span.price::text').get()
            item['url'] = response.urljoin(book.css('a::attr(href)').get())

            # yield item sends it to the pipelines
            yield item

            # You can also yield a Request to crawl the detail page
            detail_url = book.css('a::attr(href)').get()
            if detail_url:
                yield response.follow(
                    detail_url,
                    callback=self.parse_detail,
                    meta={'item': item}  # pass data to the next callback
                )

        # Pagination
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_detail(self, response):
        """Parse the detail page"""
        item = response.meta['item']  # retrieve the passed data
        item['description'] = response.css('div.description::text').get()
        item['isbn'] = response.css('span.isbn::text').get()
        yield item

Using the start() method (recommended)

Starting with Scrapy 2.13, the start() method is recommended over start_urls:

import scrapy

class ModernSpider(scrapy.Spider):
    name = 'modern'
    allowed_domains = ['example.com']

    async def start(self):
        """Generate the initial requests (asynchronous method)"""
        # Option 1: yield Requests directly
        yield scrapy.Request('https://example.com/books/1', callback=self.parse)
        yield scrapy.Request('https://example.com/books/2', callback=self.parse)

        # Option 2: generate them dynamically
        for i in range(1, 11):
            yield scrapy.Request(f'https://example.com/books?page={i}')

        # Option 3: read them from a file
        # with open('urls.txt') as f:
        #     for url in f:
        #         yield scrapy.Request(url.strip())

        # Option 4: yield Items (supported since Scrapy 2.12)
        # Items can be yielded directly from start()
        # yield {'type': 'initial_data', 'source': 'start'}

Important notes

Starting with Scrapy 2.13:

  • The asyncio reactor is enabled by default: no manual configuration needed, projects use asynchronous I/O automatically
  • start_requests() may yield items: it is no longer limited to Requests, it can also yield Items directly
  • start() is an asynchronous method: you can await asynchronous calls inside it, for example to fetch the start URLs from a database or a message queue (see the sketch below)
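
For example, a start() method can await an asynchronous source before yielding requests. The sketch below assumes a made-up fetch_start_urls() coroutine standing in for your own database or message-queue client:

import asyncio
import scrapy

async def fetch_start_urls():
    """Placeholder for asynchronous I/O (database, message queue, HTTP API, ...)."""
    await asyncio.sleep(0)  # simulate an async lookup
    return ['https://example.com/books/1', 'https://example.com/books/2']

class AsyncStartSpider(scrapy.Spider):
    name = 'async_start'

    async def start(self):
        # await is allowed here because start() runs on the asyncio reactor
        for url in await fetch_start_urls():
            yield scrapy.Request(url, callback=self.parse)

    def parse(self, response):
        yield {'url': response.url, 'title': response.css('title::text').get()}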

Spider arguments

A Spider can accept command-line arguments to control crawling behaviour flexibly:

import scrapy

class ParamSpider(scrapy.Spider):
    name = 'param'

    def __init__(self, category=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.category = category

    async def start(self):
        url = f'https://example.com/{self.category}'
        yield scrapy.Request(url)

# Run from the command line
# scrapy crawl param -a category=books
# scrapy crawl param -a category=electronics

CrawlSpider (rule-based crawler)

CrawlSpider is one of the most commonly used Spider types and is designed for crawling links across an entire site. It follows links automatically based on a set of rules (Rules):

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from myproject.items import ArticleItem

class ArticleSpider(CrawlSpider):
    name = 'articles'
    allowed_domains = ['example.com']
    start_urls = ['https://example.com/']

    # Crawling rules
    rules = (
        # Rule 1: match links containing /article/, handle them with parse_article
        Rule(
            LinkExtractor(allow=r'/article/\d+\.html'),
            callback='parse_article',
            follow=True  # whether to keep following links found on those pages
        ),

        # Rule 2: match category pages, follow links only (no parsing)
        Rule(
            LinkExtractor(allow=r'/category/\w+\.html'),
            follow=True
        ),

        # Rule 3: exclude specific links
        Rule(
            LinkExtractor(
                allow=r'/page/\d+',
                deny=r'/page/login'  # exclude the login page
            ),
            follow=True
        ),
    )

    def parse_article(self, response):
        """Parse an article detail page"""
        item = ArticleItem()
        item['title'] = response.css('h1.title::text').get()
        item['author'] = response.css('span.author::text').get()
        item['content'] = response.css('div.content').get()
        item['url'] = response.url

        # Text of the link that led here (set by CrawlSpider/LinkExtractor)
        item['link_text'] = response.meta.get('link_text', '')

        yield item

LinkExtractor in detail

LinkExtractor is the core building block of CrawlSpider, used to extract links from a page:

from scrapy.linkextractors import LinkExtractor

# Create a link extractor
extractor = LinkExtractor(
    # Regular expressions that (absolute) URLs must match
    allow=(r'/article/\d+', r'/news/\d+'),

    # Regular expressions for URLs to exclude
    deny=(r'/login', r'/register'),

    # Allowed domains
    allow_domains=('example.com', 'blog.example.com'),

    # Excluded domains
    deny_domains=('ads.example.com',),

    # File extensions to exclude (given without the leading dot)
    deny_extensions=('pdf', 'zip', 'jpg', 'png'),

    # Only extract links inside these XPath regions
    restrict_xpaths=('//div[@class="content"]', '//ul[@class="list"]'),

    # Only extract links inside these CSS regions
    restrict_css=('.content', '.list'),

    # Regexes the link text must match ('下一页' means "next page")
    restrict_text=(r'下一页', r'Next'),

    # Tags to extract from, defaults to ('a', 'area')
    tags=('a', 'area'),

    # Attributes to extract, defaults to ('href',)
    attrs=('href',),

    # Whether to canonicalize URLs
    canonicalize=False,

    # Whether to deduplicate extracted links
    unique=True,

    # Function applied to every extracted value
    process_value=lambda x: x.strip() if x else None,
)

# Usage example
from scrapy.spiders import CrawlSpider, Rule

class MySpider(CrawlSpider):
    name = 'my'
    rules = (
        # Only extract article links inside the content area
        Rule(
            LinkExtractor(
                allow=r'/article/\d+',
                restrict_xpaths='//div[@class="content"]'
            ),
            callback='parse_article'
        ),

        # Extract pagination links (the regex matches Chinese pagination text such as "第 2 页")
        Rule(
            LinkExtractor(
                allow=r'/page/\d+',
                restrict_text=r'第\s*\d+\s*页'
            ),
            follow=True
        ),
    )

XMLFeedSpider

For parsing XML feeds (e.g. RSS, Atom):

from scrapy.spiders import XMLFeedSpider
from myproject.items import FeedItem

class RSSSpider(XMLFeedSpider):
    name = 'rss'
    allowed_domains = ['example.com']
    start_urls = ['https://example.com/feed.xml']

    # Iterator type: 'iternodes' (default, fastest), 'html' or 'xml'
    iterator = 'iternodes'

    # Name of the tag to iterate over
    itertag = 'item'

    # Namespaces (if the XML uses them)
    namespaces = [
        ('dc', 'http://purl.org/dc/elements/1.1/'),
    ]

    def parse_node(self, response, node):
        """Parse each item node"""
        item = FeedItem()
        item['title'] = node.xpath('title/text()').get()
        item['link'] = node.xpath('link/text()').get()
        item['description'] = node.xpath('description/text()').get()
        item['pub_date'] = node.xpath('pubDate/text()').get()

        # Using the namespace
        item['creator'] = node.xpath('dc:creator/text()').get()

        return item

CSVFeedSpider

For parsing CSV feeds:

from scrapy.spiders import CSVFeedSpider
from myproject.items import ProductItem

class CSVSpider(CSVFeedSpider):
    name = 'csv'
    allowed_domains = ['example.com']
    start_urls = ['https://example.com/products.csv']

    # CSV delimiter
    delimiter = ','

    # Quote character
    quotechar = '"'

    # Column names (if the CSV has no header row)
    headers = ['id', 'name', 'price', 'category']

    def parse_row(self, response, row):
        """Parse a single row"""
        item = ProductItem()
        item['id'] = row['id']
        item['name'] = row['name']
        item['price'] = float(row['price'])
        item['category'] = row['category']
        return item

SitemapSpider

Discovers links through the site's sitemap.xml:

from scrapy.spiders import SitemapSpider

class SitemapExampleSpider(SitemapSpider):
    name = 'sitemap'
    sitemap_urls = ['https://example.com/sitemap.xml']

    # Choose the callback based on the URL path
    sitemap_rules = [
        (r'/product/', 'parse_product'),
        (r'/category/', 'parse_category'),
    ]

    # Only follow sitemaps whose URL matches these patterns
    sitemap_follow = [r'/sitemap_products']

    def parse_product(self, response):
        yield {
            'title': response.css('h1::text').get(),
            'price': response.css('.price::text').get(),
        }

    def parse_category(self, response):
        yield {
            'name': response.css('h1::text').get(),
        }

Selectors

Scrapy's selectors are provided by the parsel library and support both CSS and XPath syntax.

CSS selectors

from scrapy.spiders import Spider

class CSSSpider(Spider):
    name = 'css'

    def parse(self, response):
        # Extract text
        title = response.css('h1.title::text').get()        # first match
        titles = response.css('h1.title::text').getall()    # all matches

        # Extract attributes
        href = response.css('a.link::attr(href)').get()
        src = response.css('img::attr(src)').getall()

        # Nested selection
        for article in response.css('div.article'):
            title = article.css('h2::text').get()
            author = article.css('.author::text').get()

        # Pseudo-class selectors
        first_item = response.css('li:first-child::text').get()
        nth_item = response.css('li:nth-child(2)::text').get()

        # Attribute selectors
        links = response.css('a[href^="http"]::attr(href)').getall()      # starting with http
        pdf_links = response.css('a[href$=".pdf"]::attr(href)').getall()  # ending with .pdf

        # Links containing specific text
        python_links = response.css('a:contains("Python")::attr(href)').getall()

XPath selectors

from scrapy.spiders import Spider

class XPathSpider(Spider):
    name = 'xpath'

    def parse(self, response):
        # Basic selection
        titles = response.xpath('//h1/text()').getall()

        # Attribute selection
        links = response.xpath('//a/@href').getall()

        # Conditional selection
        div = response.xpath('//div[@class="content"]')

        # Matching by text
        login_link = response.xpath('//a[text()="Login"]/@href').get()
        python_links = response.xpath('//a[contains(text(), "Python")]/@href').getall()

        # Selecting by position
        first_li = response.xpath('//li[1]/text()').get()
        last_li = response.xpath('//li[last()]/text()').get()

        # Axes
        following = response.xpath('//div[@id="main"]/following-sibling::div')
        preceding = response.xpath('//div[@id="main"]/preceding-sibling::div')

        # Combining with regular expressions
        prices = response.xpath('//span[@class="price"]/text()').re(r'\d+\.?\d*')

Selector performance tips

def parse(self, response):
    # Recommended: use .get() and .getall()
    title = response.css('h1::text').get()

    # Avoid: .extract() and .extract_first() (legacy API)
    # title = response.css('h1::text').extract_first()

    # Recommended: chained selection
    for article in response.css('div.article'):
        # keep selecting relative to the sub-selector
        title = article.css('h2::text').get()

    # Performance: prefer selectors that are as specific as possible
    # slower: descendant scan of the whole subtree
    items = response.xpath('//div[@class="list"]//div[@class="item"]')
    # usually faster: direct-child selection
    items = response.css('.list > .item')

Response Types

Scrapy provides several response classes and picks one automatically based on the response's Content-Type:

HtmlResponse (default)

Used for HTML responses; supports CSS and XPath selectors:

def parse(self, response):
    # response is an HtmlResponse
    title = response.css('h1::text').get()
    links = response.xpath('//a/@href').getall()

TextResponse

Used for plain-text responses:

def parse(self, response):
    # Plain text content
    text = response.text

JsonResponse (Scrapy 2.12+)

Used for JSON API responses; provides convenient access to the JSON data:

import scrapy

class ApiSpider(scrapy.Spider):
    name = 'api'

    def parse(self, response):
        # When the Content-Type is application/json,
        # the response is automatically a JsonResponse

        # Option 1: query with JMESPath (recommended)
        # requires: pip install jmespath
        names = response.jmespath('items[*].name').getall()
        first_item = response.jmespath('items[0]').get()

        # Option 2: access the JSON data directly
        data = response.json()
        for item in data.get('items', []):
            yield {
                'name': item.get('name'),
                'price': item.get('price'),
            }

        # Option 3: json() returns the parsed dict
        # (same call as response.json() above)
        json_data = response.json()

Advantages of JsonResponse

| Method | Description |
| --- | --- |
| response.json() | Returns the parsed Python dict/list |
| response.jmespath(query) | Queries the JSON data with JMESPath |
| response.jmespath(query).get() | Returns the first query result |
| response.jmespath(query).getall() | Returns all matching results |

JMESPath query examples

# Assume this JSON structure:
# {
#     "items": [
#         {"name": "Python", "price": 59.9},
#         {"name": "Java", "price": 49.9}
#     ],
#     "total": 100,
#     "page": 1
# }

# All names
names = response.jmespath('items[*].name').getall()
# ['Python', 'Java']

# First element
first = response.jmespath('items[0]').get()
# {'name': 'Python', 'price': 59.9}

# Items whose price is greater than 50
expensive = response.jmespath('items[?price > `50`]').getall()

# Total count
total = response.jmespath('total').get()
# 100

XmlResponse

Used for XML responses:

def parse(self, response):
    # XML response
    items = response.xpath('//item')
    for item in items:
        yield {
            'title': item.xpath('title/text()').get(),
            'link': item.xpath('link/text()').get(),
        }

Item and Item Loader

Defining Items

An Item is a container that defines the data you scrape:

# items.py
import scrapy

class BookItem(scrapy.Item):
    """Book information"""
    # Basic fields
    title = scrapy.Field()
    author = scrapy.Field()
    price = scrapy.Field()
    isbn = scrapy.Field()

    # URL-related fields
    url = scrapy.Field()
    cover_image = scrapy.Field()

    # Classification
    category = scrapy.Field()
    tags = scrapy.Field()

    # Timestamps
    publish_date = scrapy.Field()
    crawl_time = scrapy.Field()

class ArticleItem(scrapy.Item):
    """Article information"""
    title = scrapy.Field()
    content = scrapy.Field()
    author = scrapy.Field()
    publish_time = scrapy.Field()
    url = scrapy.Field()

Using Items

import datetime

import scrapy
from myproject.items import BookItem

class BookSpider(scrapy.Spider):
    name = 'books'

    def parse(self, response):
        for book in response.css('div.book-item'):
            # Create an Item instance
            item = BookItem()

            # Fill in the data
            item['title'] = book.css('h3::text').get()
            item['author'] = book.css('.author::text').get()
            item['price'] = book.css('.price::text').get()
            item['url'] = response.url
            item['crawl_time'] = datetime.datetime.now().isoformat()

            yield item

        # Alternatively, yield a plain dict
        yield {
            'title': response.css('h1::text').get(),
            'content': response.css('div.content::text').get(),
        }

Item Loader

Item Loaders provide a more structured way to populate Items, with input and output processors:

# item_loaders.py
import datetime
import re

import scrapy
from itemloaders.processors import MapCompose, TakeFirst, Join, Identity
from scrapy.loader import ItemLoader

from myproject.items import BookItem

class BookLoader(ItemLoader):
    """Item Loader for books"""

    # Default output processor: take the first non-empty value
    default_output_processor = TakeFirst()

    # Field-specific processors
    title_in = MapCompose(str.strip)  # strip whitespace on input
    title_out = TakeFirst()

    price_in = MapCompose(
        lambda x: x.strip(),
        lambda x: re.search(r'[\d.]+', x).group() if re.search(r'[\d.]+', x) else None,
        lambda x: float(x) if x else None
    )

    tags_in = MapCompose(str.strip)
    tags_out = Identity()  # keep the list as-is

    content_out = Join('\n')  # join multiple text fragments

# Using it in a Spider
class BookSpider(scrapy.Spider):
    name = 'books'

    def parse(self, response):
        for book in response.css('div.book-item'):
            loader = BookLoader(item=BookItem(), selector=book)

            # Add values with CSS selectors
            loader.add_css('title', 'h3.title::text')
            loader.add_css('author', '.author::text')
            loader.add_css('price', '.price::text')
            loader.add_css('tags', '.tag::text')

            # Add values with XPath
            loader.add_xpath('isbn', '//span[@class="isbn"]/text()')

            # Add literal values
            loader.add_value('url', response.url)
            loader.add_value('crawl_time', datetime.datetime.now().isoformat())

            yield loader.load_item()

Item Pipeline

Pipelines process the data extracted by Spiders: cleaning, validation, deduplication, storage, and so on.

Pipeline methods

class MyPipeline:
    def open_spider(self, spider):
        """Called when the spider opens; initialize resources here"""
        pass

    def close_spider(self, spider):
        """Called when the spider closes; clean up resources here"""
        pass

    def process_item(self, item, spider):
        """Called for every item; this method is required"""
        return item  # return the item to pass it on to the next pipeline
        # or: raise DropItem('reason')  # drop the item

Data-cleaning pipelines

# pipelines.py
import re
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class CleaningPipeline:
    """Data cleaning"""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Strip whitespace from string fields
        for field in adapter.field_names():
            value = adapter.get(field)
            if isinstance(value, str):
                adapter[field] = value.strip()

        return item

class PricePipeline:
    """Price handling"""

    vat_factor = 1.15  # VAT rate

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Extract the numeric price
        price = adapter.get('price')
        if price:
            match = re.search(r'[\d.]+', str(price))
            if match:
                adapter['price'] = float(match.group())
            else:
                adapter['price'] = None

        # If the price excludes VAT, add the tax
        if adapter.get('price') and adapter.get('price_excludes_vat'):
            adapter['price'] = adapter['price'] * self.vat_factor

        return item

class ValidationPipeline:
    """Data validation"""

    required_fields = ['title', 'url']

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Check required fields
        for field in self.required_fields:
            if not adapter.get(field):
                raise DropItem(f"Missing required field: {field}")

        return item

class DuplicatesPipeline:
    """Deduplication"""

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Deduplicate by id, falling back to the URL
        item_id = adapter.get('id') or adapter.get('url')
        if item_id in self.ids_seen:
            raise DropItem(f"Duplicate item: {item_id}")

        self.ids_seen.add(item_id)
        return item

Storage pipelines

JSON file storage

import json
from itemadapter import ItemAdapter

class JsonPipeline:
    """Write items to a JSON Lines file"""

    def open_spider(self, spider):
        self.file = open('items.jl', 'w', encoding='utf-8')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False) + '\n'
        self.file.write(line)
        return item

SQLite storage

import sqlite3

class SQLitePipeline:
    """Save items to SQLite"""

    def __init__(self, sqlite_file):
        self.sqlite_file = sqlite_file

    @classmethod
    def from_crawler(cls, crawler):
        """Read parameters from the settings"""
        return cls(
            sqlite_file=crawler.settings.get('SQLITE_FILE', 'items.db')
        )

    def open_spider(self, spider):
        self.conn = sqlite3.connect(self.sqlite_file)
        self.cursor = self.conn.cursor()

        # Create the table
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS books (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                author TEXT,
                price REAL,
                url TEXT UNIQUE,
                crawl_time TEXT
            )
        ''')
        self.conn.commit()

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self, item, spider):
        self.cursor.execute('''
            INSERT OR IGNORE INTO books (title, author, price, url, crawl_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            item.get('title'),
            item.get('author'),
            item.get('price'),
            item.get('url'),
            item.get('crawl_time')
        ))
        self.conn.commit()
        return item

MongoDB storage

import pymongo
from itemadapter import ItemAdapter

class MongoDBPipeline:
    """Save items to MongoDB"""

    collection_name = 'items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
        return item

MySQL storage

import pymysql

class MySQLPipeline:
    """Save items to MySQL"""

    def __init__(self, host, port, user, password, database):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            host=crawler.settings.get('MYSQL_HOST', 'localhost'),
            port=crawler.settings.getint('MYSQL_PORT', 3306),
            user=crawler.settings.get('MYSQL_USER', 'root'),
            password=crawler.settings.get('MYSQL_PASSWORD', ''),
            database=crawler.settings.get('MYSQL_DATABASE', 'scrapy')
        )

    def open_spider(self, spider):
        self.conn = pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
            charset='utf8mb4'
        )
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self, item, spider):
        columns = ', '.join(item.keys())
        placeholders = ', '.join(['%s'] * len(item))
        sql = f"INSERT INTO books ({columns}) VALUES ({placeholders})"
        self.cursor.execute(sql, list(item.values()))
        self.conn.commit()
        return item

Enabling pipelines

Configure pipelines in settings.py:

# settings.py

# Enable pipelines; the number is the priority (0-1000), lower numbers run first
ITEM_PIPELINES = {
    'myproject.pipelines.CleaningPipeline': 100,     # clean first
    'myproject.pipelines.ValidationPipeline': 200,   # then validate
    'myproject.pipelines.DuplicatesPipeline': 300,   # deduplicate
    'myproject.pipelines.SQLitePipeline': 400,       # store last
    # 'myproject.pipelines.MongoDBPipeline': 500,    # multiple storage pipelines are allowed
}

# Database configuration
SQLITE_FILE = 'data/items.db'
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy'

Middleware

Middlewares are Scrapy's main extension mechanism and come in two kinds: downloader middlewares and spider middlewares.

Downloader middleware

Downloader middlewares process the requests and responses that pass between the Engine and the Downloader.

Middleware methods

class MyDownloaderMiddleware:
    def process_request(self, request, spider):
        """
        Called for each request before it is sent to the Downloader.

        Return values:
        - None: continue processing the request
        - Response: return this response directly, skipping the download
        - Request: reschedule the request
        - raise IgnoreRequest: ignore the request
        """
        # Set request headers
        request.headers['User-Agent'] = 'Mozilla/5.0 ...'
        return None

    def process_response(self, request, response, spider):
        """
        Called for each response before it is returned to the Engine.

        Return values:
        - Response: continue processing the response
        - Request: reschedule the request
        - raise IgnoreRequest: ignore the request
        """
        # Check the response status
        if response.status == 403:
            spider.logger.warning(f'Access denied: {request.url}')
        return response

    def process_exception(self, request, exception, spider):
        """
        Called when an exception is raised while downloading or handling a request.

        Return values:
        - None: continue processing the exception
        - Response: return a response
        - Request: reschedule the request
        """
        spider.logger.error(f'Request failed: {request.url}, error: {exception}')
        return None

Proxy middleware

# middlewares.py
import random

class ProxyMiddleware:
    """Proxy middleware"""

    def __init__(self, proxy_list):
        self.proxy_list = proxy_list

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            proxy_list=crawler.settings.get('PROXY_LIST', [])
        )

    def process_request(self, request, spider):
        if self.proxy_list:
            proxy = random.choice(self.proxy_list)
            request.meta['proxy'] = proxy
            spider.logger.debug(f'Using proxy: {proxy}')

# settings.py
PROXY_LIST = [
    'http://127.0.0.1:7890',
    'http://user:pass@proxy1:8080',
    'http://user:pass@proxy2:8080',
]

DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.ProxyMiddleware': 100,
}

User-Agent middleware

import random

class UserAgentMiddleware:
    """Random User-Agent middleware"""

    def __init__(self):
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0.0.0',
        ]

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agents)

Retry middleware

from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.utils.response import response_status_message

class CustomRetryMiddleware(RetryMiddleware):
    """Custom retry middleware"""

    def __init__(self, settings):
        super().__init__(settings)
        self.max_retry_times = settings.getint('RETRY_TIMES')
        self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))

    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response

        # Custom retry logic
        if response.status in self.retry_http_codes:
            reason = response_status_message(response.status)
            return self._retry(request, reason, spider) or response

        return response

    def process_exception(self, request, exception, spider):
        if isinstance(exception, self.EXCEPTIONS_TO_RETRY):
            return self._retry(request, exception, spider)

Spider middleware

Spider middlewares handle the interaction between the Engine and the Spiders:

class MySpiderMiddleware:
    def process_spider_input(self, response, spider):
        """
        Called for each response before it enters the Spider.
        Should return None or raise an exception.
        """
        pass

    def process_spider_output(self, response, result, spider):
        """
        Called with the results after the Spider has processed a response.
        Must return an iterable of Requests and/or Items.
        """
        for item in result:
            yield item

    def process_spider_exception(self, response, exception, spider):
        """
        Called when the Spider or another middleware raises an exception.
        Should return None or an iterable.
        """
        spider.logger.error(f'Spider exception: {exception}')
        return None

    def process_start_requests(self, start_requests, spider):
        """
        Processes the Spider's initial requests.
        Must return an iterable.
        """
        for request in start_requests:
            yield request

Enabling middlewares

# settings.py

DOWNLOADER_MIDDLEWARES = {
    # Custom middlewares
    'myproject.middlewares.ProxyMiddleware': 100,
    'myproject.middlewares.UserAgentMiddleware': 200,

    # Disable a built-in middleware (set it to None)
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
}

SPIDER_MIDDLEWARES = {
    'myproject.middlewares.MySpiderMiddleware': 100,
}

Settings

Common settings

# settings.py

# ==================== Basic settings ====================

# Project name
BOT_NAME = 'myproject'

# Spider modules
SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'

# User-Agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

# Whether to obey robots.txt
ROBOTSTXT_OBEY = True

# ==================== Concurrency ====================

# Concurrent requests (global)
CONCURRENT_REQUESTS = 16

# Concurrent requests per domain
CONCURRENT_REQUESTS_PER_DOMAIN = 8

# Concurrent requests per IP
CONCURRENT_REQUESTS_PER_IP = 0  # 0 means no limit

# Download delay in seconds
DOWNLOAD_DELAY = 1

# Randomize the delay (0.5 to 1.5 * DOWNLOAD_DELAY)
RANDOMIZE_DOWNLOAD_DELAY = True

# ==================== Timeouts ====================

# Download timeout in seconds
DOWNLOAD_TIMEOUT = 30

# ==================== Retries ====================

RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 522, 524, 408, 429]

# ==================== HTTP cache ====================

# HTTP cache (useful during development)
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0  # 0 means never expire
HTTPCACHE_DIR = 'httpcache'
HTTPCACHE_IGNORE_HTTP_CODES = []

# ==================== Logging ====================

LOG_LEVEL = 'INFO'  # DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_FILE = 'logs/scrapy.log'
LOG_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'

# ==================== Miscellaneous ====================

# Default request headers
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}

# Cookies
COOKIES_ENABLED = True
COOKIES_DEBUG = False

# AutoThrottle extension
AUTOTHROTTLE_ENABLED = False
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0

Feed exports

# Export format
FEED_FORMAT = 'json'  # json, jsonlines, csv, xml, pickle, marshal

# Export file
FEED_URI = 'items.json'

# Export encoding
FEED_EXPORT_ENCODING = 'utf-8'

# Fields to export (empty means all)
FEED_EXPORT_FIELDS = ['title', 'author', 'price']

# Other export options
FEED_STORE_EMPTY = False
FEED_EXPORT_INDENT = 2

# Specify the output on the command line
# scrapy crawl myspider -o items.json
# scrapy crawl myspider -o items.csv
# scrapy crawl myspider -o items.jl
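
On Scrapy 2.1 and later, the single FEED_FORMAT / FEED_URI pair shown above is superseded by the FEEDS setting, a dict that maps each output URI to its per-feed options. A minimal sketch of an equivalent configuration:

# settings.py -- FEEDS-style export configuration (Scrapy 2.1+)
FEEDS = {
    'items.json': {
        'format': 'json',
        'encoding': 'utf-8',
        'indent': 2,
        'fields': ['title', 'author', 'price'],
        'overwrite': True,  # overwrite the file on each run (Scrapy 2.4+)
    },
}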

Signals

Signals let you run custom code when specific events occur:

import scrapy
from scrapy import signals

class MySpider(scrapy.Spider):
    name = 'my'

    @classmethod
    def from_crawler(cls, crawler):
        spider = super().from_crawler(crawler)

        # Connect signal handlers
        crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(spider.spider_error, signal=signals.spider_error)
        crawler.signals.connect(spider.item_scraped, signal=signals.item_scraped)

        return spider

    def spider_opened(self, spider):
        """Called when the spider opens"""
        self.logger.info(f'Spider opened: {spider.name}')

    def spider_closed(self, spider, reason):
        """Called when the spider closes"""
        self.logger.info(f'Spider closed: {spider.name}, reason: {reason}')

    def spider_error(self, failure, response, spider):
        """Called when the spider raises an error"""
        self.logger.error(f'Spider error: {failure}')

    def item_scraped(self, item, response, spider):
        """Called when an item has been scraped"""
        self.logger.info(f'Scraped item: {item.get("title")}')


# Common signals
# signals.spider_opened     - spider opened
# signals.spider_closed     - spider closed
# signals.spider_error      - spider error
# signals.item_scraped      - item scraped
# signals.item_dropped      - item dropped
# signals.response_received - response received
# signals.request_scheduled - request scheduled

Command-Line Tool

# Create a project
scrapy startproject myproject

# Create a spider
scrapy genspider myspider example.com
scrapy genspider -t crawl myspider example.com   # use the CrawlSpider template

# Run a spider
scrapy crawl myspider
scrapy crawl myspider -o items.json        # export the results
scrapy crawl myspider -a category=books    # pass spider arguments
scrapy crawl myspider -s LOG_LEVEL=DEBUG   # override a setting

# List all spiders
scrapy list

# Interactive shell (for testing selectors)
scrapy shell https://example.com
scrapy shell 'https://example.com' -s USER_AGENT='Mozilla/5.0...'

# Run contract checks on spiders
scrapy check

# Inspect settings
scrapy settings --get BOT_NAME
scrapy settings --get DOWNLOAD_DELAY

# Run a spider persisting its state (resumable crawl)
scrapy crawl myspider -s JOBDIR=crawls/myspider-1

# Version information
scrapy version
scrapy version -v

A Complete Project Example

Below is a complete Scrapy project that scrapes book information:

Project layout

bookscraper/
├── scrapy.cfg
└── bookscraper/
    ├── __init__.py
    ├── items.py
    ├── middlewares.py
    ├── pipelines.py
    ├── settings.py
    └── spiders/
        ├── __init__.py
        └── books.py

items.py

import scrapy

class BookItem(scrapy.Item):
    title = scrapy.Field()
    author = scrapy.Field()
    price = scrapy.Field()
    rating = scrapy.Field()
    category = scrapy.Field()
    description = scrapy.Field()
    url = scrapy.Field()
    crawl_time = scrapy.Field()

spiders/books.py

import datetime

import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from bookscraper.items import BookItem

class BooksSpider(CrawlSpider):
    name = 'books'
    allowed_domains = ['books.toscrape.com']
    start_urls = ['https://books.toscrape.com/']

    rules = (
        # Category pages: follow links
        Rule(
            LinkExtractor(restrict_xpaths='//ul[@class="nav-list"]//a'),
            follow=True
        ),
        # Book detail pages: parse the content
        Rule(
            LinkExtractor(restrict_xpaths='//h3/a'),
            callback='parse_book'
        ),
        # Pagination: follow links
        Rule(
            LinkExtractor(restrict_xpaths='//li[@class="next"]/a'),
            follow=True
        ),
    )

    def parse_book(self, response):
        """Parse a book detail page"""
        item = BookItem()

        item['title'] = response.xpath('//h1/text()').get()
        item['price'] = response.xpath('//p[@class="price_color"]/text()').get()
        item['rating'] = response.xpath('//p[contains(@class, "star-rating")]/@class').get().split()[-1]
        item['description'] = response.xpath('//div[@id="product_description"]/following-sibling::p/text()').get()
        item['category'] = response.xpath('//ul[@class="breadcrumb"]/li[3]/a/text()').get()
        item['url'] = response.url
        item['crawl_time'] = datetime.datetime.now().isoformat()

        yield item

pipelines.py

import sqlite3
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class DuplicatesPipeline:
    """Deduplication"""

    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        url = adapter.get('url')

        if url in self.urls_seen:
            raise DropItem(f'Duplicate item: {url}')

        self.urls_seen.add(url)
        return item

class CleaningPipeline:
    """Data cleaning"""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Clean the price
        price = adapter.get('price', '')
        if price:
            adapter['price'] = price.replace('£', '').strip()

        # Clean the description
        desc = adapter.get('description')
        if desc:
            adapter['description'] = desc.strip()

        return item

class SQLitePipeline:
    """SQLite storage"""

    def __init__(self, sqlite_file):
        self.sqlite_file = sqlite_file

    @classmethod
    def from_crawler(cls, crawler):
        return cls(sqlite_file=crawler.settings.get('SQLITE_FILE', 'books.db'))

    def open_spider(self, spider):
        self.conn = sqlite3.connect(self.sqlite_file)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS books (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                author TEXT,
                price REAL,
                rating TEXT,
                category TEXT,
                description TEXT,
                url TEXT UNIQUE,
                crawl_time TEXT
            )
        ''')
        self.conn.commit()

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self, item, spider):
        self.cursor.execute('''
            INSERT OR IGNORE INTO books
            (title, price, rating, category, description, url, crawl_time)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            item.get('title'),
            item.get('price'),
            item.get('rating'),
            item.get('category'),
            item.get('description'),
            item.get('url'),
            item.get('crawl_time')
        ))
        self.conn.commit()
        return item

settings.py

BOT_NAME = 'bookscraper'
SPIDER_MODULES = ['bookscraper.spiders']
NEWSPIDER_MODULE = 'bookscraper.spiders'

ROBOTSTXT_OBEY = True

CONCURRENT_REQUESTS = 8
DOWNLOAD_DELAY = 1

ITEM_PIPELINES = {
    'bookscraper.pipelines.DuplicatesPipeline': 100,
    'bookscraper.pipelines.CleaningPipeline': 200,
    'bookscraper.pipelines.SQLitePipeline': 300,
}

SQLITE_FILE = 'data/books.db'

LOG_LEVEL = 'INFO'

Running it

# Create the data directory
mkdir data

# Run the spider
scrapy crawl books

# Export to JSON
scrapy crawl books -o books.json

# Override a setting
scrapy crawl books -s DOWNLOAD_DELAY=2

Best Practices

1. Set concurrency and delays sensibly

# For small sites
CONCURRENT_REQUESTS = 4
DOWNLOAD_DELAY = 2

# For large sites
CONCURRENT_REQUESTS = 16
DOWNLOAD_DELAY = 0.5

# Use AutoThrottle
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0

2. Use the HTTP cache to speed up development

# Enable the cache during development
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0  # never expire

# Disable it in production
HTTPCACHE_ENABLED = False

3. Handle errors gracefully

class MySpider(scrapy.Spider):
    def parse(self, response):
        try:
            # Operations that might fail
            data = response.css('.data::text').get()
            if not data:
                self.logger.warning(f'No data found: {response.url}')
                return
            yield {'data': data}
        except Exception as e:
            self.logger.error(f'Parse error: {response.url}, {e}')

4. Resumable crawls

# Enable a resumable crawl
scrapy crawl myspider -s JOBDIR=crawls/myspider-1

# After an interruption, run the same command again to resume

5. Use Item Loaders to keep extraction code maintainable

# Not recommended
item['title'] = response.css('h1::text').get().strip()
item['price'] = float(response.css('.price::text').get().replace('$', ''))

# Recommended
loader = ItemLoader(item=BookItem(), response=response)
loader.add_css('title', 'h1::text')
loader.add_css('price', '.price::text')
yield loader.load_item()

Summary

In this chapter we covered:

  1. Scrapy architecture - core components and the data flow
  2. Spider types - Spider, CrawlSpider, XMLFeedSpider, and more
  3. LinkExtractor - link extraction in detail
  4. Selectors - CSS and XPath selectors
  5. Item and Item Loader - defining and populating scraped data
  6. Pipelines - cleaning, validation, and storage
  7. Middleware - downloader and spider middlewares
  8. Settings - concurrency, delays, caching, and more
  9. Signals - event-driven hooks
  10. Best practices - recommendations for production use

Scrapy is a full-featured crawling framework, well suited to large-scale, long-lived scraping projects.

Exercises

  1. Use a CrawlSpider to crawl all articles of a news site
  2. Implement a complete pipeline chain with cleaning, validation, deduplication, and storage
  3. Write a custom downloader middleware that rotates proxies randomly
  4. Use signals to collect run statistics for a crawl (number of requests, number of items, etc.)