跳到主要内容

数据存储

爬取的数据需要保存到本地或数据库中。本章介绍常用的数据存储方式。

JSON 存储

JSON 是最常用的数据交换格式,适合存储结构化数据。

写入 JSON 文件

import json

# Sample records: each dict is one book.
data = [
    {'name': 'Python编程', 'author': '张三', 'price': 59.9},
    {'name': 'Java入门', 'author': '李四', 'price': 49.9},
    {'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
]

# ensure_ascii=False keeps CJK text readable; indent=2 pretty-prints.
with open('books.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

参数说明:

  • ensure_ascii=False:保留中文字符,不转义为 Unicode
  • indent=2:美化输出,缩进 2 空格

读取 JSON 文件

import json

# Load the list of book dicts written by the previous example.
with open('books.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# One "name - author" line per record.
for item in data:
    print(f"{item['name']} - {item['author']}")

追加数据到 JSON

import json
import os

def append_to_json(filename, new_data):
    """Append one record to a JSON array file, creating the file if needed.

    The whole file is re-read and rewritten on every call, so this only
    suits small files (use JSON Lines for large data sets).
    """
    records = []
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as f:
            records = json.load(f)

    records.append(new_data)

    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(records, f, ensure_ascii=False, indent=2)

append_to_json('books.json', {'name': '新书籍', 'author': '新作者', 'price': 39.9})

JSON Lines 格式

对于大量数据,推荐使用 JSON Lines 格式(每行一个 JSON 对象):

import json

data = [
    {'id': 1, 'title': '文章1'},
    {'id': 2, 'title': '文章2'},
    {'id': 3, 'title': '文章3'},
]

# Write: one JSON object per line (JSON Lines / .jl format).
with open('articles.jl', 'w', encoding='utf-8') as f:
    for item in data:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')

# Read: parse line by line — the whole file never has to fit in memory.
with open('articles.jl', 'r', encoding='utf-8') as f:
    for line in f:
        item = json.loads(line)
        print(item['title'])

JSON Lines 的优点:

  • 可以逐行读取,不需要一次性加载全部数据
  • 追加数据时不需要读取和重写整个文件
  • 适合流式处理大数据

CSV 存储

CSV 是表格数据的通用格式,可以用 Excel 打开。

写入 CSV 文件

import csv

data = [
    {'name': 'Python编程', 'author': '张三', 'price': 59.9},
    {'name': 'Java入门', 'author': '李四', 'price': 49.9},
    {'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
]

# newline='' lets the csv module control line endings (prevents blank rows
# on Windows); utf-8-sig prepends a BOM so Excel detects the encoding.
with open('books.csv', 'w', newline='', encoding='utf-8-sig') as f:
    fieldnames = ['name', 'author', 'price']
    writer = csv.DictWriter(f, fieldnames=fieldnames)

    writer.writeheader()
    writer.writerows(data)

参数说明:

  • newline='':防止出现空行
  • encoding='utf-8-sig':添加 BOM,让 Excel 正确识别中文

读取 CSV 文件

import csv

# DictReader maps every row to a dict keyed by the header row.
with open('books.csv', 'r', encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"{row['name']} - {row['author']} - {row['price']}")

追加数据到 CSV

import csv
import os

def append_to_csv(filename, new_data, fieldnames):
    """Append one row to a CSV file; write the header only when creating it."""
    # Check existence before opening: 'a' mode creates the file, and we must
    # know beforehand whether the header row is still needed.
    need_header = not os.path.exists(filename)

    with open(filename, 'a', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if need_header:
            writer.writeheader()
        writer.writerow(new_data)

append_to_csv('books.csv', {'name': '新书籍', 'author': '新作者', 'price': 39.9},
              fieldnames=['name', 'author', 'price'])

处理特殊字符

CSV 中的特殊字符(逗号、引号、换行)需要正确处理:

import csv

# Field values containing delimiters, quotes or newlines — the csv module
# quotes and escapes them automatically on write.
data = [
    {'name': 'Python, 编程', 'description': '学习"Python"的好书', 'price': 59.9},
    {'name': 'Java入门\n进阶', 'description': '适合初学者', 'price': 49.9},
]

with open('books_special.csv', 'w', newline='', encoding='utf-8-sig') as f:
    writer = csv.DictWriter(f, fieldnames=['name', 'description', 'price'])
    writer.writeheader()
    writer.writerows(data)

csv 模块会自动处理这些特殊字符,将包含特殊字符的字段用引号包裹。

SQLite 存储

SQLite 是轻量级的嵌入式数据库,无需安装服务器,适合小型项目。

创建数据库和表

import sqlite3

# Connecting creates the database file if it does not exist yet.
conn = sqlite3.connect('spider.db')
cursor = conn.cursor()

# IF NOT EXISTS makes the script safe to re-run.
cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
author TEXT,
price REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')

conn.commit()
conn.close()

插入数据

import sqlite3

conn = sqlite3.connect('spider.db')
cursor = conn.cursor()

# '?' placeholders let sqlite3 escape the values itself (no SQL injection).
cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (?, ?, ?)
''', ('Python编程', '张三', 59.9))

conn.commit()
conn.close()

使用参数化查询(? 占位符)防止 SQL 注入。

批量插入

import sqlite3

books = [
    ('Java入门', '李四', 49.9),
    ('Go语言实战', '王五', 69.9),
    ('Rust编程', '赵六', 79.9),
]

conn = sqlite3.connect('spider.db')
cursor = conn.cursor()

# executemany runs the same statement once per tuple in `books`.
cursor.executemany('''
INSERT INTO books (name, author, price)
VALUES (?, ?, ?)
''', books)

conn.commit()
conn.close()

查询数据

import sqlite3

conn = sqlite3.connect('spider.db')
cursor = conn.cursor()

# Parameterized filter: only books costing more than 50.
cursor.execute('SELECT * FROM books WHERE price > ?', (50,))
rows = cursor.fetchall()

for row in rows:
    print(row)

conn.close()

使用上下文管理器

import sqlite3

class Database:
    """SQLite helper usable as a context manager.

    A clean exit commits the transaction; an exception inside the
    with-block rolls it back. The connection is closed either way.
    """

    def __init__(self, db_path):
        # Only the path is stored; the connection is opened in __enter__.
        self.db_path = db_path

    def __enter__(self):
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None only when no exception escaped the with-block.
        if exc_type is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        self.conn.close()

    def insert_book(self, name, author, price):
        """Insert one book row using '?' placeholders."""
        self.cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (?, ?, ?)
''', (name, author, price))

    def get_books(self, min_price=0):
        """Return all rows whose price is strictly greater than min_price."""
        self.cursor.execute('SELECT * FROM books WHERE price > ?', (min_price,))
        return self.cursor.fetchall()

# Demo: insert one record, then list every book priced above 50.
with Database('spider.db') as db:
    db.insert_book('新书籍', '新作者', 39.9)
    books = db.get_books(50)
    for book in books:
        print(book)

MySQL 存储

对于大型项目,通常使用 MySQL 等数据库服务器。

安装驱动

pip install pymysql

连接 MySQL

import pymysql

# utf8mb4 covers the full Unicode range (including emoji).
conn = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='password',
    database='spider',
    charset='utf8mb4'
)

cursor = conn.cursor()

cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
author VARCHAR(100),
price DECIMAL(10, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')

conn.commit()
conn.close()

插入数据

import pymysql

conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='spider',
    charset='utf8mb4'
)

cursor = conn.cursor()

# pymysql uses %s placeholders (not ?) for parameterized queries.
cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', ('Python编程', '张三', 59.9))

conn.commit()
conn.close()

批量插入

import pymysql

books = [
    ('Java入门', '李四', 49.9),
    ('Go语言实战', '王五', 69.9),
]

conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='spider',
    charset='utf8mb4'
)

cursor = conn.cursor()

# One executemany call inserts every tuple in `books`.
cursor.executemany('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', books)

conn.commit()
print(f'插入了 {cursor.rowcount} 条记录')
conn.close()

使用连接池

import pymysql
from dbutils.pooled_db import PooledDB

# Shared pool: connections are reused instead of re-opened per query.
pool = PooledDB(
    creator=pymysql,
    host='localhost',
    user='root',
    password='password',
    database='spider',
    charset='utf8mb4',
    maxconnections=10,  # upper bound on simultaneously open connections
    mincached=2,        # connections kept ready at startup
)

def save_book(book):
    """Insert one book dict using a connection borrowed from the pool."""
    conn = pool.connection()
    cursor = conn.cursor()
    cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', (book['name'], book['author'], book['price']))
    conn.commit()
    conn.close()

save_book({'name': '新书籍', 'author': '新作者', 'price': 39.9})

MongoDB 存储

MongoDB 是文档型数据库,适合存储非结构化数据。

安装驱动

pip install pymongo

连接 MongoDB

from pymongo import MongoClient

# Connect to a local MongoDB, then select database 'spider' / collection 'books'.
client = MongoClient('mongodb://localhost:27017/')
db = client['spider']
collection = db['books']

插入文档

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['spider']
collection = db['books']

# Documents are plain dicts; nested values like 'tags' need no schema.
book = {
    'name': 'Python编程',
    'author': '张三',
    'price': 59.9,
    'tags': ['Python', '编程', '入门']
}

result = collection.insert_one(book)
print(f'插入的文档ID: {result.inserted_id}')

client.close()

批量插入

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']

books = [
    {'name': 'Java入门', 'author': '李四', 'price': 49.9},
    {'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
    {'name': 'Rust编程', 'author': '赵六', 'price': 79.9},
]

# insert_many sends all documents to the server in one batch.
result = collection.insert_many(books)
print(f'插入了 {len(result.inserted_ids)} 条文档')

client.close()

查询文档

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']

# $gt: price strictly greater than 50.
for book in collection.find({'price': {'$gt': 50}}):
    print(f"{book['name']} - {book['price']}")

client.close()

常用查询操作符:

  • $eq:等于
  • $ne:不等于
  • $gt:大于
  • $gte:大于等于
  • $lt:小于
  • $lte:小于等于
  • $in:在列表中
  • $nin:不在列表中

更新文档

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']

# update_one changes only the first matching document; $set updates the
# listed fields and leaves the rest of the document untouched.
result = collection.update_one(
    {'name': 'Python编程'},
    {'$set': {'price': 55.9}}
)
print(f'修改了 {result.modified_count} 条文档')

client.close()

创建索引

from pymongo import MongoClient, ASCENDING, DESCENDING

client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']

# unique=True makes MongoDB reject duplicate names; the second call builds
# a compound index on (author ascending, price descending).
collection.create_index([('name', ASCENDING)], unique=True)
collection.create_index([('author', ASCENDING), ('price', DESCENDING)])

client.close()

存储方案选择

选择合适的数据存储方案是爬虫项目的重要决策。不同的存储方式有不同的特点和适用场景。

存储方案对比

| 存储方式 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- |
| JSON | 简单易用,可读性好,跨语言支持 | 大文件性能差,不支持随机访问 | 小型数据,配置文件,API 数据 |
| JSON Lines | 支持流式处理,可追加写入 | 不能随机访问,需要逐行解析 | 大量数据,日志,增量爬取 |
| CSV | Excel 兼容,表格形式直观 | 不支持嵌套结构,编码问题 | 表格数据,报表,数据分析 |
| SQLite | 无需服务器,单文件,ACID 事务 | 并发写入能力有限,单机存储 | 小型应用,本地存储,嵌入式场景 |
| MySQL | 功能强大,社区活跃,高性能 | 需要服务器,运维成本 | 中大型应用,需要事务支持 |
| PostgreSQL | 功能最全,扩展性强,JSON 支持 | 配置复杂,学习曲线较陡 | 复杂查询,地理信息,企业应用 |
| MongoDB | 灵活,适合非结构化数据,高性能 | 占用内存大,无事务(部分支持) | 非结构化数据,快速迭代,日志 |

选择决策流程

选择存储方案时,可以按照以下思路进行决策:

1. 数据量评估

数据量 < 1GB    → JSON / SQLite / CSV
数据量 1-10GB → SQLite / MySQL / PostgreSQL
数据量 > 10GB → MySQL / PostgreSQL / MongoDB

2. 数据结构分析

结构化数据(固定字段)  → 关系型数据库(MySQL/PostgreSQL/SQLite)
半结构化数据(JSON) → MongoDB / PostgreSQL(JSONB)
非结构化数据(文本) → 文件存储 + Elasticsearch

3. 查询需求

简单查询(按 ID 查找)     → 任意存储
复杂查询(多表关联) → 关系型数据库
全文搜索 → Elasticsearch / PostgreSQL 全文搜索
聚合分析 → MongoDB / PostgreSQL

4. 并发需求

单线程爬取     → SQLite / JSON 文件
多进程爬取 → MySQL / PostgreSQL / MongoDB
高并发写入 → MongoDB / PostgreSQL

混合存储策略

实际项目中,往往需要组合使用多种存储方式:

import json
import sqlite3
from datetime import datetime
from typing import Dict, Any, Optional

class HybridStorage:
    """
    Hybrid storage strategy example.

    - Raw data: appended to a JSON Lines file (easy to back up / reprocess)
    - Structured data: stored in SQLite (easy to query)
    - Transient state: kept in an in-memory dict (fast access)
    """

    def __init__(self, jsonl_path: str, sqlite_path: str):
        self.jsonl_path = jsonl_path
        self.sqlite_path = sqlite_path
        self.cache: Dict[str, Any] = {}
        self._init_sqlite()

    def _init_sqlite(self):
        """Open the SQLite database and create the items table if missing."""
        self.conn = sqlite3.connect(self.sqlite_path)
        self.conn.execute('''
CREATE TABLE IF NOT EXISTS items (
id TEXT PRIMARY KEY,
title TEXT,
url TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        self.conn.commit()

    async def save(self, item: Dict[str, Any]):
        """Persist one item to all three stores; items without an 'id' are skipped.

        NOTE(review): declared async but contains no await — the file and
        SQLite writes block the event loop; confirm async is really wanted.
        """
        item_id = item.get('id')
        if not item_id:
            return

        # 1. Append the raw item to the JSON Lines file.
        with open(self.jsonl_path, 'a', encoding='utf-8') as f:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

        # 2. Upsert the structured fields into SQLite.
        self.conn.execute('''
INSERT OR REPLACE INTO items (id, title, url, updated_at)
VALUES (?, ?, ?, ?)
''', (item_id, item.get('title'), item.get('url'), datetime.now()))
        self.conn.commit()

        # 3. Refresh the in-memory cache.
        self.cache[item_id] = item

    def get(self, item_id: str) -> Optional[Dict[str, Any]]:
        """Return an item, preferring the in-memory cache over SQLite."""
        if item_id in self.cache:
            return self.cache[item_id]

        # Cache miss: fall back to the database.
        cursor = self.conn.execute(
            'SELECT * FROM items WHERE id = ?', (item_id,)
        )
        row = cursor.fetchone()
        if row:
            # Only the structured columns are reconstructed here.
            return {
                'id': row[0],
                'title': row[1],
                'url': row[2],
            }
        return None

    def close(self):
        """Close the SQLite connection."""
        self.conn.close()

存储最佳实践

数据去重

在爬虫中,数据去重是避免重复存储的关键:

import json
import sqlite3
import hashlib
from typing import Dict, Any, Optional

class DedupStorage:
    """Storage with URL- and content-based deduplication.

    Fix: the original snippet called json.dumps without importing json.
    """

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self._init_db()
        # In-memory URL set gives O(1) duplicate checks without hitting SQLite.
        self.seen_urls = set()
        # Warm the set from rows already on disk.
        self._load_seen()

    def _init_db(self):
        """Create the items table and its lookup indexes if missing."""
        self.conn.execute('''
CREATE TABLE IF NOT EXISTS items (
id TEXT PRIMARY KEY,
url TEXT UNIQUE,
content_hash TEXT,
data TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        self.conn.execute('CREATE INDEX IF NOT EXISTS idx_url ON items(url)')
        self.conn.execute('CREATE INDEX IF NOT EXISTS idx_hash ON items(content_hash)')
        self.conn.commit()

    def _load_seen(self):
        """Load all stored URLs into the in-memory set."""
        cursor = self.conn.execute('SELECT url FROM items')
        self.seen_urls = {row[0] for row in cursor.fetchall()}

    def _compute_hash(self, data: Dict[str, Any]) -> str:
        """Hash the canonical JSON form of `data` (sort_keys makes it stable)."""
        content = json.dumps(data, sort_keys=True, ensure_ascii=False)
        return hashlib.md5(content.encode()).hexdigest()

    def is_duplicate(self, url: str, data: Optional[Dict[str, Any]] = None) -> bool:
        """Return True when the URL was seen before or identical content exists."""
        if url in self.seen_urls:
            return True

        if data:
            content_hash = self._compute_hash(data)
            cursor = self.conn.execute(
                'SELECT 1 FROM items WHERE content_hash = ?', (content_hash,)
            )
            if cursor.fetchone():
                return True

        return False

    def save(self, url: str, data: Dict[str, Any]) -> bool:
        """Store (url, data); return False and skip when it is a duplicate."""
        if self.is_duplicate(url, data):
            return False

        content_hash = self._compute_hash(data)
        # Short, stable id derived from the URL.
        item_id = hashlib.sha256(url.encode()).hexdigest()[:16]

        self.conn.execute('''
INSERT INTO items (id, url, content_hash, data)
VALUES (?, ?, ?, ?)
''', (item_id, url, content_hash, json.dumps(data, ensure_ascii=False)))
        self.conn.commit()

        self.seen_urls.add(url)
        return True

    def close(self):
        """Close the SQLite connection."""
        self.conn.close()

批量写入优化

批量写入比单条插入效率高很多:

import sqlite3
import time
from typing import List, Dict, Any

class BatchStorage:
    """Storage that buffers rows and writes them to SQLite in batches.

    Rows accumulate in an in-memory buffer; once `batch_size` rows are
    pending they are flushed with a single executemany. close() flushes
    whatever is left.
    """

    def __init__(self, db_path: str, batch_size: int = 100):
        self.conn = sqlite3.connect(db_path)
        self.batch_size = batch_size
        self.buffer: List[tuple] = []
        self._init_db()

    def _init_db(self):
        """Create the items table if it does not exist yet."""
        self.conn.execute('''
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
url TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
        self.conn.commit()

    def add(self, title: str, url: str):
        """Queue one row; triggers a flush once the buffer is full."""
        self.buffer.append((title, url))
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Write all buffered rows to the database in one batch."""
        if not self.buffer:
            return
        self.conn.executemany(
            'INSERT INTO items (title, url) VALUES (?, ?)',
            self.buffer
        )
        self.conn.commit()
        self.buffer.clear()

    def close(self):
        self.flush()  # make sure pending rows are written
        self.conn.close()

# Performance comparison example
def benchmark():
    """Compare row-by-row INSERTs against one executemany batch."""
    db_path = ':memory:'

    # Row-by-row: one Python-level execute call per record.
    conn = sqlite3.connect(db_path)
    conn.execute('CREATE TABLE test1 (id INTEGER, value TEXT)')
    start = time.time()
    for i in range(1000):
        conn.execute('INSERT INTO test1 VALUES (?, ?)', (i, f'value_{i}'))
    conn.commit()
    print(f'单条插入: {time.time() - start:.3f}s')
    conn.close()

    # Batch: a single executemany loops inside the C library.
    conn = sqlite3.connect(db_path)
    conn.execute('CREATE TABLE test2 (id INTEGER, value TEXT)')
    start = time.time()
    conn.executemany(
        'INSERT INTO test2 VALUES (?, ?)',
        [(i, f'value_{i}') for i in range(1000)]
    )
    conn.commit()
    print(f'批量插入: {time.time() - start:.3f}s')
    conn.close()

# Typical results:
# 单条插入: 0.15s
# 批量插入: 0.005s

数据压缩

对于大量文本数据,压缩可以显著减少存储空间:

import gzip
import json
from typing import Any

def save_compressed(data: Any, filepath: str):
    """Write `data` as gzip-compressed JSON text."""
    # 'wt' opens the gzip stream in text mode so json.dump can write str.
    with gzip.open(filepath, 'wt', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False)

def load_compressed(filepath: str) -> Any:
    """Read gzip-compressed JSON text back into Python objects."""
    with gzip.open(filepath, 'rt', encoding='utf-8') as f:
        return json.load(f)

# Usage example
data = [{'id': i, 'content': '很长很长的文本...' * 100} for i in range(1000)]

save_compressed(data, 'data.json.gz')
loaded = load_compressed('data.json.gz')

# Compression effect (highly repetitive text compresses extremely well):
# raw JSON: 10MB
# compressed: 500KB (95% reduction)

数据备份策略

import sqlite3
import shutil
from datetime import datetime
from pathlib import Path

class BackupManager:
    """Create, rotate and restore copies of a SQLite database file."""

    def __init__(self, db_path: str, backup_dir: str = 'backups'):
        self.db_path = db_path
        self.backup_dir = Path(backup_dir)
        # Create the backup directory on first use.
        self.backup_dir.mkdir(exist_ok=True)

    def backup(self, name: str = None):
        """Copy the database file to the backup directory; return the path.

        Defaults to a timestamp-based name so backups sort chronologically.
        """
        if name is None:
            name = datetime.now().strftime('%Y%m%d_%H%M%S')

        backup_path = self.backup_dir / f'{name}.db'
        shutil.copy2(self.db_path, backup_path)
        return backup_path

    def backup_sqlite(self, name: str = None):
        """Back up via sqlite3's backup API (safer than a raw file copy)."""
        if name is None:
            name = datetime.now().strftime('%Y%m%d_%H%M%S')

        backup_path = self.backup_dir / f'{name}.db'

        source = sqlite3.connect(self.db_path)
        dest = sqlite3.connect(str(backup_path))
        source.backup(dest)

        dest.close()
        source.close()

        return backup_path

    def rotate(self, keep: int = 7):
        """Delete the oldest backups so at most `keep` remain (sorted by name)."""
        backups = sorted(self.backup_dir.glob('*.db'))
        # max(0, ...) guards the slice when there is nothing to delete.
        for old in backups[:max(0, len(backups) - keep)]:
            old.unlink()

    def restore(self, backup_name: str):
        """Overwrite the database with the named backup; raise if it is missing."""
        backup_path = self.backup_dir / backup_name
        if not backup_path.exists():
            raise FileNotFoundError(f'备份不存在: {backup_name}')

        shutil.copy2(backup_path, self.db_path)

完整示例

import csv
import json
import sqlite3
from typing import List, Dict

class DataStorage:
    """Save scraped records to JSON, CSV, or SQLite under one output directory.

    Fix: the three output paths now interpolate the `filename` argument,
    which was previously ignored (the path contained a broken placeholder).
    """

    def __init__(self, output_dir: str = 'output'):
        # The directory must already exist; callers create it (see __main__).
        self.output_dir = output_dir

    def save_to_json(self, data: List[Dict], filename: str):
        """Write `data` to <output_dir>/<filename>.json (pretty-printed UTF-8)."""
        filepath = f'{self.output_dir}/{filename}.json'
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f'已保存 {len(data)} 条数据到 {filepath}')

    def save_to_csv(self, data: List[Dict], filename: str):
        """Write `data` to <output_dir>/<filename>.csv; no-op for empty data."""
        if not data:
            return

        filepath = f'{self.output_dir}/{filename}.csv'
        # Column order follows the first record's keys.
        fieldnames = list(data[0].keys())

        with open(filepath, 'w', newline='', encoding='utf-8-sig') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

        print(f'已保存 {len(data)} 条数据到 {filepath}')

    def save_to_sqlite(self, data: List[Dict], filename: str, table: str):
        """Write `data` to table `table` in <output_dir>/<filename>.db.

        The schema is derived from the first record: 'id' becomes the
        INTEGER PRIMARY KEY, every other column is TEXT.
        NOTE(review): `table` and the column names are interpolated into
        SQL — pass trusted identifiers only.
        """
        if not data:
            return

        filepath = f'{self.output_dir}/{filename}.db'
        conn = sqlite3.connect(filepath)
        cursor = conn.cursor()

        columns = list(data[0].keys())
        columns_def = ', '.join([
            f'{col} TEXT' if col != 'id' else f'{col} INTEGER PRIMARY KEY'
            for col in columns
        ])

        cursor.execute(f'CREATE TABLE IF NOT EXISTS {table} ({columns_def})')

        placeholders = ', '.join(['?' for _ in columns])
        cursor.executemany(
            f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})',
            [tuple(item.values()) for item in data]
        )

        conn.commit()
        conn.close()
        print(f'已保存 {len(data)} 条数据到 {filepath}')

if __name__ == '__main__':
    import os
    os.makedirs('output', exist_ok=True)

    storage = DataStorage()

    books = [
        {'id': 1, 'name': 'Python编程', 'author': '张三', 'price': '59.9'},
        {'id': 2, 'name': 'Java入门', 'author': '李四', 'price': '49.9'},
        {'id': 3, 'name': 'Go语言实战', 'author': '王五', 'price': '69.9'},
    ]

    # The same records saved three ways for comparison.
    storage.save_to_json(books, 'books')
    storage.save_to_csv(books, 'books')
    storage.save_to_sqlite(books, 'books', 'books')

小结

本章我们学习了:

  1. JSON 存储 - 结构化数据的通用格式
  2. CSV 存储 - 表格数据,Excel 兼容
  3. SQLite 存储 - 轻量级嵌入式数据库
  4. MySQL 存储 - 关系型数据库服务器
  5. MongoDB 存储 - 文档型数据库
  6. 存储方案选择 - 根据场景选择合适的存储方式

练习

  1. 将爬取的新闻数据保存为 JSON 和 CSV 两种格式
  2. 使用 SQLite 存储爬取的商品信息,并实现去重
  3. 实现一个支持多种存储方式的数据保存类