
Practical Case Studies

This chapter works through several real-world case studies that bring together the scraping techniques covered in the previous chapters.

Case Study 1: News Site Scraper

Scrape a news site's headlines, article bodies, publication dates, and related fields.

Analyzing the Target Site

Start by examining the page structure with the browser's developer tools:

  1. Open the target site and press F12 to open the developer tools
  2. Use the element picker to locate the news list
  3. Examine the HTML structure of each news entry
  4. Work out the pattern behind the pagination links (a quick selector check is sketched below)
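
Before writing the full spider, it can help to verify the candidate CSS selectors interactively. A minimal exploratory sketch, assuming the hypothetical https://example.com/news list page and the .news-item / .title a selectors used in the implementation below; adjust them to what you actually see in DevTools:

import requests
from bs4 import BeautifulSoup

# Fetch one list page and check what the candidate selectors match.
html = requests.get('https://example.com/news?page=1', timeout=10).text
soup = BeautifulSoup(html, 'lxml')

items = soup.select('.news-item')
print(f'matched {len(items)} news items')

# Print a few titles and links to confirm the structure
for item in items[:3]:
    link = item.select_one('.title a')
    if link:
        print(link.get_text(strip=True), '->', link.get('href'))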

Implementation

import requests
from bs4 import BeautifulSoup
import csv
import time
import random
import re


class NewsSpider:
    def __init__(self):
        self.base_url = 'https://example.com/news'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        self.articles = []

    def get_page(self, page: int) -> str:
        """Fetch one list page and return its HTML."""
        url = f'{self.base_url}?page={page}'
        response = self.session.get(url, timeout=10)
        response.encoding = response.apparent_encoding or 'utf-8'
        return response.text

    def parse_list(self, html: str) -> list:
        """Extract title, link, summary, and date from the list page."""
        soup = BeautifulSoup(html, 'lxml')
        articles = []

        for item in soup.select('.news-item'):
            article = {}

            title_elem = item.select_one('.title a')
            if title_elem:
                article['title'] = title_elem.get_text(strip=True)
                article['url'] = title_elem.get('href', '')

            summary_elem = item.select_one('.summary')
            article['summary'] = summary_elem.get_text(strip=True) if summary_elem else ''

            date_elem = item.select_one('.date')
            article['date'] = date_elem.get_text(strip=True) if date_elem else ''

            if article.get('title'):
                articles.append(article)

        return articles

    def parse_detail(self, html: str) -> dict:
        """Extract the full article body and author from a detail page."""
        soup = BeautifulSoup(html, 'lxml')
        detail = {}

        content_elem = soup.select_one('.article-content')
        if content_elem:
            # Drop embedded scripts and styles before extracting text
            for tag in content_elem.select('script, style'):
                tag.decompose()
            detail['content'] = content_elem.get_text(strip=True)

        author_elem = soup.select_one('.author')
        detail['author'] = author_elem.get_text(strip=True) if author_elem else ''

        return detail

    def clean_text(self, text: str) -> str:
        """Collapse runs of whitespace into single spaces."""
        if not text:
            return ''
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def save_to_csv(self, filename: str = 'news.csv'):
        if not self.articles:
            return

        with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
            fieldnames = ['title', 'url', 'summary', 'date', 'author', 'content']
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            for article in self.articles:
                row = {k: self.clean_text(str(v)) for k, v in article.items() if k in fieldnames}
                writer.writerow(row)

        print(f'Saved {len(self.articles)} articles to {filename}')

    def run(self, pages: int = 5, fetch_detail: bool = False):
        for page in range(1, pages + 1):
            print(f'Fetching page {page}...')

            try:
                html = self.get_page(page)
                articles = self.parse_list(html)

                for article in articles:
                    if fetch_detail and article.get('url'):
                        time.sleep(random.uniform(0.5, 1.5))
                        detail_html = self.session.get(article['url'], timeout=10).text
                        detail = self.parse_detail(detail_html)
                        article.update(detail)

                    self.articles.append(article)

                print(f'Page {page} done, {len(articles)} articles collected')

            except Exception as e:
                print(f'Error on page {page}: {e}')

            time.sleep(random.uniform(1, 3))

        self.save_to_csv()


if __name__ == '__main__':
    spider = NewsSpider()
    spider.run(pages=3, fetch_detail=False)

Key Points

  1. List-page parsing: extract the title, link, and summary of each article
  2. Detail-page parsing: optional, fetches the full article body
  3. Text cleaning: strip redundant whitespace
  4. Error handling: catch exceptions so one bad page does not abort the run
  5. Request pacing: throttle how fast requests are sent (a reusable retry helper is sketched after this list)
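
The case-1 code keeps error handling and pacing deliberately simple. When a site is flaky, a small helper that combines pacing with retries and exponential backoff is often enough; a minimal sketch based on requests (the function name and parameters are illustrative, not part of the case above):

import random
import time
from typing import Optional

import requests

def fetch_with_retry(session: requests.Session, url: str,
                     retries: int = 3, base_delay: float = 1.0) -> Optional[str]:
    """Fetch a URL politely: pause before each attempt, back off on failure."""
    for attempt in range(retries):
        # Pause before every attempt to keep the request rate low
        time.sleep(random.uniform(base_delay, base_delay * 2))
        try:
            response = session.get(url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            if attempt == retries - 1:
                print(f'Giving up on {url}: {e}')
            else:
                # Exponential backoff with a little jitter: ~1 s, ~2 s, ~4 s, ...
                time.sleep(2 ** attempt + random.random())
    return None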

Case Study 2: E-commerce Product Scraper

Scrape product information from an e-commerce site, including names, prices, and reviews.

Analyzing Dynamic Loading

Many e-commerce sites load their data dynamically with JavaScript, so you need to find the underlying API endpoint:

  1. Open the developer tools and switch to the Network tab
  2. Filter for XHR/Fetch requests
  3. Find the API call that returns the product data
  4. Work out the request parameters and the response format (a quick verification sketch follows this list)
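
Once a candidate endpoint is identified, it is worth replaying the request outside the browser to confirm the parameters and the response shape. A minimal sketch, assuming the hypothetical endpoint and parameters used in the implementation below; replace them with what you actually observe in the Network tab:

import json

import requests

url = 'https://api.example.com/products'
params = {'page': 1, 'size': 20}
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Referer': 'https://example.com/products',
}

response = requests.get(url, params=params, headers=headers, timeout=10)
data = response.json()

# Inspect the top-level keys and one item to learn the field names
print('top-level keys:', list(data.keys()))
items = data.get('items', [])
if items:
    print(json.dumps(items[0], ensure_ascii=False, indent=2))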

Implementation

import requests
import json
import time
import random
import csv
from typing import List, Dict


class ProductSpider:
    def __init__(self):
        self.api_url = 'https://api.example.com/products'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Referer': 'https://example.com/products',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        self.products = []

    def get_products(self, page: int, category: str = '') -> List[Dict]:
        """Request one page of products from the API."""
        params = {
            'page': page,
            'size': 20,
            'category': category,
        }

        response = self.session.get(self.api_url, params=params, timeout=10)
        data = response.json()

        return data.get('items', [])

    def parse_product(self, item: Dict) -> Dict:
        """Map a raw API item to a flat record."""
        return {
            'id': item.get('id'),
            'name': item.get('name'),
            'price': item.get('price'),
            'original_price': item.get('originalPrice'),
            'discount': item.get('discount', 0),
            'rating': item.get('rating', 0),
            'review_count': item.get('reviewCount', 0),
            'sales': item.get('sales', 0),
            'shop': item.get('shop', {}).get('name'),
            'category': item.get('category'),
            'url': f"https://example.com/product/{item.get('id')}",
        }

    def save_to_json(self, filename: str = 'products.json'):
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.products, f, ensure_ascii=False, indent=2)
        print(f'Saved {len(self.products)} products to {filename}')

    def save_to_csv(self, filename: str = 'products.csv'):
        if not self.products:
            return

        with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
            fieldnames = list(self.products[0].keys())
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.products)

        print(f'Saved {len(self.products)} products to {filename}')

    def run(self, pages: int = 10, categories: List[str] = None):
        if categories is None:
            categories = ['']

        for category in categories:
            print(f'Crawling category: {category or "all"}')

            for page in range(1, pages + 1):
                print(f'  page {page}...')

                try:
                    items = self.get_products(page, category)

                    if not items:
                        print(f'  page {page} returned no data, stopping')
                        break

                    for item in items:
                        product = self.parse_product(item)
                        self.products.append(product)

                    print(f'  page {page} done, {len(items)} products collected')

                except Exception as e:
                    print(f'  error on page {page}: {e}')

                time.sleep(random.uniform(0.5, 1.5))

        self.save_to_json()
        self.save_to_csv()


if __name__ == '__main__':
    spider = ProductSpider()
    spider.run(pages=5)

Key Points

  1. API analysis: find the data endpoint and request the JSON directly
  2. Data parsing: extract the needed fields from the JSON response
  3. Category crawling: products can be crawled per category
  4. Multi-format output: results are saved as both JSON and CSV

Case Study 3: Async Scraping in Practice

Use an asynchronous scraper to fetch a large number of pages efficiently.

import asyncio
import aiohttp
import csv
import random
import time
from typing import List, Optional
from bs4 import BeautifulSoup


class AsyncSpider:
    def __init__(
        self,
        max_concurrent: int = 10,
        request_delay: float = 0.3,
        timeout: int = 30,
        max_retries: int = 3,
    ):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        # Semaphore caps how many requests are in flight at once
        self.semaphore = asyncio.Semaphore(max_concurrent)

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        }

        self.results = []

    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
    ) -> Optional[str]:
        async with self.semaphore:
            await asyncio.sleep(random.uniform(0, self.request_delay))

            for attempt in range(self.max_retries):
                try:
                    async with session.get(url) as response:
                        if response.status == 200:
                            return await response.text()
                        elif response.status in [429, 500, 502, 503, 504]:
                            # Retryable status: back off exponentially with jitter
                            wait = 2 ** attempt + random.random()
                            await asyncio.sleep(wait)
                        else:
                            return None
                except asyncio.TimeoutError:
                    print(f'Timeout: {url}')
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        print(f'Error: {url} - {e}')
                    else:
                        await asyncio.sleep(2 ** attempt)

            return None

    def parse(self, html: str, url: str) -> dict:
        soup = BeautifulSoup(html, 'lxml')

        title_elem = soup.select_one('h1')
        title = title_elem.get_text(strip=True) if title_elem else ''

        content_elem = soup.select_one('.content')
        content = content_elem.get_text(strip=True) if content_elem else ''

        return {
            'url': url,
            'title': title,
            'content': content[:500] if content else '',
        }

    async def crawl_page(
        self,
        session: aiohttp.ClientSession,
        url: str,
    ):
        html = await self.fetch(session, url)
        if html:
            result = self.parse(html, url)
            self.results.append(result)

    async def run(self, urls: List[str]):
        async with aiohttp.ClientSession(
            headers=self.headers,
            timeout=self.timeout
        ) as session:
            tasks = [self.crawl_page(session, url) for url in urls]
            await asyncio.gather(*tasks)

    def save_to_csv(self, filename: str = 'results.csv'):
        if not self.results:
            return

        with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
            fieldnames = list(self.results[0].keys())
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.results)

        print(f'Saved {len(self.results)} records to {filename}')


async def main():
    spider = AsyncSpider(
        max_concurrent=10,
        request_delay=0.3,
        timeout=30,
        max_retries=3,
    )

    urls = [f'https://httpbin.org/delay/1?id={i}' for i in range(50)]

    start = time.time()
    await spider.run(urls)
    elapsed = time.time() - start

    print(f'Completed: {len(spider.results)}/{len(urls)} requests')
    print(f'Elapsed: {elapsed:.2f}s')

    spider.save_to_csv()


if __name__ == '__main__':
    asyncio.run(main())

Performance Comparison

For the same workload, and assuming roughly one second per request with 10 concurrent requests, the synchronous and asynchronous approaches compare as follows:

Requests | Sync time | Async time | Speedup
10       | 10 s      | 1 s        | 10x
50       | 50 s      | 5 s        | 10x
100      | 100 s     | 10 s       | 10x
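
As a rough sanity check of these numbers, here is a minimal side-by-side benchmark sketch against httpbin.org; actual timings depend on network conditions, so treat it as illustrative rather than a rigorous measurement:

import asyncio
import time

import aiohttp
import requests

URLS = [f'https://httpbin.org/delay/1?id={i}' for i in range(10)]

def crawl_sync() -> float:
    """Fetch every URL one after another with requests."""
    start = time.time()
    with requests.Session() as session:
        for url in URLS:
            session.get(url, timeout=30)
    return time.time() - start

async def crawl_async() -> float:
    """Fetch every URL concurrently with aiohttp."""
    async def fetch(session: aiohttp.ClientSession, url: str):
        async with session.get(url) as resp:
            await resp.read()

    start = time.time()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch(session, url) for url in URLS))
    return time.time() - start

if __name__ == '__main__':
    sync_elapsed = crawl_sync()
    async_elapsed = asyncio.run(crawl_async())
    print(f'sync:  {sync_elapsed:.1f}s')   # roughly len(URLS) * 1 s
    print(f'async: {async_elapsed:.1f}s')  # roughly 1 s plus overhead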

Case Study 4: A Scrapy Project

Build a complete scraper project with the Scrapy framework.

Creating the Project

scrapy startproject myproject
cd myproject
scrapy genspider books example.com

Defining the Item

# myproject/items.py
import scrapy

class BookItem(scrapy.Item):
    title = scrapy.Field()
    author = scrapy.Field()
    price = scrapy.Field()
    rating = scrapy.Field()
    category = scrapy.Field()
    url = scrapy.Field()

Writing the Spider

# myproject/spiders/books.py
import scrapy
from myproject.items import BookItem

class BooksSpider(scrapy.Spider):
    name = 'books'
    allowed_domains = ['example.com']
    start_urls = ['https://example.com/books']

    def parse(self, response):
        for book in response.css('.book-item'):
            item = BookItem()
            item['title'] = book.css('.title::text').get()
            item['author'] = book.css('.author::text').get()
            item['price'] = book.css('.price::text').get()
            item['rating'] = book.css('.rating::text').get()
            item['url'] = response.urljoin(book.css('a::attr(href)').get())
            yield item

        next_page = response.css('.next-page::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

Writing the Pipelines

# myproject/pipelines.py
import csv
import json

from scrapy.exceptions import DropItem

class JsonPipeline:
    def open_spider(self, spider):
        self.file = open('books.jl', 'w', encoding='utf-8')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + '\n'
        self.file.write(line)
        return item

class CsvPipeline:
    def open_spider(self, spider):
        self.file = open('books.csv', 'w', newline='', encoding='utf-8-sig')
        self.writer = None

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        if self.writer is None:
            # Create the writer lazily, once the first item reveals the field names
            self.writer = csv.DictWriter(self.file, fieldnames=item.keys())
            self.writer.writeheader()
        self.writer.writerow(dict(item))
        return item

class DuplicatesPipeline:
    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        url = item.get('url')
        if url in self.urls_seen:
            raise DropItem(f'Duplicate item: {url}')
        self.urls_seen.add(url)
        return item

Configuring Settings

# myproject/settings.py

BOT_NAME = 'myproject'

SPIDER_MODULES = ['myproject.spiders']
NEWSPIDER_MODULE = 'myproject.spiders'

ROBOTSTXT_OBEY = True

CONCURRENT_REQUESTS = 16
DOWNLOAD_DELAY = 1

DEFAULT_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
}

ITEM_PIPELINES = {
    'myproject.pipelines.DuplicatesPipeline': 100,
    'myproject.pipelines.JsonPipeline': 200,
    'myproject.pipelines.CsvPipeline': 300,
}
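
Optionally, Scrapy's built-in AutoThrottle extension can adjust the download delay dynamically instead of relying on a fixed DOWNLOAD_DELAY; a minimal sketch of the relevant settings (the values shown are illustrative):

# Optional: let AutoThrottle adapt the delay to server response times
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0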

Running the Spider

scrapy crawl books
scrapy crawl books -o books.json

Scraping Best Practices

1. Respect robots.txt

import urllib.robotparser

rp = urllib.robotparser.RobotFileParser()
rp.set_url('https://example.com/robots.txt')
rp.read()

# True if the 'MyBot' user agent is allowed to fetch this page
can_fetch = rp.can_fetch('MyBot', 'https://example.com/page')

2. Use Reasonable Request Intervals

import time
import random

import requests

def get_with_delay(url, min_delay=1, max_delay=3):
    # Sleep a random interval before each request to avoid hammering the server
    time.sleep(random.uniform(min_delay, max_delay))
    return requests.get(url)

3. Reuse Connections with a Session

import requests

# A Session reuses the underlying TCP connection (keep-alive) across requests
session = requests.Session()
for url in urls:
    response = session.get(url)

4. Support Resumable Crawling

import json
import os

import requests

def load_progress(filename):
    if os.path.exists(filename):
        with open(filename, 'r') as f:
            return json.load(f)
    return {'completed': [], 'failed': []}

def save_progress(filename, progress):
    with open(filename, 'w') as f:
        json.dump(progress, f)

def crawl_with_resume(urls):
    progress = load_progress('progress.json')
    completed = set(progress['completed'])

    for url in urls:
        if url in completed:
            continue  # Already handled in a previous run

        try:
            response = requests.get(url)
            process(response)  # process() is your own handler for the response
            completed.add(url)
        except Exception as e:
            progress['failed'].append({'url': url, 'error': str(e)})

    progress['completed'] = list(completed)
    save_progress('progress.json', progress)

5. Logging

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='spider.log'
)

logger = logging.getLogger(__name__)

logger.info('Crawl started')
logger.warning('Request failed, retrying')
logger.error('Crawl error')

Summary

This chapter brought the scraping techniques together through four hands-on case studies:

  1. News site scraper: list-page and detail-page parsing
  2. E-commerce product scraper: API endpoint analysis
  3. Async scraper: efficient concurrent crawling
  4. Scrapy project: framework-based development

It also covered scraping best practices, including respecting site rules, throttling request rates, and handling errors.

Exercises

  1. Pick a news site and implement a complete news scraper
  2. Analyze an e-commerce site's API and scrape its product data
  3. Crawl 100 pages with an async scraper and compare the performance against a synchronous version
  4. Rebuild one of your earlier scraper projects with the Scrapy framework