数据存储
爬取的数据需要保存到本地或数据库中。本章介绍常用的数据存储方式。
JSON 存储
JSON 是最常用的数据交换格式,适合存储结构化数据。
写入 JSON 文件
import json

# Sample records to persist.
books = [
    {'name': 'Python编程', 'author': '张三', 'price': 59.9},
    {'name': 'Java入门', 'author': '李四', 'price': 49.9},
    {'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
]

# ensure_ascii=False keeps CJK characters readable; indent=2 pretty-prints.
with open('books.json', 'w', encoding='utf-8') as fp:
    json.dump(books, fp, ensure_ascii=False, indent=2)
参数说明:
- ensure_ascii=False:保留中文字符,不转义为 Unicode 转义序列
- indent=2:美化输出,缩进 2 空格
读取 JSON 文件
import json

# json.load parses the whole file into Python objects in one call.
with open('books.json', 'r', encoding='utf-8') as f:
    books = json.load(f)

for item in books:
    print(f"{item['name']} - {item['author']}")
追加数据到 JSON
import json
import os


def append_to_json(filename, new_data):
    """Append one record to a JSON array file, creating it if needed.

    Note: the whole file is re-read and re-written on every call, which
    is O(file size); prefer JSON Lines for high-volume appends.
    """
    records = []
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as fp:
            records = json.load(fp)
    records.append(new_data)
    with open(filename, 'w', encoding='utf-8') as fp:
        json.dump(records, fp, ensure_ascii=False, indent=2)


append_to_json('books.json', {'name': '新书籍', 'author': '新作者', 'price': 39.9})
JSON Lines 格式
对于大量数据,推荐使用 JSON Lines 格式(每行一个 JSON 对象):
import json

articles = [
    {'id': 1, 'title': '文章1'},
    {'id': 2, 'title': '文章2'},
    {'id': 3, 'title': '文章3'},
]

# Write: one JSON object per line (JSON Lines / .jl).
with open('articles.jl', 'w', encoding='utf-8') as fp:
    fp.writelines(json.dumps(a, ensure_ascii=False) + '\n' for a in articles)

# Read back line by line -- no need to load the whole file at once.
with open('articles.jl', 'r', encoding='utf-8') as fp:
    for line in fp:
        print(json.loads(line)['title'])
JSON Lines 的优点:
- 可以逐行读取,不需要一次性加载全部数据
- 追加数据时不需要读取和重写整个文件
- 适合流式处理大数据
CSV 存储
CSV 是表格数据的通用格式,可以用 Excel 打开。
写入 CSV 文件
import csv

books = [
    {'name': 'Python编程', 'author': '张三', 'price': 59.9},
    {'name': 'Java入门', 'author': '李四', 'price': 49.9},
    {'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
]

# utf-8-sig writes a BOM so Excel detects the encoding; newline=''
# lets the csv module control line endings itself.
with open('books.csv', 'w', newline='', encoding='utf-8-sig') as fp:
    writer = csv.DictWriter(fp, fieldnames=['name', 'author', 'price'])
    writer.writeheader()
    writer.writerows(books)
参数说明:
- newline='':防止写入时出现多余空行
- encoding='utf-8-sig':添加 BOM,让 Excel 正确识别中文
读取 CSV 文件
import csv

# DictReader maps each row to a dict keyed by the header line.
with open('books.csv', 'r', encoding='utf-8-sig') as f:
    for row in csv.DictReader(f):
        print(f"{row['name']} - {row['author']} - {row['price']}")
追加数据到 CSV
import csv
import os


def append_to_csv(filename, new_data, fieldnames):
    """Append one row to a CSV file, writing the header only on creation."""
    write_header = not os.path.exists(filename)
    with open(filename, 'a', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerow(new_data)


append_to_csv('books.csv', {'name': '新书籍', 'author': '新作者', 'price': 39.9},
              fieldnames=['name', 'author', 'price'])
处理特殊字符
CSV 中的特殊字符(逗号、引号、换行)需要正确处理:
import csv

# Fields containing commas, quotes, or newlines are quoted automatically
# by the csv module -- no manual escaping required.
tricky_rows = [
    {'name': 'Python, 编程', 'description': '学习"Python"的好书', 'price': 59.9},
    {'name': 'Java入门\n进阶', 'description': '适合初学者', 'price': 49.9},
]

with open('books_special.csv', 'w', newline='', encoding='utf-8-sig') as f:
    writer = csv.DictWriter(f, fieldnames=['name', 'description', 'price'])
    writer.writeheader()
    writer.writerows(tricky_rows)
csv 模块会自动处理这些特殊字符,将包含特殊字符的字段用引号包裹。
SQLite 存储
SQLite 是轻量级的嵌入式数据库,无需安装服务器,适合小型项目。
创建数据库和表
import sqlite3

# connect() creates spider.db on first use.
conn = sqlite3.connect('spider.db')
cursor = conn.cursor()
# AUTOINCREMENT ids plus an automatic creation timestamp.
cursor.execute('''
    CREATE TABLE IF NOT EXISTS books (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        author TEXT,
        price REAL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
''')
conn.commit()
conn.close()
插入数据
import sqlite3

conn = sqlite3.connect('spider.db')
cursor = conn.cursor()
# Parameterized query (? placeholders) -- never build SQL by string
# formatting with external data.
cursor.execute(
    'INSERT INTO books (name, author, price) VALUES (?, ?, ?)',
    ('Python编程', '张三', 59.9),
)
conn.commit()
conn.close()
使用参数化查询(? 占位符)防止 SQL 注入。
批量插入
import sqlite3

books = [
    ('Java入门', '李四', 49.9),
    ('Go语言实战', '王五', 69.9),
    ('Rust编程', '赵六', 79.9),
]

conn = sqlite3.connect('spider.db')
# executemany runs the same statement for every tuple in one call.
conn.cursor().executemany(
    'INSERT INTO books (name, author, price) VALUES (?, ?, ?)',
    books,
)
conn.commit()
conn.close()
查询数据
import sqlite3

conn = sqlite3.connect('spider.db')
cursor = conn.cursor()
# Placeholder binding also applies to SELECT conditions.
cursor.execute('SELECT * FROM books WHERE price > ?', (50,))
for row in cursor.fetchall():
    print(row)
conn.close()
使用上下文管理器
import sqlite3
class Database:
    """SQLite helper usable as a context manager.

    Commits on a clean exit, rolls back when the block raised, and
    always closes the connection -- the original version leaked the
    connection if commit()/rollback() itself raised.
    """

    def __init__(self, db_path):
        self.db_path = db_path

    def __enter__(self):
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Ensure close() runs even if commit/rollback fails.
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            self.conn.close()

    def insert_book(self, name, author, price):
        """Insert one book row using placeholder binding."""
        self.cursor.execute(
            'INSERT INTO books (name, author, price) VALUES (?, ?, ?)',
            (name, author, price),
        )

    def get_books(self, min_price=0):
        """Return all rows whose price is strictly greater than min_price."""
        self.cursor.execute(
            'SELECT * FROM books WHERE price > ?', (min_price,)
        )
        return self.cursor.fetchall()
# Transaction commits on clean exit and rolls back on exceptions.
with Database('spider.db') as db:
    db.insert_book('新书籍', '新作者', 39.9)
    for book in db.get_books(50):
        print(book)
MySQL 存储
对于大型项目,通常使用 MySQL 等数据库服务器。
安装驱动
pip install pymysql
连接 MySQL
import pymysql
conn = pymysql.connect(
host='localhost',
port=3306,
user='root',
password='password',
database='spider',
charset='utf8mb4'
)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
author VARCHAR(100),
price DECIMAL(10, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
插入数据
import pymysql
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='spider',
charset='utf8mb4'
)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', ('Python编程', '张三', 59.9))
conn.commit()
conn.close()
批量插入
import pymysql
books = [
('Java入门', '李四', 49.9),
('Go语言实战', '王五', 69.9),
]
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='spider',
charset='utf8mb4'
)
cursor = conn.cursor()
cursor.executemany('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', books)
conn.commit()
print(f'插入了 {cursor.rowcount} 条记录')
conn.close()
使用连接池
import pymysql
from dbutils.pooled_db import PooledDB

# Shared pool: up to 10 concurrent connections, at least 2 kept idle.
pool = PooledDB(
    creator=pymysql,
    host='localhost',
    user='root',
    password='password',
    database='spider',
    charset='utf8mb4',
    maxconnections=10,
    mincached=2,
)


def save_book(book):
    """Insert one book dict using a pooled connection.

    close() on a pooled connection returns it to the pool; wrapping the
    work in try/finally guarantees the connection is released even when
    the INSERT raises (the original version leaked it on error).
    """
    conn = pool.connection()
    try:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO books (name, author, price)
            VALUES (%s, %s, %s)
        ''', (book['name'], book['author'], book['price']))
        conn.commit()
    finally:
        conn.close()


save_book({'name': '新书籍', 'author': '新作者', 'price': 39.9})
MongoDB 存储
MongoDB 是文档型数据库,适合存储非结构化数据。
安装驱动
pip install pymongo
连接 MongoDB
from pymongo import MongoClient

# Connect to a local MongoDB instance. Both the database ('spider')
# and the collection ('books') are created lazily on first write.
client = MongoClient('mongodb://localhost:27017/')
db = client['spider']
collection = db['books']
插入文档
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['spider']
collection = db['books']
# A document can hold nested structures (here a list of tags) without
# any schema declaration.
book = {
    'name': 'Python编程',
    'author': '张三',
    'price': 59.9,
    'tags': ['Python', '编程', '入门']
}
# insert_one returns an InsertOneResult; inserted_id is the generated
# _id (an ObjectId unless one was supplied in the document).
result = collection.insert_one(book)
print(f'插入的文档ID: {result.inserted_id}')
client.close()
批量插入
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
books = [
    {'name': 'Java入门', 'author': '李四', 'price': 49.9},
    {'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
    {'name': 'Rust编程', 'author': '赵六', 'price': 79.9},
]
# insert_many sends all documents in one round trip; inserted_ids
# lists the generated _id of each document.
result = collection.insert_many(books)
print(f'插入了 {len(result.inserted_ids)} 条文档')
client.close()
查询文档
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
# find() returns a lazy cursor; {'$gt': 50} selects price > 50.
for book in collection.find({'price': {'$gt': 50}}):
    print(f"{book['name']} - {book['price']}")
client.close()
常用查询操作符:
- $eq:等于
- $ne:不等于
- $gt:大于
- $gte:大于等于
- $lt:小于
- $lte:小于等于
- $in:在列表中
- $nin:不在列表中
更新文档
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
# update_one modifies only the FIRST matching document; $set changes
# the listed fields and leaves the rest of the document untouched.
result = collection.update_one(
    {'name': 'Python编程'},
    {'$set': {'price': 55.9}}
)
print(f'修改了 {result.modified_count} 条文档')
client.close()
创建索引
from pymongo import MongoClient, ASCENDING, DESCENDING

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
# Unique index on name: duplicate names now raise DuplicateKeyError.
collection.create_index([('name', ASCENDING)], unique=True)
# Compound index: speeds up queries filtering by author and sorting by price.
collection.create_index([('author', ASCENDING), ('price', DESCENDING)])
client.close()
存储方案选择
选择合适的数据存储方案是爬虫项目的重要决策。不同的存储方式有不同的特点和适用场景。
存储方案对比
| 存储方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| JSON | 简单易用,可读性好,跨语言支持 | 大文件性能差,不支持随机访问 | 小型数据,配置文件,API 数据 |
| JSON Lines | 支持流式处理,可追加写入 | 不能随机访问,需要逐行解析 | 大量数据,日志,增量爬取 |
| CSV | Excel 兼容,表格形式直观 | 不支持嵌套结构,编码问题 | 表格数据,报表,数据分析 |
| SQLite | 无需服务器,单文件,ACID 事务 | 并发写入能力有限,单机存储 | 小型应用,本地存储,嵌入式场景 |
| MySQL | 功能强大,社区活跃,高性能 | 需要服务器,运维成本 | 中大型应用,需要事务支持 |
| PostgreSQL | 功能最全,扩展性强,JSON 支持 | 配置复杂,学习曲线较陡 | 复杂查询,地理信息,企业应用 |
| MongoDB | 灵活,适合非结构化数据,高性能 | 占用内存大,4.0 之前不支持多文档事务 | 非结构化数据,快速迭代,日志 |
选择决策流程
选择存储方案时,可以按照以下思路进行决策:
1. 数据量评估
数据量 < 1GB → JSON / SQLite / CSV
数据量 1-10GB → SQLite / MySQL / PostgreSQL
数据量 > 10GB → MySQL / PostgreSQL / MongoDB
2. 数据结构分析
结构化数据(固定字段) → 关系型数据库(MySQL/PostgreSQL/SQLite)
半结构化数据(JSON) → MongoDB / PostgreSQL(JSONB)
非结构化数据(文本) → 文件存储 + Elasticsearch
3. 查询需求
简单查询(按 ID 查找) → 任意存储
复杂查询(多表关联) → 关系型数据库
全文搜索 → Elasticsearch / PostgreSQL 全文搜索
聚合分析 → MongoDB / PostgreSQL
4. 并发需求
单线程爬取 → SQLite / JSON 文件
多进程爬取 → MySQL / PostgreSQL / MongoDB
高并发写入 → MongoDB / PostgreSQL
混合存储策略
实际项目中,往往需要组合使用多种存储方式:
import json
import sqlite3
from datetime import datetime
from typing import Dict, Any, Optional
class HybridStorage:
    """
    Hybrid storage strategy:
    - raw data   -> JSON Lines file (easy to back up / re-process)
    - structured -> SQLite (easy to query)
    - hot state  -> in-memory dict (fast access)
    """

    def __init__(self, jsonl_path: str, sqlite_path: str):
        self.jsonl_path = jsonl_path
        self.sqlite_path = sqlite_path
        self.cache: Dict[str, Any] = {}
        self._init_sqlite()

    def _init_sqlite(self):
        """Open the SQLite database and create the items table if needed."""
        self.conn = sqlite3.connect(self.sqlite_path)
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS items (
                id TEXT PRIMARY KEY,
                title TEXT,
                url TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def save(self, item: Dict[str, Any]):
        """Persist one item to all three stores.

        FIX: this was declared ``async`` although the body contains no
        ``await``; calling it without awaiting would silently have done
        nothing. It is now a plain synchronous method.
        """
        item_id = item.get('id')
        if not item_id:
            return  # items without an id cannot be keyed; skip them
        # 1. Append the raw record to the JSON Lines file.
        with open(self.jsonl_path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')
        # 2. Upsert the structured row into SQLite.
        self.conn.execute('''
            INSERT OR REPLACE INTO items (id, title, url, updated_at)
            VALUES (?, ?, ?, ?)
        ''', (item_id, item.get('title'), item.get('url'), datetime.now()))
        self.conn.commit()
        # 3. Refresh the in-memory cache.
        self.cache[item_id] = item

    def get(self, item_id: str) -> Optional[Dict[str, Any]]:
        """Look an item up: cache first, then SQLite; None when absent."""
        if item_id in self.cache:
            return self.cache[item_id]
        cursor = self.conn.execute(
            'SELECT * FROM items WHERE id = ?', (item_id,)
        )
        row = cursor.fetchone()
        if row:
            return {
                'id': row[0],
                'title': row[1],
                'url': row[2],
            }
        return None

    def close(self):
        """Close the SQLite connection."""
        self.conn.close()
存储最佳实践
数据去重
在爬虫中,数据去重是避免重复存储的关键:
import hashlib
import json
import sqlite3
from typing import Dict, Any, Optional
class DedupStorage:
    """SQLite-backed storage that skips duplicate URLs and duplicate content.

    FIX: the snippet used ``json`` without importing it, and annotated
    ``data`` as ``Dict[str, Any]`` while defaulting it to None.
    """

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self._init_db()
        self.seen_urls = set()
        # Warm the in-memory URL set from rows already on disk.
        self._load_seen()

    def _init_db(self):
        """Create the items table and its lookup indexes."""
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS items (
                id TEXT PRIMARY KEY,
                url TEXT UNIQUE,
                content_hash TEXT,
                data TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.execute('CREATE INDEX IF NOT EXISTS idx_url ON items(url)')
        self.conn.execute('CREATE INDEX IF NOT EXISTS idx_hash ON items(content_hash)')
        self.conn.commit()

    def _load_seen(self):
        """Load every stored URL into the in-memory set."""
        cursor = self.conn.execute('SELECT url FROM items')
        self.seen_urls = {row[0] for row in cursor.fetchall()}

    def _compute_hash(self, data: Dict[str, Any]) -> str:
        """Stable content fingerprint: MD5 of canonical (sorted-key) JSON."""
        content = json.dumps(data, sort_keys=True, ensure_ascii=False)
        return hashlib.md5(content.encode()).hexdigest()

    def is_duplicate(self, url: str, data: Optional[Dict[str, Any]] = None) -> bool:
        """Return True if the URL was seen before or identical content exists."""
        if url in self.seen_urls:
            return True
        if data:
            content_hash = self._compute_hash(data)
            cursor = self.conn.execute(
                'SELECT 1 FROM items WHERE content_hash = ?', (content_hash,)
            )
            if cursor.fetchone():
                return True
        return False

    def save(self, url: str, data: Dict[str, Any]) -> bool:
        """Store one record; return False (and skip it) when duplicated."""
        if self.is_duplicate(url, data):
            return False
        content_hash = self._compute_hash(data)
        # Deterministic short id derived from the URL.
        item_id = hashlib.sha256(url.encode()).hexdigest()[:16]
        self.conn.execute('''
            INSERT INTO items (id, url, content_hash, data)
            VALUES (?, ?, ?, ?)
        ''', (item_id, url, content_hash, json.dumps(data, ensure_ascii=False)))
        self.conn.commit()
        self.seen_urls.add(url)
        return True

    def close(self):
        self.conn.close()
批量写入优化
批量写入比单条插入效率高很多:
import sqlite3
import time
from typing import List, Dict, Any
class BatchStorage:
    """Buffer rows in memory and write them to SQLite in batches.

    Batching turns N commits into one executemany + commit, which is
    dramatically faster than per-row inserts.
    """

    def __init__(self, db_path: str, batch_size: int = 100):
        self.conn = sqlite3.connect(db_path)
        self.batch_size = batch_size
        self.buffer: List[tuple] = []
        self._init_db()

    def _init_db(self):
        """Create the items table on first use."""
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                url TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def add(self, title: str, url: str):
        """Queue one row; flush automatically once the buffer fills up."""
        self.buffer.append((title, url))
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Drain the buffer into the database in one batch."""
        if self.buffer:
            self.conn.executemany(
                'INSERT INTO items (title, url) VALUES (?, ?)', self.buffer)
            self.conn.commit()
            self.buffer.clear()

    def close(self):
        """Flush any remaining rows, then close the connection."""
        self.flush()
        self.conn.close()
# Performance comparison example
def benchmark():
    """Compare per-row INSERTs against one executemany batch."""
    db_path = ':memory:'

    # Row-by-row inserts (each :memory: connection is a fresh database).
    conn = sqlite3.connect(db_path)
    conn.execute('CREATE TABLE test1 (id INTEGER, value TEXT)')
    start = time.time()
    for i in range(1000):
        conn.execute('INSERT INTO test1 VALUES (?, ?)', (i, f'value_{i}'))
    conn.commit()
    print(f'单条插入: {time.time() - start:.3f}s')
    conn.close()

    # One batched insert via executemany.
    conn = sqlite3.connect(db_path)
    conn.execute('CREATE TABLE test2 (id INTEGER, value TEXT)')
    start = time.time()
    conn.executemany(
        'INSERT INTO test2 VALUES (?, ?)',
        [(i, f'value_{i}') for i in range(1000)]
    )
    conn.commit()
    print(f'批量插入: {time.time() - start:.3f}s')
    conn.close()
# Typical results:
# 单条插入: 0.15s
# 批量插入: 0.005s
数据压缩
对于大量文本数据,压缩可以显著减少存储空间:
import gzip
import json
from typing import Any
def save_compressed(data: Any, filepath: str):
    """Write *data* as gzip-compressed JSON text."""
    with gzip.open(filepath, 'wt', encoding='utf-8') as fp:
        json.dump(data, fp, ensure_ascii=False)


def load_compressed(filepath: str) -> Any:
    """Read gzip-compressed JSON text back into Python objects."""
    with gzip.open(filepath, 'rt', encoding='utf-8') as fp:
        return json.load(fp)


# Usage example -- highly repetitive text compresses extremely well.
data = [{'id': i, 'content': '很长很长的文本...' * 100} for i in range(1000)]
save_compressed(data, 'data.json.gz')
loaded = load_compressed('data.json.gz')
# Approximate effect on this kind of data:
# raw JSON:   ~10MB
# compressed: ~500KB (about 95% smaller)
数据备份策略
import sqlite3
import shutil
from datetime import datetime
from pathlib import Path
class BackupManager:
    """Manage timestamped backups of a SQLite database file."""

    def __init__(self, db_path: str, backup_dir: str = 'backups'):
        self.db_path = db_path
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(exist_ok=True)

    def _target(self, name):
        """Resolve the backup file path; default name is a timestamp."""
        if name is None:
            name = datetime.now().strftime('%Y%m%d_%H%M%S')
        return self.backup_dir / f'{name}.db'

    def backup(self, name: str = None):
        """Create a backup by copying the database file directly."""
        backup_path = self._target(name)
        shutil.copy2(self.db_path, backup_path)
        return backup_path

    def backup_sqlite(self, name: str = None):
        """Create a backup via SQLite's online backup API (safe while
        the database is in use)."""
        backup_path = self._target(name)
        source = sqlite3.connect(self.db_path)
        dest = sqlite3.connect(str(backup_path))
        source.backup(dest)
        dest.close()
        source.close()
        return backup_path

    def rotate(self, keep: int = 7):
        """Keep only the newest *keep* backups (ordered by filename)."""
        backups = sorted(self.backup_dir.glob('*.db'))
        doomed = backups[:-keep] if keep else backups
        for old in doomed:
            old.unlink()

    def restore(self, backup_name: str):
        """Overwrite the live database file with the named backup."""
        backup_path = self.backup_dir / backup_name
        if not backup_path.exists():
            raise FileNotFoundError(f'备份不存在: {backup_name}')
        shutil.copy2(backup_path, self.db_path)
完整示例
import csv
import json
import sqlite3
from typing import List, Dict
class DataStorage:
    """Save scraped records to JSON, CSV, or SQLite under one output dir.

    FIX: all three filepath f-strings contained the literal text
    ``(unknown)`` instead of ``{filename}``, so every call ignored the
    filename argument and overwrote the same file.
    """

    def __init__(self, output_dir: str = 'output'):
        # The caller is responsible for ensuring output_dir exists.
        self.output_dir = output_dir

    def save_to_json(self, data: List[Dict], filename: str):
        """Dump all records into <output_dir>/<filename>.json."""
        filepath = f'{self.output_dir}/{filename}.json'
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f'已保存 {len(data)} 条数据到 {filepath}')

    def save_to_csv(self, data: List[Dict], filename: str):
        """Write records as CSV; the header comes from the first record's keys."""
        if not data:
            return
        filepath = f'{self.output_dir}/{filename}.csv'
        fieldnames = list(data[0].keys())
        with open(filepath, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        print(f'已保存 {len(data)} 条数据到 {filepath}')

    def save_to_sqlite(self, data: List[Dict], filename: str, table: str):
        """Create <table> from the first record's keys and bulk-insert.

        NOTE: table/column names are interpolated into the SQL string --
        pass only trusted identifiers, never user input.
        """
        if not data:
            return
        filepath = f'{self.output_dir}/{filename}.db'
        conn = sqlite3.connect(filepath)
        cursor = conn.cursor()
        columns = list(data[0].keys())
        # 'id' becomes the primary key; everything else is stored as TEXT.
        columns_def = ', '.join([
            f'{col} TEXT' if col != 'id' else f'{col} INTEGER PRIMARY KEY'
            for col in columns
        ])
        cursor.execute(f'CREATE TABLE IF NOT EXISTS {table} ({columns_def})')
        placeholders = ', '.join(['?' for _ in columns])
        cursor.executemany(
            f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})',
            [tuple(item.values()) for item in data]
        )
        conn.commit()
        conn.close()
        print(f'已保存 {len(data)} 条数据到 {filepath}')
if __name__ == '__main__':
    import os

    os.makedirs('output', exist_ok=True)

    storage = DataStorage()
    books = [
        {'id': 1, 'name': 'Python编程', 'author': '张三', 'price': '59.9'},
        {'id': 2, 'name': 'Java入门', 'author': '李四', 'price': '49.9'},
        {'id': 3, 'name': 'Go语言实战', 'author': '王五', 'price': '69.9'},
    ]
    # Same records saved three ways for comparison.
    storage.save_to_json(books, 'books')
    storage.save_to_csv(books, 'books')
    storage.save_to_sqlite(books, 'books', 'books')
小结
本章我们学习了:
- JSON 存储 - 结构化数据的通用格式
- CSV 存储 - 表格数据,Excel 兼容
- SQLite 存储 - 轻量级嵌入式数据库
- MySQL 存储 - 关系型数据库服务器
- MongoDB 存储 - 文档型数据库
- 存储方案选择 - 根据场景选择合适的存储方式
练习
- 将爬取的新闻数据保存为 JSON 和 CSV 两种格式
- 使用 SQLite 存储爬取的商品信息,并实现去重
- 实现一个支持多种存储方式的数据保存类