数据存储
爬取的数据需要保存到本地或数据库中。本章介绍常用的数据存储方式。
JSON 存储
JSON 是最常用的数据交换格式,适合存储结构化数据。
写入 JSON 文件
import json
data = [
{'name': 'Python编程', 'author': '张三', 'price': 59.9},
{'name': 'Java入门', 'author': '李四', 'price': 49.9},
{'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
]
with open('books.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
参数说明:
ensure_ascii=False:保留中文字符,不转义为 Unicodeindent=2:美化输出,缩进 2 空格
读取 JSON 文件
import json
with open('books.json', 'r', encoding='utf-8') as f:
data = json.load(f)
for item in data:
print(f"{item['name']} - {item['author']}")
追加数据到 JSON
import json
import os
def append_to_json(filename, new_data):
if os.path.exists(filename):
with open(filename, 'r', encoding='utf-8') as f:
data = json.load(f)
else:
data = []
data.append(new_data)
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
append_to_json('books.json', {'name': '新书籍', 'author': '新作者', 'price': 39.9})
JSON Lines 格式
对于大量数据,推荐使用 JSON Lines 格式(每行一个 JSON 对象):
import json
data = [
{'id': 1, 'title': '文章1'},
{'id': 2, 'title': '文章2'},
{'id': 3, 'title': '文章3'},
]
with open('articles.jl', 'w', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
with open('articles.jl', 'r', encoding='utf-8') as f:
for line in f:
item = json.loads(line)
print(item['title'])
JSON Lines 的优点:
- 可以逐行读取,不需要一次性加载全部数据
- 追加数据时不需要读取和重写整个文件
- 适合流式处理大数据
CSV 存储
CSV 是表格数据的通用格式,可以用 Excel 打开。
写入 CSV 文件
import csv
data = [
{'name': 'Python编程', 'author': '张三', 'price': 59.9},
{'name': 'Java入门', 'author': '李四', 'price': 49.9},
{'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
]
with open('books.csv', 'w', newline='', encoding='utf-8-sig') as f:
fieldnames = ['name', 'author', 'price']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
参数说明:
newline='':防止出现空行encoding='utf-8-sig':添加 BOM,让 Excel 正确识别中文
读取 CSV 文件
import csv
with open('books.csv', 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
for row in reader:
print(f"{row['name']} - {row['author']} - {row['price']}")
追加数据到 CSV
import csv
import os
def append_to_csv(filename, new_data, fieldnames):
file_exists = os.path.exists(filename)
with open(filename, 'a', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
writer.writerow(new_data)
append_to_csv('books.csv', {'name': '新书籍', 'author': '新作者', 'price': 39.9},
fieldnames=['name', 'author', 'price'])
处理特殊字符
CSV 中的特殊字符(逗号、引号、换行)需要正确处理:
import csv
data = [
{'name': 'Python, 编程', 'description': '学习"Python"的好书', 'price': 59.9},
{'name': 'Java入门\n进阶', 'description': '适合初学者', 'price': 49.9},
]
with open('books_special.csv', 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=['name', 'description', 'price'])
writer.writeheader()
writer.writerows(data)
csv 模块会自动处理这些特殊字符,将包含特殊字符的字段用引号包裹。
SQLite 存储
SQLite 是轻量级的嵌入式数据库,无需安装服务器,适合小型项目。
创建数据库和表
import sqlite3
conn = sqlite3.connect('spider.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
author TEXT,
price REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
插入数据
import sqlite3
conn = sqlite3.connect('spider.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (?, ?, ?)
''', ('Python编程', '张三', 59.9))
conn.commit()
conn.close()
使用参数化查询(? 占位符)防止 SQL 注入。
批量插入
import sqlite3
books = [
('Java入门', '李四', 49.9),
('Go语言实战', '王五', 69.9),
('Rust编程', '赵六', 79.9),
]
conn = sqlite3.connect('spider.db')
cursor = conn.cursor()
cursor.executemany('''
INSERT INTO books (name, author, price)
VALUES (?, ?, ?)
''', books)
conn.commit()
conn.close()
查询数据
import sqlite3
conn = sqlite3.connect('spider.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM books WHERE price > ?', (50,))
rows = cursor.fetchall()
for row in rows:
print(row)
conn.close()
使用上下文管理器
import sqlite3
class Database:
def __init__(self, db_path):
self.db_path = db_path
def __enter__(self):
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.conn.commit()
else:
self.conn.rollback()
self.conn.close()
def insert_book(self, name, author, price):
self.cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (?, ?, ?)
''', (name, author, price))
def get_books(self, min_price=0):
self.cursor.execute('SELECT * FROM books WHERE price > ?', (min_price,))
return self.cursor.fetchall()
with Database('spider.db') as db:
db.insert_book('新书籍', '新作者', 39.9)
books = db.get_books(50)
for book in books:
print(book)
MySQL 存储
对于大型项目,通常使用 MySQL 等数据库服务器。
安装驱动
pip install pymysql
连接 MySQL
import pymysql
conn = pymysql.connect(
host='localhost',
port=3306,
user='root',
password='password',
database='spider',
charset='utf8mb4'
)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
author VARCHAR(100),
price DECIMAL(10, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
插入数据
import pymysql
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='spider',
charset='utf8mb4'
)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', ('Python编程', '张三', 59.9))
conn.commit()
conn.close()
批量插入
import pymysql
books = [
('Java入门', '李四', 49.9),
('Go语言实战', '王五', 69.9),
]
conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='spider',
charset='utf8mb4'
)
cursor = conn.cursor()
cursor.executemany('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', books)
conn.commit()
print(f'插入了 {cursor.rowcount} 条记录')
conn.close()
使用连接池
import pymysql
from dbutils.pooled_db import PooledDB
pool = PooledDB(
creator=pymysql,
host='localhost',
user='root',
password='password',
database='spider',
charset='utf8mb4',
maxconnections=10,
mincached=2,
)
def save_book(book):
conn = pool.connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO books (name, author, price)
VALUES (%s, %s, %s)
''', (book['name'], book['author'], book['price']))
conn.commit()
conn.close()
save_book({'name': '新书籍', 'author': '新作者', 'price': 39.9})
MongoDB 存储
MongoDB 是文档型数据库,适合存储非结构化数据。
安装驱动
pip install pymongo
连接 MongoDB
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['spider']
collection = db['books']
插入文档
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['spider']
collection = db['books']
book = {
'name': 'Python编程',
'author': '张三',
'price': 59.9,
'tags': ['Python', '编程', '入门']
}
result = collection.insert_one(book)
print(f'插入的文档ID: {result.inserted_id}')
client.close()
批量插入
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
books = [
{'name': 'Java入门', 'author': '李四', 'price': 49.9},
{'name': 'Go语言实战', 'author': '王五', 'price': 69.9},
{'name': 'Rust编程', 'author': '赵六', 'price': 79.9},
]
result = collection.insert_many(books)
print(f'插入了 {len(result.inserted_ids)} 条文档')
client.close()
查询文档
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
for book in collection.find({'price': {'$gt': 50}}):
print(f"{book['name']} - {book['price']}")
client.close()
常用查询操作符:
$eq:等于$ne:不等于$gt:大于$gte:大于等于$lt:小于$lte:小于等于$in:在列表中$nin:不在列表中
更新文档
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
result = collection.update_one(
{'name': 'Python编程'},
{'$set': {'price': 55.9}}
)
print(f'修改了 {result.modified_count} 条文档')
client.close()
创建索引
from pymongo import MongoClient, ASCENDING, DESCENDING
client = MongoClient('mongodb://localhost:27017/')
collection = client['spider']['books']
collection.create_index([('name', ASCENDING)], unique=True)
collection.create_index([('author', ASCENDING), ('price', DESCENDING)])
client.close()
存储方案选择
| 存储方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| JSON | 简单易用,可读性好 | 大文件性能差 | 小型数据,配置文件 |
| JSON Lines | 支持流式处理 | 不能随机访问 | 大量数据,日志 |
| CSV | Excel 兼容,表格形式 | 不支持嵌套结构 | 表格数据,报表 |
| SQLite | 无需服务器,单文件 | 并发能力有限 | 小型应用,本地存储 |
| MySQL | 功能强大,社区活跃 | 需要服务器 | 中大型应用 |
| MongoDB | 灵活,适合非结构化数据 | 占用内存较大 | 非结构化数据,快速迭代 |
完整示例
import csv
import json
import sqlite3
from typing import List, Dict
class DataStorage:
def __init__(self, output_dir: str = 'output'):
self.output_dir = output_dir
def save_to_json(self, data: List[Dict], filename: str):
filepath = f'{self.output_dir}/{filename}.json'
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f'已保存 {len(data)} 条数据到 {filepath}')
def save_to_csv(self, data: List[Dict], filename: str):
if not data:
return
filepath = f'{self.output_dir}/{filename}.csv'
fieldnames = list(data[0].keys())
with open(filepath, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
print(f'已保存 {len(data)} 条数据到 {filepath}')
def save_to_sqlite(self, data: List[Dict], filename: str, table: str):
if not data:
return
filepath = f'{self.output_dir}/{filename}.db'
conn = sqlite3.connect(filepath)
cursor = conn.cursor()
columns = list(data[0].keys())
columns_def = ', '.join([
f'{col} TEXT' if col != 'id' else f'{col} INTEGER PRIMARY KEY'
for col in columns
])
cursor.execute(f'CREATE TABLE IF NOT EXISTS {table} ({columns_def})')
placeholders = ', '.join(['?' for _ in columns])
cursor.executemany(
f'INSERT INTO {table} ({", ".join(columns)}) VALUES ({placeholders})',
[tuple(item.values()) for item in data]
)
conn.commit()
conn.close()
print(f'已保存 {len(data)} 条数据到 {filepath}')
if __name__ == '__main__':
import os
os.makedirs('output', exist_ok=True)
storage = DataStorage()
books = [
{'id': 1, 'name': 'Python编程', 'author': '张三', 'price': '59.9'},
{'id': 2, 'name': 'Java入门', 'author': '李四', 'price': '49.9'},
{'id': 3, 'name': 'Go语言实战', 'author': '王五', 'price': '69.9'},
]
storage.save_to_json(books, 'books')
storage.save_to_csv(books, 'books')
storage.save_to_sqlite(books, 'books', 'books')
小结
本章我们学习了:
- JSON 存储 - 结构化数据的通用格式
- CSV 存储 - 表格数据,Excel 兼容
- SQLite 存储 - 轻量级嵌入式数据库
- MySQL 存储 - 关系型数据库服务器
- MongoDB 存储 - 文档型数据库
- 存储方案选择 - 根据场景选择合适的存储方式
练习
- 将爬取的新闻数据保存为 JSON 和 CSV 两种格式
- 使用 SQLite 存储爬取的商品信息,并实现去重
- 实现一个支持多种存储方式的数据保存类