Scrapy 框架详解
Scrapy 是 Python 生态中最成熟的专业爬虫框架之一,提供了完整的爬虫解决方案,涵盖请求调度、数据处理、存储等环节。相比使用 requests + BeautifulSoup 手动实现爬虫,Scrapy 提供了更完善的架构和更高的开发效率。
本教程内容基于 Scrapy 官方文档。
当前版本: v2.14.2 | Python 要求: 3.10+
为什么选择 Scrapy?
Scrapy vs 手动实现
| 特性 | Scrapy | requests + BeautifulSoup |
|---|---|---|
| 并发请求 | 内置并发,自动调度 | 需要手动实现多线程/异步 |
| 请求去重 | 自动去重 | 需要手动实现 |
| 数据管道 | 内置 Pipeline 机制 | 需要手动处理 |
| 中间件 | 完善的中间件系统 | 需要手动实现 |
| 日志统计 | 内置日志和统计 | 需要手动实现 |
| 断点续爬 | 内置支持 | 需要手动实现 |
| 分布式 | scrapy-redis 支持 | 需要自己实现 |
适用场景
- 大规模爬取:需要爬取大量页面,对效率有要求
- 长期维护:项目需要持续运行和维护
- 复杂数据处理:需要多步骤处理和验证数据
- 团队协作:结构清晰,便于多人协作
安装 Scrapy
pip install scrapy
# 验证安装
scrapy version
# 安装 IPython(可选,用于更好的 shell 体验)
pip install ipython
项目结构
创建项目
# 创建新项目
scrapy startproject myspider
# 项目结构
myspider/
├── myspider/ # Python 包
│ ├── __init__.py
│ ├── items.py # 定义数据结构
│ ├── middlewares.py # 中间件
│ ├── pipelines.py # 数据处理管道
│ ├── settings.py # 配置文件
│ └── spiders/ # 爬虫目录
│ └── __init__.py
└── scrapy.cfg # 项目配置文件
创建爬虫
# 进入项目目录
cd myspider
# 创建爬虫(基本模板)
scrapy genspider example example.com
# 创建爬虫(指定模板)
scrapy genspider -t crawl example example.com # CrawlSpider 模板
架构详解
理解 Scrapy 的架构对于开发高质量的爬虫至关重要。
核心组件
┌─────────────────────────────────────────────────────────────────┐
│ Scrapy Engine │
│ (控制数据流和触发事件) │
└───────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Scheduler │ │ Downloader │ │ Spiders │
│ (调度器) │◄──│ (下载器) │◄──│ (爬虫) │
│ │ │ │ │ │
│ 管理请求队列 │ │ 发送HTTP请求 │ │ 解析响应 │
│ 去重过滤 │ │ 返回响应 │ │ 提取数据 │
└───────────────┘ └───────┬───────┘ └───────┬───────┘
▲ │ │
│ ┌───────▼───────┐ │
│ │ Middleware │ │
│ │ (下载器中间件) │ │
│ │ │ │
│ │ 请求/响应处理 │ │
│ │ 代理、重试等 │ │
│ └───────────────┘ │
│ │
│ ┌───────────────┐ ┌───────▼───────┐
│ │Spider Middleware│ │ Item Pipeline │
└───────────│ (爬虫中间件) │──│ (数据管道) │
│ │ │ │
│ 处理爬虫输入输出│ │ 数据清洗 │
│ 深度过滤等 │ │ 数据存储 │
└───────────────┘ └───────────────┘
数据流过程
Scrapy 中的数据流由引擎控制,流程如下:
1. 引擎从 Spider 获取初始请求:Spider 的 start() 方法生成初始 Request
2. 请求被调度:Request 进入 Scheduler 队列,进行去重
3. 发送请求:Engine 从 Scheduler 获取下一个 Request,发送给 Downloader
4. 下载响应:Downloader 完成下载,生成 Response 返回给 Engine
5. 回调处理:Engine 将 Response 发送给 Spider 的回调函数处理
6. 返回数据:Spider 返回 Items 或新的 Requests
7. 数据处理:Items 进入 Pipeline 进行处理和存储
8. 循环继续:新的 Requests 进入 Scheduler,重复步骤 2-7
组件职责
| 组件 | 职责 |
|---|---|
| Engine | 控制整个系统的数据流,触发事件 |
| Scheduler | 接收 Engine 的请求,排队,去重 |
| Downloader | 获取网页内容,返回响应 |
| Spiders | 解析响应,提取数据,生成新请求 |
| Item Pipeline | 处理 Spider 提取的数据 |
| Downloader Middleware | 处理 Engine 和 Downloader 之间的请求/响应 |
| Spider Middleware | 处理 Engine 和 Spider 之间的请求/响应/异常 |
Spider 类型
Scrapy 提供了多种 Spider 类型,适用于不同的爬取场景。
scrapy.Spider(基类)
所有 Spider 的基类,提供了最基本的爬虫功能:
import scrapy
from myproject.items import BookItem
class BasicSpider(scrapy.Spider):
name = 'basic' # 爬虫名称,必须唯一
allowed_domains = ['example.com'] # 允许的域名
start_urls = [
'https://example.com/books/page1',
'https://example.com/books/page2',
]
# 自定义设置(覆盖项目设置)
custom_settings = {
'DOWNLOAD_DELAY': 1,
'CONCURRENT_REQUESTS': 8,
}
def parse(self, response):
"""默认回调函数,处理 start_urls 的响应"""
# 提取书籍列表
for book in response.css('div.book-item'):
item = BookItem()
item['title'] = book.css('h3.title::text').get()
item['author'] = book.css('span.author::text').get()
item['price'] = book.css('span.price::text').get()
item['url'] = response.urljoin(book.css('a::attr(href)').get())
# yield item 会发送到 Pipeline
yield item
# 也可以 yield Request 继续爬取详情页
detail_url = book.css('a::attr(href)').get()
if detail_url:
yield response.follow(
detail_url,
callback=self.parse_detail,
meta={'item': item} # 传递数据给下一个回调
)
# 翻页
next_page = response.css('a.next::attr(href)').get()
if next_page:
yield response.follow(next_page, callback=self.parse)
def parse_detail(self, response):
"""解析详情页"""
item = response.meta['item'] # 获取传递的数据
item['description'] = response.css('div.description::text').get()
item['isbn'] = response.css('span.isbn::text').get()
yield item
使用 start() 方法(推荐)
从 Scrapy 2.13 开始,推荐使用异步的 start() 方法替代旧的 start_requests()(简单场景仍可直接使用 start_urls):
import scrapy
class ModernSpider(scrapy.Spider):
name = 'modern'
allowed_domains = ['example.com']
async def start(self):
"""生成初始请求(异步方法)"""
# 方式1:直接 yield Request
yield scrapy.Request('https://example.com/books/1', callback=self.parse)
yield scrapy.Request('https://example.com/books/2', callback=self.parse)
# 方式2:动态生成
for i in range(1, 11):
yield scrapy.Request(f'https://example.com/books?page={i}')
# 方式3:从文件读取
# with open('urls.txt') as f:
# for url in f:
# yield scrapy.Request(url.strip())
# 方式4:yield Item(Scrapy 2.12+ 支持)
# 可以直接在 start() 中 yield Item
# yield {'type': 'initial_data', 'source': 'start'}
重要说明:
从 Scrapy 2.13 开始:
- asyncio reactor 默认启用:无需手动配置,项目自动使用异步 I/O
- start_requests() 可以 yield items:不仅限于 Request,还可以直接 yield Item
- start() 是异步方法:可以在其中使用 await 调用异步函数,例如从数据库或消息队列获取起始 URL
Spider 参数
Spider 可以接收命令行参数,用于灵活控制爬取行为:
import scrapy
class ParamSpider(scrapy.Spider):
name = 'param'
def __init__(self, category=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.category = category
async def start(self):
url = f'https://example.com/{self.category}'
yield scrapy.Request(url)
# 命令行运行
# scrapy crawl param -a category=books
# scrapy crawl param -a category=electronics
CrawlSpider(爬取型 Spider)
CrawlSpider 是最常用的 Spider 类型,用于爬取整站链接。它通过定义规则(Rules)自动跟踪链接:
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from myproject.items import ArticleItem
class ArticleSpider(CrawlSpider):
name = 'articles'
allowed_domains = ['example.com']
start_urls = ['https://example.com/']
# 定义爬取规则
rules = (
# 规则1:匹配所有包含 /article/ 的链接,使用 parse_article 处理
Rule(
LinkExtractor(allow=r'/article/\d+\.html'),
callback='parse_article',
follow=True # 是否继续跟踪该页面中的链接
),
# 规则2:匹配分类页面,只跟踪链接不解析
Rule(
LinkExtractor(allow=r'/category/\w+\.html'),
follow=True
),
# 规则3:排除特定链接
Rule(
LinkExtractor(
allow=r'/page/\d+',
deny=r'/page/login' # 排除登录页面
),
follow=True
),
)
def parse_article(self, response):
"""解析文章详情页"""
item = ArticleItem()
item['title'] = response.css('h1.title::text').get()
item['author'] = response.css('span.author::text').get()
item['content'] = response.css('div.content').get()
item['url'] = response.url
# 获取链接文本(来自 LinkExtractor)
item['link_text'] = response.meta.get('link_text', '')
yield item
LinkExtractor 详解
LinkExtractor 是 CrawlSpider 的核心组件,用于从页面中提取链接:
from scrapy.linkextractors import LinkExtractor
# 创建链接提取器
extractor = LinkExtractor(
# 允许的 URL 正则表达式(绝对 URL)
allow=(r'/article/\d+', r'/news/\d+'),
# 排除的 URL 正则表达式
deny=(r'/login', r'/register'),
# 允许的域名
allow_domains=('example.com', 'blog.example.com'),
# 排除的域名
deny_domains=('ads.example.com',),
# 排除的文件扩展名(不带点;默认为 scrapy.linkextractors.IGNORED_EXTENSIONS)
deny_extensions=('pdf', 'zip', 'jpg', 'png'),
# 限制在特定 XPath 区域内提取链接
restrict_xpaths=('//div[@class="content"]', '//ul[@class="list"]'),
# 限制在特定 CSS 区域内提取链接
restrict_css=('.content', '.list'),
# 链接文本必须匹配的正则
restrict_text=(r'下一页', r'Next'),
# 要提取的标签,默认为 ('a', 'area')
tags=('a', 'area'),
# 要提取的属性,默认为 ('href',)
attrs=('href',),
# 是否规范化 URL
canonicalize=False,
# 是否去重
unique=True,
# 处理提取值的函数
process_value=lambda x: x.strip() if x else None,
)
# 使用示例
from scrapy.spiders import CrawlSpider, Rule
class MySpider(CrawlSpider):
name = 'my'
rules = (
# 只提取 content 区域内的文章链接
Rule(
LinkExtractor(
allow=r'/article/\d+',
restrict_xpaths='//div[@class="content"]'
),
callback='parse_article'
),
# 提取分页链接
Rule(
LinkExtractor(
allow=r'/page/\d+',
restrict_text=r'第\s*\d+\s*页'
),
follow=True
),
)
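其中 process_value 参数常用于处理非标准链接。下面是一个纯 Python 的独立示例(思路与 Scrapy 官方文档中的示例类似;goToPage 这个函数名和页面结构是假设的),从 javascript: 形式的 href 中提取真实 URL:

```python
import re

# 假设页面上的链接形如 <a href="javascript:goToPage('page2.html')">
# process_value 收到 href 属性值,返回真实 URL;返回 None 则丢弃该链接
def process_value(value):
    m = re.search(r"goToPage\('(.+?)'\)", value)
    if m:
        return m.group(1)
    return value
```

将该函数通过 LinkExtractor(process_value=process_value) 传入即可生效。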
XMLFeedSpider
用于解析 XML 数据源(如 RSS、Atom):
from scrapy.spiders import XMLFeedSpider
from myproject.items import FeedItem
class RSSSpider(XMLFeedSpider):
name = 'rss'
allowed_domains = ['example.com']
start_urls = ['https://example.com/feed.xml']
# 迭代器类型:'iternodes'(默认,最快)、'html'、'xml'
iterator = 'iternodes'
# 要迭代的标签名
itertag = 'item'
# 命名空间(如果 XML 有命名空间)
namespaces = [
('dc', 'http://purl.org/dc/elements/1.1/'),
]
def parse_node(self, response, node):
"""解析每个 item 节点"""
item = FeedItem()
item['title'] = node.xpath('title/text()').get()
item['link'] = node.xpath('link/text()').get()
item['description'] = node.xpath('description/text()').get()
item['pub_date'] = node.xpath('pubDate/text()').get()
# 使用命名空间
item['creator'] = node.xpath('dc:creator/text()').get()
return item
CSVFeedSpider
用于解析 CSV 数据源:
from scrapy.spiders import CSVFeedSpider
from myproject.items import ProductItem
class CSVSpider(CSVFeedSpider):
name = 'csv'
allowed_domains = ['example.com']
start_urls = ['https://example.com/products.csv']
# CSV 分隔符
delimiter = ','
# 引号字符
quotechar = '"'
# 列名(如果 CSV 没有标题行)
headers = ['id', 'name', 'price', 'category']
def parse_row(self, response, row):
"""解析每一行"""
item = ProductItem()
item['id'] = row['id']
item['name'] = row['name']
item['price'] = float(row['price'])
item['category'] = row['category']
return item
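parse_row 收到的 row 是按列名索引的字典,形态与标准库 csv.DictReader 的产出一致。下面用纯 Python(不依赖 Scrapy)演示这种行数据,CSV 内容为假设数据:

```python
import csv
import io

# 假设的 CSV 内容,列名与上面的 headers 对应
data = io.StringIO('id,name,price,category\n1,Scrapy in Action,59.9,books\n')
rows = list(csv.DictReader(data))

# 每一行都是 {'id': ..., 'name': ..., ...} 形式的字典
row = rows[0]
price = float(row['price'])  # 与 parse_row 中的类型转换对应
```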
SitemapSpider
通过网站的 sitemap.xml 发现链接:
from scrapy.spiders import SitemapSpider
class SitemapExampleSpider(SitemapSpider):
name = 'sitemap'
sitemap_urls = ['https://example.com/sitemap.xml']
# 根据 URL 路径选择回调函数
sitemap_rules = [
(r'/product/', 'parse_product'),
(r'/category/', 'parse_category'),
]
# 只跟踪包含特定路径的 sitemap
sitemap_follow = [r'/sitemap_products']
def parse_product(self, response):
yield {
'title': response.css('h1::text').get(),
'price': response.css('.price::text').get(),
}
def parse_category(self, response):
yield {
'name': response.css('h1::text').get(),
}
选择器
Scrapy 使用 parsel 库提供选择器功能,支持 CSS 和 XPath 两种语法。
CSS 选择器
from scrapy.spiders import Spider
class CSSSpider(Spider):
name = 'css'
def parse(self, response):
# 提取文本
title = response.css('h1.title::text').get() # 第一个
titles = response.css('h1.title::text').getall() # 所有
# 提取属性
href = response.css('a.link::attr(href)').get()
src = response.css('img::attr(src)').getall()
# 嵌套选择
for article in response.css('div.article'):
title = article.css('h2::text').get()
author = article.css('.author::text').get()
# 伪类选择
first_item = response.css('li:first-child::text').get()
nth_item = response.css('li:nth-child(2)::text').get()
# 属性选择器
links = response.css('a[href^="http"]::attr(href)').getall() # 以 http 开头
pdf_links = response.css('a[href$=".pdf"]::attr(href)').getall() # 以 .pdf 结尾
# 包含文本
python_links = response.css('a:contains("Python")::attr(href)').getall()
XPath 选择器
from scrapy.spiders import Spider
class XPathSpider(Spider):
name = 'xpath'
def parse(self, response):
# 基本选择
titles = response.xpath('//h1/text()').getall()
# 属性选择
links = response.xpath('//a/@href').getall()
# 条件选择
div = response.xpath('//div[@class="content"]')
# 文本匹配
login_link = response.xpath('//a[text()="登录"]/@href').get()
python_links = response.xpath('//a[contains(text(), "Python")]/@href').getall()
# 位置选择
first_li = response.xpath('//li[1]/text()').get()
last_li = response.xpath('//li[last()]/text()').get()
# 轴选择
following = response.xpath('//div[@id="main"]/following-sibling::div')
preceding = response.xpath('//div[@id="main"]/preceding-sibling::div')
# 使用正则
prices = response.xpath('//span[@class="price"]/text()').re(r'\d+\.?\d*')
选择器性能优化
def parse(self, response):
# 推荐:使用 .get() 和 .getall()
title = response.css('h1::text').get()
# 避免:.extract() 和 .extract_first()(旧 API,官方建议改用 .get()/.getall())
# title = response.css('h1::text').extract_first()
# 推荐:链式选择
for article in response.css('div.article'):
# 直接在子选择器中继续选择
title = article.css('h2::text').get()
# 性能:尽量使用更具体的选择器
# 慢
items = response.xpath('//div[@class="list"]//div[@class="item"]')
# 快
items = response.css('.list > .item')
响应类型
Scrapy 提供了多种响应类型,根据响应内容的 Content-Type 自动选择:
HtmlResponse(默认)
用于 HTML 响应,支持 CSS 和 XPath 选择器:
def parse(self, response):
# response 是 HtmlResponse 类型
title = response.css('h1::text').get()
links = response.xpath('//a/@href').getall()
TextResponse
用于纯文本响应:
def parse(self, response):
# 纯文本内容
text = response.text
JsonResponse(Scrapy 2.12+)
用于 JSON API 响应,提供便捷的 JSON 数据访问:
import scrapy
class ApiSpider(scrapy.Spider):
name = 'api'
def parse(self, response):
# 当 Content-Type 是 application/json 时
# response 会自动转换为 JsonResponse
# 方式1:使用 jmespath 查询(推荐)
# 需要安装:pip install jmespath
names = response.jmespath('items[*].name').getall()
first_item = response.jmespath('items[0]').get()
# 方式2:直接访问 JSON 数据
data = response.json()
for item in data.get('items', []):
yield {
'name': item.get('name'),
'price': item.get('price'),
}
# 方式3:json() 方法返回解析后的字典
# 与 response.json() 等价
json_data = response.json()
JsonResponse 的优势:
| 方法 | 说明 |
|---|---|
| response.json() | 返回解析后的 Python 字典/列表 |
| response.jmespath(query) | 使用 JMESPath 查询 JSON 数据 |
| response.jmespath(query).get() | 获取单个查询结果 |
| response.jmespath(query).getall() | 获取所有匹配结果 |
JMESPath 查询示例:
# 假设 JSON 数据结构:
# {
# "items": [
# {"name": "Python", "price": 59.9},
# {"name": "Java", "price": 49.9}
# ],
# "total": 100,
# "page": 1
# }
# 获取所有名称
names = response.jmespath('items[*].name').getall()
# ['Python', 'Java']
# 获取第一个元素
first = response.jmespath('items[0]').get()
# {'name': 'Python', 'price': 59.9}
# 获取价格大于 50 的商品
expensive = response.jmespath('items[?price > `50`]').getall()
# 获取总数
total = response.jmespath('total').get()
# 100
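如果环境中没有安装 jmespath,用标准库 json 也能达到同样效果。下面是与上述查询逐条对应的纯 Python 写法(payload 为假设数据):

```python
import json

# 与上面 JMESPath 查询等价的纯 Python 写法(不依赖 jmespath 包)
payload = '''{"items": [{"name": "Python", "price": 59.9},
                        {"name": "Java", "price": 49.9}],
              "total": 100, "page": 1}'''
data = json.loads(payload)  # 相当于 response.json()

names = [it['name'] for it in data['items']]                  # items[*].name
first = data['items'][0]                                      # items[0]
expensive = [it for it in data['items'] if it['price'] > 50]  # items[?price > `50`]
total = data['total']                                         # total
```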
XmlResponse
用于 XML 响应:
def parse(self, response):
# XML 响应
items = response.xpath('//item')
for item in items:
yield {
'title': item.xpath('title/text()').get(),
'link': item.xpath('link/text()').get(),
}
Item 和 Item Loader
定义 Item
Item 是定义爬取数据的容器:
# items.py
import scrapy
class BookItem(scrapy.Item):
"""书籍信息"""
# 基本字段
title = scrapy.Field()
author = scrapy.Field()
price = scrapy.Field()
isbn = scrapy.Field()
# URL 相关
url = scrapy.Field()
cover_image = scrapy.Field()
# 分类信息
category = scrapy.Field()
tags = scrapy.Field()
# 时间信息
publish_date = scrapy.Field()
crawl_time = scrapy.Field()
class ArticleItem(scrapy.Item):
"""文章信息"""
title = scrapy.Field()
content = scrapy.Field()
author = scrapy.Field()
publish_time = scrapy.Field()
url = scrapy.Field()
使用 Item
from myproject.items import BookItem
import datetime
class BookSpider(scrapy.Spider):
name = 'books'
def parse(self, response):
for book in response.css('div.book-item'):
# 创建 Item 实例
item = BookItem()
# 填充数据
item['title'] = book.css('h3::text').get()
item['author'] = book.css('.author::text').get()
item['price'] = book.css('.price::text').get()
item['url'] = response.url
item['crawl_time'] = datetime.datetime.now().isoformat()
yield item
# 或者使用字典形式
yield {
'title': response.css('h1::text').get(),
'content': response.css('div.content::text').get(),
}
Item Loader
Item Loader 提供了一种更结构化的方式来填充 Item,支持输入处理和输出处理:
# item_loaders.py
from itemloaders import ItemLoader
from itemloaders.processors import MapCompose, TakeFirst, Join, Identity
from myproject.items import BookItem
import datetime
import re
class BookLoader(ItemLoader):
"""书籍 Item Loader"""
# 默认输出处理器:取第一个非空值
default_output_processor = TakeFirst()
# 特殊字段处理器
title_in = MapCompose(str.strip) # 输入时去除空白
title_out = TakeFirst()
price_in = MapCompose(
lambda x: x.strip(),
lambda x: re.search(r'[\d.]+', x).group() if re.search(r'[\d.]+', x) else None,
lambda x: float(x) if x else None
)
tags_in = MapCompose(str.strip)
tags_out = Identity() # 保持列表形式
content_out = Join('\n') # 将多个文本片段合并
# 在 Spider 中使用
class BookSpider(scrapy.Spider):
name = 'books'
def parse(self, response):
for book in response.css('div.book-item'):
loader = BookLoader(item=BookItem(), selector=book)
# 使用 CSS 选择器添加值
loader.add_css('title', 'h3.title::text')
loader.add_css('author', '.author::text')
loader.add_css('price', '.price::text')
loader.add_css('tags', '.tag::text')
# 使用 XPath 添加值
loader.add_xpath('isbn', '//span[@class="isbn"]/text()')
# 直接添加值
loader.add_value('url', response.url)
loader.add_value('crawl_time', datetime.datetime.now().isoformat())
yield loader.load_item()
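为了说明输入/输出处理器的语义,下面用纯 Python 粗略模拟 MapCompose 和 TakeFirst 的行为(仅为示意草图,真实实现在 itemloaders.processors 中,细节更多,例如会展开可迭代的返回值):

```python
def map_compose(*funcs):
    """对每个输入值依次应用所有函数,返回 None 的值会被丢弃"""
    def processor(values):
        for f in funcs:
            out = []
            for v in values:
                r = f(v)
                if r is not None:
                    out.append(r)
            values = out
        return values
    return processor

def take_first(values):
    """返回第一个非空值"""
    for v in values:
        if v is not None and v != '':
            return v

# 模拟上面 BookLoader 中的价格处理链(示例数据是假设的)
price_in = map_compose(str.strip, lambda x: x.replace('£', ''), float)
```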
Item Pipeline
Pipeline 用于处理 Spider 提取的数据,包括清洗、验证、去重、存储等。
Pipeline 方法
class MyPipeline:
def open_spider(self, spider):
"""Spider 启动时调用,用于初始化资源"""
pass
def close_spider(self, spider):
"""Spider 关闭时调用,用于清理资源"""
pass
def process_item(self, item, spider):
"""处理每个 Item,必须实现"""
return item # 返回 item 继续传递给下一个 Pipeline
# 或 raise DropItem('原因') # 丢弃 item
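process_item 的返回值决定了 Item 是否继续沿 Pipeline 链传递。下面用纯 Python 模拟这一调度过程(仅为示意草图,并非 Scrapy 内部实现;两个 Pipeline 类是假设的):

```python
class DropItem(Exception):
    """对应 scrapy.exceptions.DropItem"""

class CleaningPipeline:
    def process_item(self, item, spider):
        item['title'] = item['title'].strip()
        return item  # 返回 item,继续交给下一个 Pipeline

class ValidationPipeline:
    def process_item(self, item, spider):
        if not item.get('title'):
            raise DropItem('缺少 title')  # 丢弃 item,后续 Pipeline 不再执行
        return item

def run_pipelines(item, pipelines, spider=None):
    """按优先级顺序依次调用各 Pipeline 的 process_item"""
    try:
        for p in pipelines:
            item = p.process_item(item, spider)
        return item
    except DropItem:
        return None
```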
数据清洗 Pipeline
# pipelines.py
import re
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class CleaningPipeline:
"""数据清洗"""
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 清理字符串空白
for field in adapter.field_names():
value = adapter.get(field)
if isinstance(value, str):
adapter[field] = value.strip()
return item
class PricePipeline:
"""价格处理"""
vat_factor = 1.15 # 增值税率
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 提取价格数字
price = adapter.get('price')
if price:
match = re.search(r'[\d.]+', str(price))
if match:
adapter['price'] = float(match.group())
else:
adapter['price'] = None
# 如果价格不含税,添加税费
if adapter.get('price') and adapter.get('price_excludes_vat'):
adapter['price'] = adapter['price'] * self.vat_factor
return item
class ValidationPipeline:
"""数据验证"""
required_fields = ['title', 'url']
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 检查必填字段
for field in self.required_fields:
if not adapter.get(field):
raise DropItem(f"缺少必填字段: {field}")
return item
class DuplicatesPipeline:
"""去重"""
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 根据 id 去重
item_id = adapter.get('id') or adapter.get('url')
if item_id in self.ids_seen:
raise DropItem(f"重复项: {item_id}")
self.ids_seen.add(item_id)
return item
存储 Pipeline
JSON 文件存储
import json
from itemadapter import ItemAdapter
class JsonPipeline:
"""保存到 JSON Lines 文件"""
def open_spider(self, spider):
self.file = open('items.jl', 'w', encoding='utf-8')
def close_spider(self, spider):
self.file.close()
def process_item(self, item, spider):
line = json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False) + '\n'
self.file.write(line)
return item
SQLite 存储
import sqlite3
class SQLitePipeline:
"""保存到 SQLite"""
def __init__(self, sqlite_file):
self.sqlite_file = sqlite_file
@classmethod
def from_crawler(cls, crawler):
"""从设置中获取参数"""
return cls(
sqlite_file=crawler.settings.get('SQLITE_FILE', 'items.db')
)
def open_spider(self, spider):
self.conn = sqlite3.connect(self.sqlite_file)
self.cursor = self.conn.cursor()
# 创建表
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
author TEXT,
price REAL,
url TEXT UNIQUE,
crawl_time TEXT
)
''')
self.conn.commit()
def close_spider(self, spider):
self.conn.close()
def process_item(self, item, spider):
self.cursor.execute('''
INSERT OR IGNORE INTO books (title, author, price, url, crawl_time)
VALUES (?, ?, ?, ?, ?)
''', (
item.get('title'),
item.get('author'),
item.get('price'),
item.get('url'),
item.get('crawl_time')
))
self.conn.commit()
return item
MongoDB 存储
import pymongo
from itemadapter import ItemAdapter
class MongoDBPipeline:
"""保存到 MongoDB"""
collection_name = 'items'
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI', 'mongodb://localhost:27017'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'scrapy')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
return item
MySQL 存储
import pymysql
class MySQLPipeline:
"""保存到 MySQL"""
def __init__(self, host, port, user, password, database):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
@classmethod
def from_crawler(cls, crawler):
return cls(
host=crawler.settings.get('MYSQL_HOST', 'localhost'),
port=crawler.settings.get('MYSQL_PORT', 3306),
user=crawler.settings.get('MYSQL_USER', 'root'),
password=crawler.settings.get('MYSQL_PASSWORD', ''),
database=crawler.settings.get('MYSQL_DATABASE', 'scrapy')
)
def open_spider(self, spider):
self.conn = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset='utf8mb4'
)
self.cursor = self.conn.cursor()
def close_spider(self, spider):
self.conn.close()
def process_item(self, item, spider):
columns = ', '.join(item.keys())
placeholders = ', '.join(['%s'] * len(item))
sql = f"INSERT INTO books ({columns}) VALUES ({placeholders})"
self.cursor.execute(sql, list(item.values()))
self.conn.commit()
return item
启用 Pipeline
在 settings.py 中配置 Pipeline:
# settings.py
# 启用 Pipeline,数字表示优先级(0-1000),数字越小越先执行
ITEM_PIPELINES = {
'myproject.pipelines.CleaningPipeline': 100, # 先清洗
'myproject.pipelines.ValidationPipeline': 200, # 再验证
'myproject.pipelines.DuplicatesPipeline': 300, # 去重
'myproject.pipelines.SQLitePipeline': 400, # 最后存储
# 'myproject.pipelines.MongoDBPipeline': 500, # 可以配置多个存储
}
# 数据库配置
SQLITE_FILE = 'data/items.db'
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'scrapy'
中间件
中间件是 Scrapy 扩展功能的核心机制,分为下载器中间件和爬虫中间件。
下载器中间件
下载器中间件处理 Engine 和 Downloader 之间的请求和响应。
中间件方法
class MyDownloaderMiddleware:
def process_request(self, request, spider):
"""
处理请求,在请求发送到 Downloader 之前调用
返回值:
- None:继续处理请求
- Response:直接返回响应,不再下载
- Request:重新调度请求
- raise IgnoreRequest:忽略请求
"""
# 设置请求头
request.headers['User-Agent'] = 'Mozilla/5.0 ...'
return None
def process_response(self, request, response, spider):
"""
处理响应,在响应返回给 Engine 之前调用
返回值:
- Response:继续处理响应
- Request:重新调度请求
- raise IgnoreRequest:忽略请求
"""
# 检查响应状态
if response.status == 403:
spider.logger.warning(f'访问被拒绝: {request.url}')
return response
def process_exception(self, request, exception, spider):
"""
处理异常,在下载或处理过程中发生异常时调用
返回值:
- None:继续处理异常
- Response:返回响应
- Request:重新调度请求
"""
spider.logger.error(f'请求异常: {request.url}, 错误: {exception}')
return None
代理中间件
# middlewares.py
import random
class ProxyMiddleware:
"""代理中间件"""
def __init__(self, proxy_list):
self.proxy_list = proxy_list
@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_list=crawler.settings.get('PROXY_LIST', [])
)
def process_request(self, request, spider):
if self.proxy_list:
proxy = random.choice(self.proxy_list)
request.meta['proxy'] = proxy
spider.logger.debug(f'使用代理: {proxy}')
# settings.py
PROXY_LIST = [
'http://127.0.0.1:7890',
'http://user:pass@proxy1:8080',
'http://user:pass@proxy2:8080',
]
DOWNLOADER_MIDDLEWARES = {
'myproject.middlewares.ProxyMiddleware': 100,
}
User-Agent 中间件
import random
class UserAgentMiddleware:
"""随机 User-Agent 中间件"""
def __init__(self):
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0.0.0',
]
def process_request(self, request, spider):
request.headers['User-Agent'] = random.choice(self.user_agents)
重试中间件
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.utils.response import response_status_message
class CustomRetryMiddleware(RetryMiddleware):
"""自定义重试中间件"""
def __init__(self, settings):
super().__init__(settings)
self.max_retry_times = settings.getint('RETRY_TIMES')
self.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
def process_response(self, request, response, spider):
if request.meta.get('dont_retry', False):
return response
# 自定义重试逻辑
if response.status in self.retry_http_codes:
reason = response_status_message(response.status)
return self._retry(request, reason, spider) or response
return response
def process_exception(self, request, exception, spider):
    # Scrapy 2.10 起 EXCEPTIONS_TO_RETRY 类属性已弃用,
    # 改用由 RETRY_EXCEPTIONS 设置构建的实例属性 exceptions_to_retry
    if isinstance(exception, tuple(self.exceptions_to_retry)):
        return self._retry(request, exception, spider)
爬虫中间件
爬虫中间件处理 Engine 和 Spider 之间的交互:
class MySpiderMiddleware:
def process_spider_input(self, response, spider):
"""
响应进入 Spider 之前调用
返回 None 或抛出异常
"""
pass
def process_spider_output(self, response, result, spider):
"""
Spider 处理完响应后调用
必须返回可迭代对象(Request 或 Item)
"""
for item in result:
yield item
def process_spider_exception(self, response, exception, spider):
"""
Spider 或中间件抛出异常时调用
返回 None 或可迭代对象
"""
spider.logger.error(f'Spider 异常: {exception}')
return None
def process_start_requests(self, start_requests, spider):
    """
    处理 Spider 的初始请求
    必须返回可迭代对象
    (Scrapy 2.13 起已弃用,新代码应实现异步的 process_start 方法)
    """
for request in start_requests:
yield request
启用中间件
# settings.py
DOWNLOADER_MIDDLEWARES = {
# 自定义中间件
'myproject.middlewares.ProxyMiddleware': 100,
'myproject.middlewares.UserAgentMiddleware': 200,
# 禁用内置中间件(设为 None)
'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
}
SPIDER_MIDDLEWARES = {
'myproject.middlewares.MySpiderMiddleware': 100,
}
配置设置
常用设置
# settings.py
# ==================== 基本设置 ====================
# 项目名称
BOT_NAME = 'myproject'
# Spider 模块
SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'
# User-Agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
# 是否遵循 robots.txt
ROBOTSTXT_OBEY = True
# ==================== 并发设置 ====================
# 并发请求数(全局)
CONCURRENT_REQUESTS = 16
# 并发请求数(每个域名)
CONCURRENT_REQUESTS_PER_DOMAIN = 8
# 并发请求数(每个 IP)
CONCURRENT_REQUESTS_PER_IP = 0 # 0 表示不限制
# 下载延迟(秒)
DOWNLOAD_DELAY = 1
# 随机延迟(0.5 ~ 1.5 * DOWNLOAD_DELAY)
RANDOMIZE_DOWNLOAD_DELAY = True
# ==================== 超时设置 ====================
# 下载超时(秒)
DOWNLOAD_TIMEOUT = 30
# ==================== 重试设置 ====================
RETRY_ENABLED = True
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 522, 524, 408, 429]
# ==================== 缓存设置 ====================
# HTTP 缓存(开发时有用)
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0 # 0 表示永不过期
HTTPCACHE_DIR = 'httpcache'
HTTPCACHE_IGNORE_HTTP_CODES = []
# ==================== 日志设置 ====================
LOG_LEVEL = 'INFO' # DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_FILE = 'logs/scrapy.log'
LOG_FORMAT = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
# ==================== 其他设置 ====================
# 默认请求头
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
# Cookie 设置
COOKIES_ENABLED = True
COOKIES_DEBUG = False
# 自动限速扩展
AUTOTHROTTLE_ENABLED = False
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 60
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
Feed 导出
# FEED_FORMAT / FEED_URI 自 Scrapy 2.1 起已弃用,推荐使用 FEEDS 字典统一配置
FEEDS = {
    'items.json': {
        'format': 'json',        # json, jsonlines, csv, xml, pickle, marshal
        'encoding': 'utf-8',     # 导出编码
        'fields': ['title', 'author', 'price'],  # 导出字段(None 表示全部)
        'indent': 2,             # 导出缩进
        'store_empty': False,    # 是否导出空结果
    },
}
# 命令行动态指定
# scrapy crawl myspider -o items.json
# scrapy crawl myspider -o items.csv
# scrapy crawl myspider -o items.jl
信号
信号用于在特定事件发生时执行自定义操作:
from scrapy import signals
from scrapy.signalmanager import dispatcher
class MySpider(scrapy.Spider):
name = 'my'
@classmethod
def from_crawler(cls, crawler):
spider = super().from_crawler(crawler)
# 注册信号处理器
crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(spider.spider_error, signal=signals.spider_error)
crawler.signals.connect(spider.item_scraped, signal=signals.item_scraped)
return spider
def spider_opened(self, spider):
"""Spider 启动时调用"""
self.logger.info(f'Spider 启动: {spider.name}')
def spider_closed(self, spider, reason):
"""Spider 关闭时调用"""
self.logger.info(f'Spider 关闭: {spider.name}, 原因: {reason}')
def spider_error(self, failure, response, spider):
"""Spider 发生错误时调用"""
self.logger.error(f'Spider 错误: {failure}')
def item_scraped(self, item, response, spider):
"""Item 被抓取时调用"""
self.logger.info(f'抓取 Item: {item.get("title")}')
# 常用信号
# signals.spider_opened - Spider 启动
# signals.spider_closed - Spider 关闭
# signals.spider_error - Spider 错误
# signals.item_scraped - Item 被抓取
# signals.item_dropped - Item 被丢弃
# signals.response_received - 响应被接收
# signals.request_scheduled - 请求被调度
命令行工具
# 创建项目
scrapy startproject myproject
# 创建爬虫
scrapy genspider myspider example.com
scrapy genspider -t crawl myspider example.com # 使用 CrawlSpider 模板
# 运行爬虫
scrapy crawl myspider
scrapy crawl myspider -o items.json # 导出结果
scrapy crawl myspider -a category=books # 传递参数
scrapy crawl myspider -s LOG_LEVEL=DEBUG # 覆盖设置
# 列出所有爬虫
scrapy list
# 交互式 Shell(测试选择器)
scrapy shell https://example.com
scrapy shell 'https://example.com' -s USER_AGENT='Mozilla/5.0...'
# 检查爬虫
scrapy check
# 查看设置
scrapy settings --get BOT_NAME
scrapy settings --get DOWNLOAD_DELAY
# 运行爬虫并保存状态(断点续爬)
scrapy crawl myspider -s JOBDIR=crawls/myspider-1
# 版本信息
scrapy version
scrapy version -v
完整项目示例
下面是一个完整的 Scrapy 项目示例,爬取书籍信息:
项目结构
bookscraper/
├── scrapy.cfg
└── bookscraper/
├── __init__.py
├── items.py
├── middlewares.py
├── pipelines.py
├── settings.py
└── spiders/
├── __init__.py
└── books.py
items.py
import scrapy
class BookItem(scrapy.Item):
title = scrapy.Field()
author = scrapy.Field()
price = scrapy.Field()
rating = scrapy.Field()
category = scrapy.Field()
description = scrapy.Field()
url = scrapy.Field()
crawl_time = scrapy.Field()
spiders/books.py
import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
from bookscraper.items import BookItem
import datetime
class BooksSpider(CrawlSpider):
name = 'books'
allowed_domains = ['books.toscrape.com']
start_urls = ['https://books.toscrape.com/']
rules = (
# 分类页面:跟踪链接
Rule(
LinkExtractor(restrict_xpaths='//ul[@class="nav-list"]//a'),
follow=True
),
# 书籍详情页:解析内容
Rule(
LinkExtractor(restrict_xpaths='//h3/a'),
callback='parse_book'
),
# 分页:跟踪链接
Rule(
LinkExtractor(restrict_xpaths='//li[@class="next"]/a'),
follow=True
),
)
def parse_book(self, response):
"""解析书籍详情页"""
item = BookItem()
item['title'] = response.xpath('//h1/text()').get()
item['price'] = response.xpath('//p[@class="price_color"]/text()').get()
rating_class = response.xpath('//p[contains(@class, "star-rating")]/@class').get()
item['rating'] = rating_class.split()[-1] if rating_class else None
item['description'] = response.xpath('//div[@id="product_description"]/following-sibling::p/text()').get()
item['category'] = response.xpath('//ul[@class="breadcrumb"]/li[3]/a/text()').get()
item['url'] = response.url
item['crawl_time'] = datetime.datetime.now().isoformat()
yield item
pipelines.py
import sqlite3
import datetime
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class DuplicatesPipeline:
"""去重"""
def __init__(self):
self.urls_seen = set()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
url = adapter.get('url')
if url in self.urls_seen:
raise DropItem(f'重复项: {url}')
self.urls_seen.add(url)
return item
class CleaningPipeline:
"""数据清洗"""
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 清理价格
price = adapter.get('price', '')
if price:
adapter['price'] = price.replace('£', '').strip()
# 清理描述
desc = adapter.get('description')
if desc:
adapter['description'] = desc.strip()
return item
class SQLitePipeline:
"""SQLite 存储"""
def __init__(self, sqlite_file):
self.sqlite_file = sqlite_file
@classmethod
def from_crawler(cls, crawler):
return cls(sqlite_file=crawler.settings.get('SQLITE_FILE', 'books.db'))
def open_spider(self, spider):
self.conn = sqlite3.connect(self.sqlite_file)
self.cursor = self.conn.cursor()
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
author TEXT,
price REAL,
rating TEXT,
category TEXT,
description TEXT,
url TEXT UNIQUE,
crawl_time TEXT
)
''')
self.conn.commit()
def close_spider(self, spider):
self.conn.close()
def process_item(self, item, spider):
self.cursor.execute('''
INSERT OR IGNORE INTO books
(title, price, rating, category, description, url, crawl_time)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
item.get('title'),
item.get('price'),
item.get('rating'),
item.get('category'),
item.get('description'),
item.get('url'),
item.get('crawl_time')
))
self.conn.commit()
return item
settings.py
BOT_NAME = 'bookscraper'
SPIDER_MODULES = ['bookscraper.spiders']
NEWSPIDER_MODULE = 'bookscraper.spiders'
ROBOTSTXT_OBEY = True
CONCURRENT_REQUESTS = 8
DOWNLOAD_DELAY = 1
ITEM_PIPELINES = {
'bookscraper.pipelines.DuplicatesPipeline': 100,
'bookscraper.pipelines.CleaningPipeline': 200,
'bookscraper.pipelines.SQLitePipeline': 300,
}
SQLITE_FILE = 'data/books.db'
LOG_LEVEL = 'INFO'
运行
# 创建数据目录
mkdir data
# 运行爬虫
scrapy crawl books
# 导出为 JSON
scrapy crawl books -o books.json
# 覆盖设置
scrapy crawl books -s DOWNLOAD_DELAY=2
最佳实践
1. 合理设置并发和延迟
# 对于小型网站
CONCURRENT_REQUESTS = 4
DOWNLOAD_DELAY = 2
# 对于大型网站
CONCURRENT_REQUESTS = 16
DOWNLOAD_DELAY = 0.5
# 使用自动限速
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0
2. 使用 HTTP 缓存加速开发
# 开发时开启缓存
HTTPCACHE_ENABLED = True
HTTPCACHE_EXPIRATION_SECS = 0 # 永不过期
# 生产环境关闭
HTTPCACHE_ENABLED = False
3. 合理处理错误
class MySpider(scrapy.Spider):
def parse(self, response):
try:
# 可能出错的操作
data = response.css('.data::text').get()
if not data:
self.logger.warning(f'未找到数据: {response.url}')
return
yield {'data': data}
except Exception as e:
self.logger.error(f'解析错误: {response.url}, {e}')
4. 断点续爬
# 启用断点续爬
scrapy crawl myspider -s JOBDIR=crawls/myspider-1
# 中断后继续运行相同命令即可恢复
5. 使用 Item Loader 提高代码可维护性
# 不推荐
item['title'] = response.css('h1::text').get().strip()
item['price'] = float(response.css('.price::text').get().replace('$', ''))
# 推荐
loader = ItemLoader(item=BookItem(), response=response)
loader.add_css('title', 'h1::text')
loader.add_css('price', '.price::text')
yield loader.load_item()
小结
本章我们学习了:
- Scrapy 架构 - 核心组件和数据流
- Spider 类型 - Spider、CrawlSpider、XMLFeedSpider 等
- LinkExtractor - 链接提取器的详细用法
- 选择器 - CSS 和 XPath 选择器
- Item 和 Item Loader - 数据定义和处理
- Pipeline - 数据清洗、验证、存储
- 中间件 - 下载器中间件和爬虫中间件
- 配置设置 - 并发、延迟、缓存等
- 信号 - 事件驱动编程
- 最佳实践 - 生产环境建议
Scrapy 是一个功能完善的爬虫框架,适合大规模、长期维护的爬虫项目。
练习
- 使用 CrawlSpider 爬取一个新闻网站的所有文章
- 实现一个完整的 Pipeline,包括清洗、验证、去重和存储
- 编写一个自定义下载器中间件实现随机代理
- 使用信号统计爬虫的运行数据(请求数、Item 数量等)