集成测试
集成测试验证多个模块或组件之间的交互是否正常工作。本章节将介绍集成测试的概念、策略和实践方法。
什么是集成测试?
集成测试是在单元测试之后进行的测试阶段,目的是发现模块间接口和交互的问题。
┌─────────────────────────────────────────────────────────────┐
│ 集成测试 vs 单元测试 │
├─────────────────────────────────────────────────────────────┤
│ 单元测试 │ 集成测试 │
│ ─────────────────────│ ──────────────────────────────── │
│ 测试单个组件 │ 测试多个组件的交互 │
│ 完全隔离 │ 有限隔离 │
│ 使用 Mock/Stub │ 使用真实依赖或 Test Container │
│ 执行速度快 │ 执行速度中等 │
│ 定位问题精准 │ 可能涉及多个组件 │
└─────────────────────────────────────────────────────────────┘
为什么需要集成测试?
-
发现接口问题
- 数据格式不匹配
- 接口契约变更
- 调用顺序错误
-
验证数据流
- 数据在组件间的传递
- 数据转换和映射
- 状态同步
-
测试真实交互
- 数据库访问
- 外部 API 调用
- 消息队列通信
集成测试策略
1. 大爆炸集成(Big Bang)
所有模块一次性集成并测试。
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 模块 A │ │ 模块 B │ │ 模块 C │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└─────────────┼─────────────┘
▼
┌─────────────┐
│ 集成测试 │
└─────────────┘
优点:
- 简单直接
缺点:
- 问题定位困难
- 风险集中
- 不适合大型项目
2. 自顶向下集成(Top-Down)
从顶层模块开始,逐步集成下层模块。
┌─────────┐
│ 主控 │
└────┬────┘
│
┌───────┴───────┐
▼ ▼
┌─────────┐ ┌─────────┐
│ 子模块A │ │ 子模块B │
└────┬────┘ └────┬────┘
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ 子模块C │ │ 子模块D │
└─────────┘ └─────────┘
桩模块(Stub):模拟下层模块的行为。
# 桩模块示例
class PaymentServiceStub:
"""支付服务桩模块"""
def process_payment(self, amount):
# 模拟成功响应
return {
"status": "success",
"transaction_id": "stub-12345"
}
3. 自底向上集成(Bottom-Up)
从底层模块开始,逐步集成到顶层。
┌─────────┐ ┌─────────┐
│ 基础模块A│ │ 基础模块B│
└────┬────┘ └────┬────┘
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ 业务模块A│ │ 业务模块B│
└────┬────┘ └────┬────┘
│ │
└───────┬───────┘
▼
┌─────────┐
│ 主控 │
└─────────┘
驱动模块(Driver):模拟上层模块的调用。
# 驱动模块示例
class OrderServiceDriver:
"""订单服务驱动模块"""
def test_create_order(self):
# 驱动订单创建流程
order_data = self.prepare_order_data()
result = self.order_service.create(order_data)
return result
4. 三明治集成(Sandwich)
结合自顶向下和自底向上,同时从中间层开始。
┌─────────┐
│ 顶层 │ ← 自顶向下
└────┬────┘
│
┌───────┴───────┐
▼ ▼
┌─────────┐ ┌─────────┐
│ 中间层A │ │ 中间层B │ ← 重点测试
└────┬────┘ └────┬────┘
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ 底层A │ │ 底层B │ ← 自底向上
└─────────┘ └─────────┘
集成测试类型
1. API 集成测试
测试 API 端点和业务逻辑。
import requests
import pytest
class TestUserAPI:
"""用户 API 集成测试"""
BASE_URL = "http://localhost:8000/api"
def test_create_user(self):
"""测试创建用户"""
response = requests.post(
f"{self.BASE_URL}/users",
json={
"username": "testuser",
"email": "[email protected]"
}
)
assert response.status_code == 201
data = response.json()
assert data["username"] == "testuser"
assert "id" in data
def test_get_user(self):
"""测试获取用户"""
# 先创建用户
create_resp = requests.post(
f"{self.BASE_URL}/users",
json={"username": "testuser", "email": "[email protected]"}
)
user_id = create_resp.json()["id"]
# 再获取用户
response = requests.get(f"{self.BASE_URL}/users/{user_id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == user_id
def test_user_not_found(self):
"""测试获取不存在的用户"""
response = requests.get(f"{self.BASE_URL}/users/99999")
assert response.status_code == 404
2. 数据库集成测试
测试与数据库的交互。
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class TestUserRepository:
"""用户仓库集成测试"""
@pytest.fixture
def db_session(self):
"""创建数据库会话"""
# 使用测试数据库
engine = create_engine("postgresql://test:test@localhost/test_db")
Session = sessionmaker(bind=engine)
session = Session()
# 创建表
Base.metadata.create_all(engine)
yield session
# 清理
session.rollback()
session.close()
Base.metadata.drop_all(engine)
def test_create_user(self, db_session):
"""测试创建用户"""
repo = UserRepository(db_session)
user = repo.create({
"username": "testuser",
"email": "[email protected]"
})
assert user.id is not None
assert user.username == "testuser"
def test_find_user_by_username(self, db_session):
"""测试通过用户名查找用户"""
repo = UserRepository(db_session)
# 创建用户
repo.create({"username": "testuser", "email": "[email protected]"})
# 查找用户
user = repo.find_by_username("testuser")
assert user is not None
assert user.email == "[email protected]"
3. 消息队列集成测试
测试异步消息处理。
import pytest
import pika
import json
class TestMessageQueue:
"""消息队列集成测试"""
@pytest.fixture
def rabbitmq_connection(self):
"""创建 RabbitMQ 连接"""
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
yield connection
connection.close()
def test_order_created_event(self, rabbitmq_connection):
"""测试订单创建事件"""
channel = rabbitmq_connection.channel()
# 声明队列
channel.queue_declare(queue='test_orders')
# 发送消息
event = {
"event_type": "order_created",
"order_id": "12345",
"user_id": "user_001",
"amount": 100.00
}
channel.basic_publish(
exchange='',
routing_key='test_orders',
body=json.dumps(event)
)
# 消费消息
method, properties, body = channel.basic_get('test_orders')
assert method is not None
received_event = json.loads(body)
assert received_event["order_id"] == "12345"
4. 外部服务集成测试
测试与第三方服务的集成。
import pytest
import responses
import requests
class TestExternalService:
"""外部服务集成测试"""
@responses.activate
def test_payment_gateway(self):
"""测试支付网关"""
# Mock 外部服务响应
responses.add(
responses.POST,
"https://api.payment.com/v1/charges",
json={
"id": "charge_123",
"status": "succeeded",
"amount": 1000
},
status=200
)
# 调用支付服务
payment_service = PaymentService()
result = payment_service.charge(1000, "tok_visa")
assert result["status"] == "succeeded"
assert result["amount"] == 1000
Test Containers
使用 Docker 容器进行集成测试,提供真实的环境。
安装
pip install testcontainers
数据库测试容器
import pytest
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine
@pytest.fixture(scope="module")
def postgres_container():
"""PostgreSQL 测试容器"""
with PostgresContainer("postgres:15") as postgres:
yield postgres
@pytest.fixture
def db_engine(postgres_container):
"""创建数据库引擎"""
connection_url = postgres_container.get_connection_url()
engine = create_engine(connection_url)
yield engine
engine.dispose()
def test_with_postgres(db_engine):
"""使用 PostgreSQL 容器进行测试"""
with db_engine.connect() as conn:
result = conn.execute("SELECT 1")
assert result.scalar() == 1
Redis 测试容器
from testcontainers.redis import RedisContainer
import redis
@pytest.fixture(scope="module")
def redis_container():
"""Redis 测试容器"""
with RedisContainer("redis:7") as redis_container:
yield redis_container
@pytest.fixture
def redis_client(redis_container):
"""创建 Redis 客户端"""
host = redis_container.get_container_host_ip()
port = redis_container.get_exposed_port(6379)
client = redis.Redis(host=host, port=port, decode_responses=True)
yield client
client.close()
def test_cache_operations(redis_client):
"""测试缓存操作"""
cache = CacheService(redis_client)
# 设置缓存
cache.set("key1", "value1", ttl=300)
# 获取缓存
value = cache.get("key1")
assert value == "value1"
完整示例:Web 应用集成测试
import pytest
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
import requests
import time
class TestWebApplication:
"""Web 应用集成测试"""
@pytest.fixture(scope="module")
def infrastructure(self):
"""启动测试基础设施"""
with PostgresContainer("postgres:15") as postgres, \
RedisContainer("redis:7") as redis:
# 获取连接信息
postgres_url = postgres.get_connection_url()
redis_host = redis.get_container_host_ip()
redis_port = redis.get_exposed_port(6379)
# 启动应用(使用测试配置)
app = create_app(
database_url=postgres_url,
redis_host=redis_host,
redis_port=redis_port
)
# 等待服务就绪
time.sleep(2)
yield {
"app": app,
"base_url": "http://localhost:8000"
}
def test_user_registration_flow(self, infrastructure):
"""测试用户注册完整流程"""
base_url = infrastructure["base_url"]
# 1. 注册用户
register_resp = requests.post(
f"{base_url}/api/register",
json={
"username": "newuser",
"email": "[email protected]",
"password": "password123"
}
)
assert register_resp.status_code == 201
# 2. 登录
login_resp = requests.post(
f"{base_url}/api/login",
json={
"username": "newuser",
"password": "password123"
}
)
assert login_resp.status_code == 200
token = login_resp.json()["token"]
# 3. 获取用户信息
profile_resp = requests.get(
f"{base_url}/api/profile",
headers={"Authorization": f"Bearer {token}"}
)
assert profile_resp.status_code == 200
assert profile_resp.json()["username"] == "newuser"
契约测试
验证服务间的契约(接口约定)。
Pact 契约测试
from pact import Consumer, Provider
import pytest
@pytest.fixture
def pact():
"""创建 Pact 契约"""
pact = Consumer('user-service').has_pact_with(
Provider('order-service'),
pact_dir='./pacts'
)
pact.start_service()
yield pact
pact.stop_service()
def test_get_user_orders(pact):
"""测试获取用户订单契约"""
expected = {
"orders": [
{"id": 1, "total": 100.00},
{"id": 2, "total": 200.00}
]
}
(pact
.given('user has orders')
.upon_receiving('a request for user orders')
.with_request('GET', '/users/1/orders')
.will_respond_with(200, body=expected))
with pact:
result = user_service.get_user_orders(1)
assert len(result["orders"]) == 2
集成测试最佳实践
1. 测试数据管理
@pytest.fixture
def test_data(db_session):
"""准备测试数据"""
# 使用工厂模式创建测试数据
user = UserFactory.create()
order = OrderFactory.create(user=user)
yield {
"user": user,
"order": order
}
# 清理
db_session.delete(order)
db_session.delete(user)
db_session.commit()
2. 测试隔离
@pytest.fixture(autouse=True)
def cleanup_database(db_session):
"""每个测试后清理数据库"""
yield
db_session.rollback()
# 清理测试数据
db_session.execute("TRUNCATE TABLE orders, users RESTART IDENTITY CASCADE")
db_session.commit()
3. 配置管理
# config/test.py
class TestConfig:
TESTING = True
DATABASE_URL = "postgresql://test:test@localhost/test_db"
REDIS_URL = "redis://localhost:6379/1"
# 使用内存缓存替代外部缓存
CACHE_TYPE = "simple"
4. 并行测试
# pytest 并行执行
pytest -n auto
# 指定并行数
pytest -n 4
集成测试 vs 单元测试选择
| 场景 | 推荐测试类型 | 原因 |
|---|---|---|
| 复杂业务逻辑 | 单元测试 | 快速、精准定位问题 |
| 数据访问层 | 集成测试 | 需要验证 SQL 和数据库交互 |
| API 端点 | 集成测试 | 需要验证请求/响应完整流程 |
| 外部服务调用 | 集成测试(Mock) | 验证集成点,避免依赖 |
| 消息队列 | 集成测试 | 需要验证消息格式和处理 |
常见问题
1. 集成测试太慢怎么办?
- 使用内存数据库(SQLite)进行简单测试
- 并行执行测试
- 只测试关键路径
- 使用 Test Containers 的复用功能
2. 如何处理测试数据依赖?
- 使用工厂模式创建数据
- 每个测试独立准备数据
- 使用事务回滚清理数据
3. 外部服务不稳定怎么办?
- 使用 Mock/Stub 替代不稳定的外部服务
- 实现熔断和重试机制
- 使用契约测试验证接口
练习
- 为 REST API 编写集成测试
- 使用 Test Containers 测试数据库访问
- 实现消息队列的集成测试
- 编写契约测试验证服务间接口
- 配置并行测试执行