集成测试
集成测试验证多个模块或组件之间的交互是否正常工作。本章节将介绍集成测试的概念、策略和实践方法。
什么是集成测试?
集成测试是在单元测试之后进行的测试阶段,目的是发现模块间接口和交互的问题。
集成测试 vs 单元测试
理解两者的区别有助于正确选择测试策略:
| 特点 | 单元测试 | 集成测试 |
|---|---|---|
| 测试范围 | 单个组件 | 多个组件的交互 |
| 隔离程度 | 完全隔离 | 有限隔离 |
| 依赖处理 | 使用 Mock/Stub | 使用真实依赖或 Test Container |
| 执行速度 | 快速(毫秒级) | 中等(秒级) |
| 问题定位 | 精准 | 可能涉及多个组件 |
| 测试环境 | 不需要外部资源 | 需要数据库、服务等 |
为什么需要集成测试?
单元测试虽然能验证代码的正确性,但它们无法保证整个系统协同工作时的正确性:
-
发现接口问题
- 数据格式不匹配
- 接口契约变更
- 调用顺序错误
- 参数传递问题
-
验证数据流
- 数据在组件间的传递
- 数据转换和映射
- 状态同步
-
测试真实交互
- 数据库访问
- 外部 API 调用
- 消息队列通信
- 缓存系统交互
集成测试策略
1. 大爆炸集成(Big Bang)
所有模块一次性集成并测试。
优点:
- 简单直接,无需特殊策略
- 适合小型项目
缺点:
- 问题定位困难
- 风险集中
- 不适合大型项目
2. 自顶向下集成(Top-Down)
从顶层模块开始,逐步集成下层模块。
桩模块(Stub):模拟下层模块的行为,用于替代尚未开发的模块。
class PaymentServiceStub:
"""支付服务桩模块 - 模拟支付处理"""
def process_payment(self, amount, card_info):
# 不实际处理支付,直接返回模拟结果
return {
"status": "success",
"transaction_id": "stub-transaction-12345",
"amount": amount
}
# 在集成测试中使用桩模块
def test_order_creation_with_stub():
payment_stub = PaymentServiceStub()
order_service = OrderService(payment_stub)
order = order_service.create_order(
items=[{"product_id": 1, "quantity": 2, "price": 100}],
user_id=1
)
assert order.status == "created"
assert order.payment_status == "pending"
优点:
- 早期验证主要控制逻辑
- 演示系统功能较早
- 缺陷通常在顶层模块
缺点:
- 底层模块可能延迟测试
- 需要大量桩模块
3. 自底向上集成(Bottom-Up)
从底层模块开始,逐步集成到顶层。
驱动模块(Driver):模拟上层模块的调用,用于测试底层模块。
class OrderServiceDriver:
"""订单服务驱动模块 - 模拟上层调用"""
def __init__(self, payment_service):
self.payment_service = payment_service
def test_payment_flow(self):
"""测试完整支付流程"""
# 准备测试数据
order_data = {
"order_id": "test-order-001",
"amount": 199.99,
"currency": "USD"
}
# 调用支付服务
result = self.payment_service.process_payment(
amount=order_data["amount"],
card_info={"number": "4111111111111111", "expiry": "12/25"}
)
# 验证结果
assert result["status"] == "success"
return result
优点:
- 底层模块充分测试
- 适合底层复杂的系统
- 驱动模块通常比桩模块简单
缺点:
- 顶层模块最后测试
- 不能早期演示系统功能
4. 三明治集成(Sandwich)
结合自顶向下和自底向上,同时从中间层开始。
优点:
- 结合两种方法的优点
- 可以并行进行测试
缺点:
- 需要同时维护桩模块和驱动模块
- 测试成本较高
集成测试类型
1. API 集成测试
测试 API 端点和业务逻辑,验证 HTTP 接口的正确性。
import requests
import pytest
class TestUserAPI:
"""用户 API 集成测试"""
BASE_URL = "http://localhost:8000/api"
def test_create_user(self):
"""测试创建用户"""
response = requests.post(
f"{self.BASE_URL}/users",
json={
"username": "testuser",
"email": "[email protected]",
"password": "Test@123456"
}
)
assert response.status_code == 201
data = response.json()
assert data["username"] == "testuser"
assert "id" in data
assert "password" not in data # 密码不应返回
def test_get_user(self):
"""测试获取用户"""
# 先创建用户
create_resp = requests.post(
f"{self.BASE_URL}/users",
json={
"username": "testuser",
"email": "[email protected]",
"password": "Test@123456"
}
)
user_id = create_resp.json()["id"]
# 再获取用户
response = requests.get(f"{self.BASE_URL}/users/{user_id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == user_id
assert data["username"] == "testuser"
def test_user_not_found(self):
"""测试获取不存在的用户"""
response = requests.get(f"{self.BASE_URL}/users/99999")
assert response.status_code == 404
assert "error" in response.json()
def test_create_duplicate_user(self):
"""测试创建重复用户"""
user_data = {
"username": "duplicate_user",
"email": "[email protected]",
"password": "Test@123456"
}
# 第一次创建应该成功
response1 = requests.post(f"{self.BASE_URL}/users", json=user_data)
assert response1.status_code == 201
# 第二次创建应该失败
response2 = requests.post(f"{self.BASE_URL}/users", json=user_data)
assert response2.status_code == 409 # Conflict
2. 数据库集成测试
测试与数据库的交互,验证 SQL 查询和数据操作的正确性。
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from myapp.models import Base, User, Order
class TestUserRepository:
"""用户仓库集成测试"""
@pytest.fixture(scope="class")
def db_engine(self):
"""创建测试数据库引擎"""
# 使用内存数据库进行测试
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
yield engine
engine.dispose()
@pytest.fixture
def db_session(self, db_engine):
"""创建数据库会话"""
Session = sessionmaker(bind=db_engine)
session = Session()
yield session
# 清理:回滚事务
session.rollback()
session.close()
def test_create_user(self, db_session):
"""测试创建用户"""
repo = UserRepository(db_session)
user = repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})
assert user.id is not None
assert user.username == "testuser"
assert user.email == "[email protected]"
def test_find_user_by_username(self, db_session):
"""测试通过用户名查找用户"""
repo = UserRepository(db_session)
# 创建用户
repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})
# 查找用户
user = repo.find_by_username("testuser")
assert user is not None
assert user.email == "[email protected]"
def test_find_nonexistent_user(self, db_session):
"""测试查找不存在的用户"""
repo = UserRepository(db_session)
user = repo.find_by_username("nonexistent")
assert user is None
def test_update_user(self, db_session):
"""测试更新用户"""
repo = UserRepository(db_session)
# 创建用户
user = repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})
# 更新用户
updated = repo.update(user.id, {"email": "[email protected]"})
assert updated.email == "[email protected]"
def test_delete_user(self, db_session):
"""测试删除用户"""
repo = UserRepository(db_session)
# 创建用户
user = repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})
# 删除用户
repo.delete(user.id)
# 验证已删除
deleted = repo.find_by_id(user.id)
assert deleted is None
3. 消息队列集成测试
测试异步消息处理,验证消息的发送和消费。
import pytest
import json
import pika
from unittest.mock import Mock
class TestMessageQueue:
"""消息队列集成测试"""
@pytest.fixture
def rabbitmq_connection(self):
"""创建 RabbitMQ 连接"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', port=5672)
)
yield connection
connection.close()
except pika.exceptions.AMQPConnectionError:
pytest.skip("RabbitMQ not available")
def test_order_created_event(self, rabbitmq_connection):
"""测试订单创建事件"""
channel = rabbitmq_connection.channel()
# 声明测试队列
queue_name = 'test_orders'
channel.queue_declare(queue=queue_name, durable=True)
# 发送消息
event = {
"event_type": "order_created",
"order_id": "ORD-001",
"user_id": "USER-001",
"amount": 100.00,
"timestamp": "2024-01-01T12:00:00Z"
}
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(event),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
)
# 消费消息
method, properties, body = channel.basic_get(queue_name, auto_ack=True)
assert method is not None
received_event = json.loads(body)
assert received_event["order_id"] == "ORD-001"
assert received_event["event_type"] == "order_created"
4. 外部服务集成测试
测试与第三方服务的集成,使用 Mock 或实际调用。
import pytest
import responses
import requests
class TestExternalService:
"""外部服务集成测试"""
@responses.activate
def test_payment_gateway_success(self):
"""测试支付网关成功响应"""
# Mock 外部服务响应
responses.add(
responses.POST,
"https://api.payment.com/v1/charges",
json={
"id": "charge_123",
"status": "succeeded",
"amount": 1000
},
status=200
)
# 调用支付服务
payment_service = PaymentService(api_key="test_key")
result = payment_service.charge(1000, "tok_visa")
assert result["status"] == "succeeded"
assert result["amount"] == 1000
@responses.activate
def test_payment_gateway_failure(self):
"""测试支付网关失败响应"""
responses.add(
responses.POST,
"https://api.payment.com/v1/charges",
json={
"error": {
"type": "card_error",
"message": "Your card was declined"
}
},
status=402
)
payment_service = PaymentService(api_key="test_key")
with pytest.raises(PaymentError) as exc_info:
payment_service.charge(1000, "tok_declined")
assert "declined" in str(exc_info.value).lower()
@responses.activate
def test_external_api_timeout(self):
"""测试外部 API 超时"""
responses.add(
responses.GET,
"https://api.external.com/data",
body=responses.ConnectionError("Connection timeout")
)
with pytest.raises(ConnectionError):
requests.get("https://api.external.com/data")
Test Containers
使用 Docker 容器进行集成测试,提供真实的环境,避免环境配置问题。
安装
pip install testcontainers
数据库测试容器
import pytest
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine, text
@pytest.fixture(scope="module")
def postgres_container():
"""PostgreSQL 测试容器"""
with PostgresContainer("postgres:15") as postgres:
yield postgres
@pytest.fixture
def db_engine(postgres_container):
"""创建数据库引擎"""
connection_url = postgres_container.get_connection_url()
engine = create_engine(connection_url)
# 创建测试表
with engine.connect() as conn:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL
)
"""))
conn.commit()
yield engine
engine.dispose()
def test_with_postgres(db_engine):
"""使用 PostgreSQL 容器进行测试"""
with db_engine.connect() as conn:
# 插入数据
conn.execute(
text("INSERT INTO users (username, email) VALUES (:username, :email)"),
{"username": "testuser", "email": "[email protected]"}
)
conn.commit()
# 查询数据
result = conn.execute(
text("SELECT * FROM users WHERE username = :username"),
{"username": "testuser"}
)
user = result.fetchone()
assert user is not None
assert user.username == "testuser"
Redis 测试容器
import pytest
from testcontainers.redis import RedisContainer
import redis
@pytest.fixture(scope="module")
def redis_container():
"""Redis 测试容器"""
with RedisContainer("redis:7") as container:
yield container
@pytest.fixture
def redis_client(redis_container):
"""创建 Redis 客户端"""
host = redis_container.get_container_host_ip()
port = redis_container.get_exposed_port(6379)
client = redis.Redis(host=host, port=port, decode_responses=True)
yield client
client.close()
def test_cache_operations(redis_client):
"""测试缓存操作"""
cache = CacheService(redis_client)
# 设置缓存
cache.set("user:1", {"name": "张三", "email": "[email protected]"}, ttl=300)
# 获取缓存
value = cache.get("user:1")
assert value["name"] == "张三"
# 删除缓存
cache.delete("user:1")
assert cache.get("user:1") is None
def test_cache_expiration(redis_client):
"""测试缓存过期"""
import time
cache = CacheService(redis_client)
cache.set("temp", "value", ttl=1)
assert cache.get("temp") == "value"
time.sleep(2)
assert cache.get("temp") is None
MySQL 测试容器
import pytest
from testcontainers.mysql import MySqlContainer
from sqlalchemy import create_engine
@pytest.fixture(scope="module")
def mysql_container():
"""MySQL 测试容器"""
with MySqlContainer("mysql:8.0") as mysql:
yield mysql
@pytest.fixture
def db_engine(mysql_container):
"""创建数据库引擎"""
connection_url = mysql_container.get_connection_url()
engine = create_engine(connection_url)
yield engine
engine.dispose()
def test_mysql_connection(db_engine):
"""测试 MySQL 连接"""
with db_engine.connect() as conn:
result = conn.execute("SELECT 1")
assert result.scalar() == 1
完整示例:Web 应用集成测试
import pytest
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
import requests
import time
class TestWebApplication:
"""Web 应用完整集成测试"""
@pytest.fixture(scope="module")
def infrastructure(self):
"""启动测试基础设施"""
with PostgresContainer("postgres:15") as postgres, \
RedisContainer("redis:7") as redis_container:
# 获取连接信息
postgres_url = postgres.get_connection_url()
redis_host = redis_container.get_container_host_ip()
redis_port = redis_container.get_exposed_port(6379)
# 配置应用(假设应用支持环境变量配置)
import os
os.environ["DATABASE_URL"] = postgres_url
os.environ["REDIS_HOST"] = redis_host
os.environ["REDIS_PORT"] = str(redis_port)
# 启动应用
from myapp import create_app
app = create_app()
yield {
"app": app,
"base_url": "http://localhost:8000",
"postgres": postgres,
"redis": redis_container
}
def test_user_registration_flow(self, infrastructure):
"""测试用户注册完整流程"""
base_url = infrastructure["base_url"]
# 1. 注册用户
register_resp = requests.post(
f"{base_url}/api/register",
json={
"username": "newuser",
"email": "[email protected]",
"password": "Password123!"
}
)
assert register_resp.status_code == 201
# 2. 登录
login_resp = requests.post(
f"{base_url}/api/login",
json={
"username": "newuser",
"password": "Password123!"
}
)
assert login_resp.status_code == 200
token = login_resp.json()["token"]
# 3. 获取用户信息
profile_resp = requests.get(
f"{base_url}/api/profile",
headers={"Authorization": f"Bearer {token}"}
)
assert profile_resp.status_code == 200
assert profile_resp.json()["username"] == "newuser"
def test_order_creation_flow(self, infrastructure):
"""测试订单创建完整流程"""
base_url = infrastructure["base_url"]
# 登录
login_resp = requests.post(
f"{base_url}/api/login",
json={"username": "testuser", "password": "Password123!"}
)
token = login_resp.json()["token"]
headers = {"Authorization": f"Bearer {token}"}
# 创建订单
order_resp = requests.post(
f"{base_url}/api/orders",
headers=headers,
json={
"items": [
{"product_id": 1, "quantity": 2},
{"product_id": 2, "quantity": 1}
]
}
)
assert order_resp.status_code == 201
order_id = order_resp.json()["id"]
# 查询订单
get_order_resp = requests.get(
f"{base_url}/api/orders/{order_id}",
headers=headers
)
assert get_order_resp.status_code == 200
assert len(get_order_resp.json()["items"]) == 2
契约测试
验证服务间的契约(接口约定),确保服务提供者和消费者之间的接口一致。
Pact 契约测试
from pact import Consumer, Provider
import pytest
@pytest.fixture
def pact():
"""创建 Pact 契约"""
pact = Consumer('user-service').has_pact_with(
Provider('order-service'),
pact_dir='./pacts'
)
pact.start_service()
yield pact
pact.stop_service()
def test_get_user_orders(pact):
"""测试获取用户订单契约"""
expected = {
"orders": [
{"id": 1, "total": 100.00, "status": "completed"},
{"id": 2, "total": 200.00, "status": "pending"}
],
"total": 2
}
(pact
.given('user has orders')
.upon_receiving('a request for user orders')
.with_request('GET', '/users/1/orders')
.will_respond_with(200, body=expected))
with pact:
result = user_service.get_user_orders(1)
assert len(result["orders"]) == 2
def test_create_order_contract(pact):
"""测试创建订单契约"""
order_request = {
"user_id": 1,
"items": [
{"product_id": 1, "quantity": 2}
]
}
order_response = {
"id": 3,
"status": "created",
"total": 200.00
}
(pact
.given('products are available')
.upon_receiving('a request to create an order')
.with_request('POST', '/orders', body=order_request)
.will_respond_with(201, body=order_response))
with pact:
result = order_service.create_order(order_request)
assert result["status"] == "created"
集成测试最佳实践
1. 测试数据管理
使用工厂模式创建测试数据,提高可维护性:
import factory
from faker import Faker
fake = Faker()
class UserFactory(factory.Factory):
"""用户数据工厂"""
class Meta:
model = User
username = factory.LazyAttribute(lambda x: fake.user_name())
email = factory.LazyAttribute(lambda x: fake.email())
password_hash = factory.LazyAttribute(lambda x: fake.sha256())
class Admin:
"""管理员用户"""
is_admin = True
# 在测试中使用
def test_with_factory(db_session):
user = UserFactory.create(username="testuser")
assert user.id is not None
assert user.username == "testuser"
2. 测试隔离
确保每个测试独立运行,互不影响:
@pytest.fixture(autouse=True)
def cleanup_database(db_session):
"""每个测试后清理数据库"""
yield
db_session.rollback()
# 清理所有表数据
for table in reversed(Base.metadata.sorted_tables):
db_session.execute(table.delete())
db_session.commit()
@pytest.fixture
def isolated_test_data(db_session):
"""隔离的测试数据"""
user = UserFactory.create()
order = OrderFactory.create(user=user)
yield {
"user": user,
"order": order
}
# 清理
db_session.delete(order)
db_session.delete(user)
db_session.commit()
3. 配置管理
使用独立的测试配置:
# config/test.py
class TestConfig:
"""测试环境配置"""
TESTING = True
DEBUG = True
# 使用内存数据库
SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
# 使用独立的 Redis 数据库
REDIS_URL = "redis://localhost:6379/15"
# 禁用 CSRF
WTF_CSRF_ENABLED = False
# 使用简单的缓存
CACHE_TYPE = "simple"
# 外部服务使用 Mock
EXTERNAL_API_URL = "http://mock-api.test"
4. 并行测试
使用并行执行加速测试:
# pytest-xdist 并行执行
pip install pytest-xdist
# 自动检测 CPU 核心数并并行执行
pytest -n auto
# 指定并行数
pytest -n 4
# 按模块分发
pytest -n auto --dist loadscope
集成测试 vs 单元测试选择
| 场景 | 推荐测试类型 | 原因 |
|---|---|---|
| 复杂业务逻辑 | 单元测试 | 快速、精准定位问题 |
| 数据访问层 | 集成测试 | 需要验证 SQL 和数据库交互 |
| API 端点 | 集成测试 | 需要验证请求/响应完整流程 |
| 外部服务调用 | 集成测试(Mock) | 验证集成点,避免依赖外部 |
| 消息队列 | 集成测试 | 需要验证消息格式和处理 |
| 用户完整流程 | E2E测试 | 模拟真实用户场景 |
常见问题
1. 集成测试太慢怎么办?
解决方案:
- 使用内存数据库(SQLite)进行简单测试
- 并行执行测试
- 只测试关键路径
- 使用 Test Containers 的复用功能
# 使用内存数据库加速测试
@pytest.fixture
def fast_db():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
yield engine
engine.dispose()
2. 如何处理测试数据依赖?
解决方案:
- 使用工厂模式创建数据
- 每个测试独立准备数据
- 使用事务回滚清理数据
- 避免测试之间的依赖关系
3. 外部服务不稳定怎么办?
解决方案:
- 使用 Mock/Stub 替代不稳定的外部服务
- 实现熔断和重试机制
- 使用契约测试验证接口
@responses.activate
def test_with_mocked_external_service():
responses.add(
responses.GET,
"https://api.external.com/data",
json={"data": "test"},
status=200
)
# 测试代码...
练习
- 为 REST API 编写集成测试,覆盖 CRUD 操作
- 使用 Test Containers 测试数据库访问层
- 实现消息队列的集成测试
- 编写契约测试验证服务间接口
- 配置并行测试执行,优化测试速度