实战案例
本章通过几个实际案例,综合运用前面学到的爬虫技术。
案例1:新闻网站爬虫
爬取新闻网站的标题、内容、发布时间等信息。
分析目标网站
首先使用浏览器开发者工具分析网页结构:
- 打开目标网站,按 F12 打开开发者工具
- 使用元素选择器定位新闻列表
- 分析每条新闻的 HTML 结构
- 找到分页链接的规律
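在动手编写完整爬虫之前,可以先把一小段列表页 HTML 粘到脚本里,验证 CSS 选择器能否取到想要的字段。下面片段中的 HTML 结构是假设的示例,.news-item、.title、.summary、.date 这些类名沿用本案例代码中的写法,实际请替换成目标网站的真实结构:

from bs4 import BeautifulSoup

# 假设的列表页片段,仅用于验证选择器,实际结构以目标网站为准
sample_html = '''
<div class="news-item">
    <h2 class="title"><a href="/news/1001">示例新闻标题</a></h2>
    <p class="summary">这里是新闻摘要</p>
    <span class="date">2024-01-01</span>
</div>
'''

soup = BeautifulSoup(sample_html, 'lxml')
item = soup.select_one('.news-item')
print(item.select_one('.title a').get_text(strip=True))  # 标题
print(item.select_one('.title a').get('href'))           # 链接
print(item.select_one('.summary').get_text(strip=True))  # 摘要
print(item.select_one('.date').get_text(strip=True))     # 日期

确认选择器无误后,再把它们写进下面的爬虫类里。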
实现代码
import requests
from bs4 import BeautifulSoup
import csv
import time
import random
import re
from datetime import datetime
from urllib.parse import urljoin
class NewsSpider:
    def __init__(self):
        self.base_url = 'https://example.com/news'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        self.articles = []

    def get_page(self, page: int) -> str:
        """请求列表页,返回 HTML 文本"""
        url = f'{self.base_url}?page={page}'
        response = self.session.get(url, timeout=10)
        response.encoding = response.apparent_encoding or 'utf-8'
        return response.text

    def parse_list(self, html: str) -> list:
        """解析列表页,提取每条新闻的标题、链接、摘要和日期"""
        soup = BeautifulSoup(html, 'lxml')
        articles = []
        for item in soup.select('.news-item'):
            article = {}
            title_elem = item.select_one('.title a')
            if title_elem:
                article['title'] = title_elem.get_text(strip=True)
                # 链接可能是相对路径,统一拼成绝对 URL
                article['url'] = urljoin(self.base_url, title_elem.get('href', ''))
            summary_elem = item.select_one('.summary')
            article['summary'] = summary_elem.get_text(strip=True) if summary_elem else ''
            date_elem = item.select_one('.date')
            article['date'] = date_elem.get_text(strip=True) if date_elem else ''
            if article.get('title'):
                articles.append(article)
        return articles

    def parse_detail(self, html: str) -> dict:
        """解析详情页,提取正文和作者"""
        soup = BeautifulSoup(html, 'lxml')
        detail = {}
        content_elem = soup.select_one('.article-content')
        if content_elem:
            # 去掉正文中的脚本和样式标签,只保留文字
            for tag in content_elem.select('script, style'):
                tag.decompose()
            detail['content'] = content_elem.get_text(strip=True)
        author_elem = soup.select_one('.author')
        detail['author'] = author_elem.get_text(strip=True) if author_elem else ''
        return detail

    def clean_text(self, text: str) -> str:
        """把连续空白压缩为单个空格"""
        if not text:
            return ''
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def save_to_csv(self, filename: str = 'news.csv'):
        if not self.articles:
            return
        with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
            fieldnames = ['title', 'url', 'summary', 'date', 'author', 'content']
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for article in self.articles:
                row = {k: self.clean_text(str(v)) for k, v in article.items() if k in fieldnames}
                writer.writerow(row)
        print(f'已保存 {len(self.articles)} 条新闻到 {filename}')

    def run(self, pages: int = 5, fetch_detail: bool = False):
        for page in range(1, pages + 1):
            print(f'正在抓取第 {page} 页...')
            try:
                html = self.get_page(page)
                articles = self.parse_list(html)
                for article in articles:
                    if fetch_detail and article.get('url'):
                        # 抓详情页前随机等待,降低请求频率
                        time.sleep(random.uniform(0.5, 1.5))
                        detail_html = self.session.get(article['url'], timeout=10).text
                        detail = self.parse_detail(detail_html)
                        article.update(detail)
                    self.articles.append(article)
                print(f'第 {page} 页完成,获取 {len(articles)} 条新闻')
            except Exception as e:
                print(f'第 {page} 页出错: {e}')
            time.sleep(random.uniform(1, 3))
        self.save_to_csv()

if __name__ == '__main__':
    spider = NewsSpider()
    spider.run(pages=3, fetch_detail=False)
关键点说明
- 列表页解析:提取新闻标题、链接、摘要等信息
- 详情页解析:可选,获取完整内容
- 文本清洗:去除多余空白字符
- 错误处理:捕获异常,避免程序中断
- 请求间隔:控制请求频率
案例2:电商商品爬虫
爬取电商网站的商品信息,包括名称、价格、评价等。
分析动态加载
很多电商网站使用 JavaScript 动态加载数据,需要找到 API 接口:
- 打开开发者工具,切换到 Network 标签
- 筛选 XHR/Fetch 请求
- 找到返回商品数据的 API
- 分析请求参数和响应格式
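找到接口后,可以先用 requests 单独请求一次,把返回的 JSON 打印出来,确认字段命名和分页参数。下面的地址和参数沿用本例假设的接口,实际以开发者工具中看到的请求为准:

import json
import requests

api_url = 'https://api.example.com/products'  # 从 Network 面板复制的接口地址(示例)
params = {'page': 1, 'size': 20}              # 请求参数同样来自开发者工具

resp = requests.get(api_url, params=params, timeout=10)
data = resp.json()

# 格式化打印前 2000 个字符,观察数据结构和字段命名
print(json.dumps(data, ensure_ascii=False, indent=2)[:2000])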
实现代码
import requests
import json
import time
import random
import csv
from typing import List, Dict
class ProductSpider:
    def __init__(self):
        self.api_url = 'https://api.example.com/products'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Referer': 'https://example.com/products',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        self.products = []

    def get_products(self, page: int, category: str = '') -> List[Dict]:
        """请求商品列表接口,返回当前页的商品数组"""
        params = {
            'page': page,
            'size': 20,
            'category': category,
        }
        response = self.session.get(self.api_url, params=params, timeout=10)
        data = response.json()
        return data.get('items', [])

    def parse_product(self, item: Dict) -> Dict:
        """从接口返回的 JSON 中挑出需要的字段"""
        return {
            'id': item.get('id'),
            'name': item.get('name'),
            'price': item.get('price'),
            'original_price': item.get('originalPrice'),
            'discount': item.get('discount', 0),
            'rating': item.get('rating', 0),
            'review_count': item.get('reviewCount', 0),
            'sales': item.get('sales', 0),
            'shop': item.get('shop', {}).get('name'),
            'category': item.get('category'),
            'url': f"https://example.com/product/{item.get('id')}",
        }

    def save_to_json(self, filename: str = 'products.json'):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.products, f, ensure_ascii=False, indent=2)
        print(f'已保存 {len(self.products)} 个商品到 {filename}')

    def save_to_csv(self, filename: str = 'products.csv'):
        if not self.products:
            return
        with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
            fieldnames = list(self.products[0].keys())
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.products)
        print(f'已保存 {len(self.products)} 个商品到 {filename}')

    def run(self, pages: int = 10, categories: List[str] = None):
        if categories is None:
            categories = ['']
        for category in categories:
            print(f'正在爬取分类: {category or "全部"}')
            for page in range(1, pages + 1):
                print(f' 第 {page} 页...')
                try:
                    items = self.get_products(page, category)
                    if not items:
                        # 返回空列表说明已经翻到最后一页
                        print(f' 第 {page} 页没有数据,停止')
                        break
                    for item in items:
                        product = self.parse_product(item)
                        self.products.append(product)
                    print(f' 第 {page} 页完成,获取 {len(items)} 个商品')
                except Exception as e:
                    print(f' 第 {page} 页出错: {e}')
                time.sleep(random.uniform(0.5, 1.5))
        self.save_to_json()
        self.save_to_csv()

if __name__ == '__main__':
    spider = ProductSpider()
    spider.run(pages=5)
关键点说明
- API 分析:找到数据接口,直接请求 JSON 数据
- 数据解析:从 JSON 响应中提取需要的字段
- 分类爬取:支持按分类爬取商品
- 多格式保存:同时保存 JSON 和 CSV 格式
案例3:异步爬虫实战
使用异步爬虫高效爬取大量页面。
import asyncio
import aiohttp
import csv
import random
import time
from typing import List, Optional
from bs4 import BeautifulSoup
class AsyncSpider:
    def __init__(
        self,
        max_concurrent: int = 10,
        request_delay: float = 0.3,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        # 信号量限制同时进行的请求数
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }
        self.results = []

    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
    ) -> Optional[str]:
        """带并发限制和指数退避重试的请求"""
        async with self.semaphore:
            await asyncio.sleep(random.uniform(0, self.request_delay))
            for attempt in range(self.max_retries):
                try:
                    async with session.get(url) as response:
                        if response.status == 200:
                            return await response.text()
                        elif response.status in [429, 500, 502, 503, 504]:
                            # 被限流或服务端错误,指数退避后重试
                            wait = 2 ** attempt + random.random()
                            await asyncio.sleep(wait)
                        else:
                            return None
                except asyncio.TimeoutError:
                    print(f'超时: {url}')
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        print(f'错误: {url} - {e}')
                    else:
                        await asyncio.sleep(2 ** attempt)
            return None

    def parse(self, html: str, url: str) -> dict:
        soup = BeautifulSoup(html, 'lxml')
        title_elem = soup.select_one('h1')
        title = title_elem.get_text(strip=True) if title_elem else ''
        content_elem = soup.select_one('.content')
        content = content_elem.get_text(strip=True) if content_elem else ''
        return {
            'url': url,
            'title': title,
            'content': content[:500] if content else '',
        }

    async def crawl_page(
        self,
        session: aiohttp.ClientSession,
        url: str,
    ):
        html = await self.fetch(session, url)
        if html:
            result = self.parse(html, url)
            self.results.append(result)

    async def run(self, urls: List[str]):
        async with aiohttp.ClientSession(
            headers=self.headers,
            timeout=self.timeout
        ) as session:
            tasks = [self.crawl_page(session, url) for url in urls]
            await asyncio.gather(*tasks)

    def save_to_csv(self, filename: str = 'results.csv'):
        if not self.results:
            return
        with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
            fieldnames = list(self.results[0].keys())
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.results)
        print(f'已保存 {len(self.results)} 条数据到 {filename}')

async def main():
    spider = AsyncSpider(
        max_concurrent=10,
        request_delay=0.3,
        timeout=30,
        max_retries=3,
    )
    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(50)]
    start = time.time()
    await spider.run(urls)
    elapsed = time.time() - start
    print(f'完成: {len(spider.results)}/{len(urls)} 个请求')
    print(f'耗时: {elapsed:.2f}秒')
    spider.save_to_csv()

if __name__ == '__main__':
    asyncio.run(main())
性能对比
同样的任务(假设每个请求耗时约 1 秒,异步并发数为 10),同步和异步的耗时对比大致如下。数值为理想化估算,实际耗时会受网络状况和目标服务器影响:
| 请求数 | 同步耗时 | 异步耗时 | 提升 |
|---|---|---|---|
| 10 | 约 10 秒 | 约 1 秒 | 约 10 倍 |
| 50 | 约 50 秒 | 约 5 秒 | 约 10 倍 |
| 100 | 约 100 秒 | 约 10 秒 | 约 10 倍 |
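如果想自己复现这个对比,可以用下面的同步基线对同样的 URL 计时,再与上文 AsyncSpider 的输出比较。这里沿用 httpbin.org 的 /delay/1 接口,每个请求会固定延迟约 1 秒:

import time
import requests

def sync_crawl(urls):
    """同步基线:逐个请求,用于和异步版本对比耗时"""
    session = requests.Session()
    results = []
    for url in urls:
        resp = session.get(url, timeout=30)
        if resp.status_code == 200:
            results.append(resp.text)
    return results

if __name__ == '__main__':
    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(10)]
    start = time.time()
    results = sync_crawl(urls)
    print(f'同步完成 {len(results)} 个请求,耗时 {time.time() - start:.2f} 秒')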
案例4:Scrapy 项目实战
使用 Scrapy 框架构建完整的爬虫项目。
创建项目
scrapy startproject myproject
cd myproject
scrapy genspider books example.com
定义 Item
# myproject/items.py
import scrapy

class BookItem(scrapy.Item):
    title = scrapy.Field()
    author = scrapy.Field()
    price = scrapy.Field()
    rating = scrapy.Field()
    category = scrapy.Field()
    url = scrapy.Field()
编写 Spider
# myproject/spiders/books.py
import scrapy
from myproject.items import BookItem

class BooksSpider(scrapy.Spider):
    name = 'books'
    allowed_domains = ['example.com']
    start_urls = ['https://example.com/books']

    def parse(self, response):
        # 解析列表页中的每一本书
        for book in response.css('.book-item'):
            item = BookItem()
            item['title'] = book.css('.title::text').get()
            item['author'] = book.css('.author::text').get()
            item['price'] = book.css('.price::text').get()
            item['rating'] = book.css('.rating::text').get()
            item['url'] = response.urljoin(book.css('a::attr(href)').get())
            yield item
        # 跟进下一页,直到没有下一页链接为止
        next_page = response.css('.next-page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)
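如果列表页的字段不够用,还可以在 BooksSpider 中从 parse 跟进每本书的详情页,用单独的回调补全字段。下面是一个示意写法,详情页选择器是假设的,cb_kwargs 需要 Scrapy 1.7 及以上版本:

    # 在 BooksSpider 中,把 parse 改为跟进详情页(示意)
    def parse(self, response):
        for book in response.css('.book-item'):
            item = BookItem()
            item['title'] = book.css('.title::text').get()
            item['url'] = response.urljoin(book.css('a::attr(href)').get())
            # 把半成品 item 传给详情页回调,由回调补全剩余字段
            yield response.follow(item['url'], callback=self.parse_detail, cb_kwargs={'item': item})
        next_page = response.css('.next-page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

    def parse_detail(self, response, item):
        item['author'] = response.css('.author::text').get()
        item['price'] = response.css('.price::text').get()
        yield item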
编写 Pipeline
# myproject/pipelines.py
import csv
import json

from scrapy.exceptions import DropItem

class JsonPipeline:
    """把每个 item 写成一行 JSON(JSON Lines 格式)"""

    def open_spider(self, spider):
        self.file = open('books.jl', 'w', encoding='utf-8')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(line)
        return item

class CsvPipeline:
    """把 item 写入 CSV,处理第一条 item 时写表头"""

    def open_spider(self, spider):
        self.file = open('books.csv', 'w', newline='', encoding='utf-8-sig')
        self.writer = None

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        if self.writer is None:
            self.writer = csv.DictWriter(self.file, fieldnames=item.keys())
            self.writer.writeheader()
        self.writer.writerow(dict(item))
        return item

class DuplicatesPipeline:
    """根据 url 去重,重复的 item 直接丢弃"""

    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        url = item.get('url')
        if url in self.urls_seen:
            raise DropItem(f'重复: {url}')
        self.urls_seen.add(url)
        return item
配置 Settings
# myproject/settings.py
BOT_NAME = 'myproject'

SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'

# 遵守目标网站的 robots.txt
ROBOTSTXT_OBEY = True

# 并发数和下载延迟,按目标网站的承受能力调整
CONCURRENT_REQUESTS = 16
DOWNLOAD_DELAY = 1

DEFAULT_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
}

# 数字越小优先级越高,去重 pipeline 最先执行
ITEM_PIPELINES = {
    'myproject.pipelines.DuplicatesPipeline': 100,
    'myproject.pipelines.JsonPipeline': 200,
    'myproject.pipelines.CsvPipeline': 300,
}
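除了固定的 DOWNLOAD_DELAY,Scrapy 还自带 AutoThrottle 扩展,可以根据响应延迟自动调节请求速度。以下配置仅作示意,是否启用以及具体数值按目标网站情况调整:

# settings.py 中可以追加(可选)
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1           # 初始下载延迟(秒)
AUTOTHROTTLE_MAX_DELAY = 10            # 高延迟时允许的最大下载延迟(秒)
AUTOTHROTTLE_TARGET_CONCURRENCY = 4.0  # 期望的平均并发请求数
AUTOTHROTTLE_DEBUG = False             # 设为 True 时打印每次调节的信息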
运行爬虫
scrapy crawl books
scrapy crawl books -o books.json
爬虫最佳实践
1. 遵守 robots.txt
import urllib.robotparser
rp = urllib.robotparser.RobotFileParser()
rp.set_url('https://example.com/robots.txt')
rp.read()
can_fetch = rp.can_fetch('MyBot', 'https://example.com/page')
2. 设置合理的请求间隔
import time
import random
import requests

def get_with_delay(url, min_delay=1, max_delay=3):
    # 每次请求前随机等待,避免以固定频率访问
    time.sleep(random.uniform(min_delay, max_delay))
    return requests.get(url)
3. 使用 Session 复用连接
session = requests.Session()
for url in urls:
    response = session.get(url)
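在 Session 上还可以挂载带重试策略的 HTTPAdapter,让连接复用和自动重试结合起来,以下为一种常见写法的示意:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
# 对限流和服务端错误自动重试,重试间隔按 backoff_factor 递增
retry = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
session.mount('http://', adapter)

response = session.get('https://example.com/page', timeout=10)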
4. 实现断点续爬
import json
import os
import requests

def load_progress(filename):
    """读取进度文件,不存在则返回空进度"""
    if os.path.exists(filename):
        with open(filename, 'r') as f:
            return json.load(f)
    return {'completed': [], 'failed': []}

def save_progress(filename, progress):
    with open(filename, 'w') as f:
        json.dump(progress, f)

def crawl_with_resume(urls):
    progress = load_progress('progress.json')
    completed = set(progress['completed'])
    for url in urls:
        if url in completed:
            # 上次运行已经抓取过,跳过
            continue
        try:
            response = requests.get(url)
            process(response)  # process 为自定义的结果处理函数,此处仅示意
            completed.add(url)
        except Exception as e:
            progress['failed'].append({'url': url, 'error': str(e)})
        # 每处理完一个 URL 就保存进度,中断后可以从断点继续
        progress['completed'] = list(completed)
        save_progress('progress.json', progress)
5. 日志记录
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='spider.log',
)

logger = logging.getLogger(__name__)
logger.info('开始爬取')
logger.warning('请求失败,重试中')
logger.error('爬取出错')
小结
本章通过四个实战案例,综合运用了爬虫技术:
- 新闻网站爬虫:列表页和详情页解析
- 电商商品爬虫:API 接口分析
- 异步爬虫:高效并发爬取
- Scrapy 项目:框架化开发
同时学习了爬虫的最佳实践,包括遵守规则、控制频率、错误处理等。
练习
- 选择一个新闻网站,实现完整的新闻爬虫
- 分析一个电商网站的 API,爬取商品信息
- 使用异步爬虫爬取 100 个页面,对比同步爬虫的性能
- 使用 Scrapy 框架重构之前的爬虫项目