跳到主要内容

集成测试

集成测试验证多个模块或组件之间的交互是否正常工作。本章节将介绍集成测试的概念、策略和实践方法。

什么是集成测试?

集成测试是在单元测试之后进行的测试阶段,目的是发现模块间接口和交互的问题。

集成测试 vs 单元测试

理解两者的区别有助于正确选择测试策略:

特点单元测试集成测试
测试范围单个组件多个组件的交互
隔离程度完全隔离有限隔离
依赖处理使用 Mock/Stub使用真实依赖或 Test Container
执行速度快速(毫秒级)中等(秒级)
问题定位精准可能涉及多个组件
测试环境不需要外部资源需要数据库、服务等

为什么需要集成测试?

单元测试虽然能验证代码的正确性,但它们无法保证整个系统协同工作时的正确性:

  1. 发现接口问题

    • 数据格式不匹配
    • 接口契约变更
    • 调用顺序错误
    • 参数传递问题
  2. 验证数据流

    • 数据在组件间的传递
    • 数据转换和映射
    • 状态同步
  3. 测试真实交互

    • 数据库访问
    • 外部 API 调用
    • 消息队列通信
    • 缓存系统交互

集成测试策略

1. 大爆炸集成(Big Bang)

所有模块一次性集成并测试。

优点

  • 简单直接,无需特殊策略
  • 适合小型项目

缺点

  • 问题定位困难
  • 风险集中
  • 不适合大型项目

2. 自顶向下集成(Top-Down)

从顶层模块开始,逐步集成下层模块。

桩模块(Stub):模拟下层模块的行为,用于替代尚未开发的模块。

class PaymentServiceStub:
"""支付服务桩模块 - 模拟支付处理"""

def process_payment(self, amount, card_info):
# 不实际处理支付,直接返回模拟结果
return {
"status": "success",
"transaction_id": "stub-transaction-12345",
"amount": amount
}

# 在集成测试中使用桩模块
def test_order_creation_with_stub():
payment_stub = PaymentServiceStub()
order_service = OrderService(payment_stub)

order = order_service.create_order(
items=[{"product_id": 1, "quantity": 2, "price": 100}],
user_id=1
)

assert order.status == "created"
assert order.payment_status == "pending"

优点

  • 早期验证主要控制逻辑
  • 演示系统功能较早
  • 缺陷通常在顶层模块

缺点

  • 底层模块可能延迟测试
  • 需要大量桩模块

3. 自底向上集成(Bottom-Up)

从底层模块开始,逐步集成到顶层。

驱动模块(Driver):模拟上层模块的调用,用于测试底层模块。

class OrderServiceDriver:
"""订单服务驱动模块 - 模拟上层调用"""

def __init__(self, payment_service):
self.payment_service = payment_service

def test_payment_flow(self):
"""测试完整支付流程"""
# 准备测试数据
order_data = {
"order_id": "test-order-001",
"amount": 199.99,
"currency": "USD"
}

# 调用支付服务
result = self.payment_service.process_payment(
amount=order_data["amount"],
card_info={"number": "4111111111111111", "expiry": "12/25"}
)

# 验证结果
assert result["status"] == "success"
return result

优点

  • 底层模块充分测试
  • 适合底层复杂的系统
  • 驱动模块通常比桩模块简单

缺点

  • 顶层模块最后测试
  • 不能早期演示系统功能

4. 三明治集成(Sandwich)

结合自顶向下和自底向上,同时从中间层开始。

优点

  • 结合两种方法的优点
  • 可以并行进行测试

缺点

  • 需要同时维护桩模块和驱动模块
  • 测试成本较高

集成测试类型

1. API 集成测试

测试 API 端点和业务逻辑,验证 HTTP 接口的正确性。

import requests
import pytest

class TestUserAPI:
"""用户 API 集成测试"""

BASE_URL = "http://localhost:8000/api"

def test_create_user(self):
"""测试创建用户"""
response = requests.post(
f"{self.BASE_URL}/users",
json={
"username": "testuser",
"email": "[email protected]",
"password": "Test@123456"
}
)

assert response.status_code == 201
data = response.json()
assert data["username"] == "testuser"
assert "id" in data
assert "password" not in data # 密码不应返回

def test_get_user(self):
"""测试获取用户"""
# 先创建用户
create_resp = requests.post(
f"{self.BASE_URL}/users",
json={
"username": "testuser",
"email": "[email protected]",
"password": "Test@123456"
}
)
user_id = create_resp.json()["id"]

# 再获取用户
response = requests.get(f"{self.BASE_URL}/users/{user_id}")

assert response.status_code == 200
data = response.json()
assert data["id"] == user_id
assert data["username"] == "testuser"

def test_user_not_found(self):
"""测试获取不存在的用户"""
response = requests.get(f"{self.BASE_URL}/users/99999")

assert response.status_code == 404
assert "error" in response.json()

def test_create_duplicate_user(self):
"""测试创建重复用户"""
user_data = {
"username": "duplicate_user",
"email": "[email protected]",
"password": "Test@123456"
}

# 第一次创建应该成功
response1 = requests.post(f"{self.BASE_URL}/users", json=user_data)
assert response1.status_code == 201

# 第二次创建应该失败
response2 = requests.post(f"{self.BASE_URL}/users", json=user_data)
assert response2.status_code == 409 # Conflict

2. 数据库集成测试

测试与数据库的交互,验证 SQL 查询和数据操作的正确性。

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from myapp.models import Base, User, Order

class TestUserRepository:
"""用户仓库集成测试"""

@pytest.fixture(scope="class")
def db_engine(self):
"""创建测试数据库引擎"""
# 使用内存数据库进行测试
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
yield engine
engine.dispose()

@pytest.fixture
def db_session(self, db_engine):
"""创建数据库会话"""
Session = sessionmaker(bind=db_engine)
session = Session()

yield session

# 清理:回滚事务
session.rollback()
session.close()

def test_create_user(self, db_session):
"""测试创建用户"""
repo = UserRepository(db_session)

user = repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})

assert user.id is not None
assert user.username == "testuser"
assert user.email == "[email protected]"

def test_find_user_by_username(self, db_session):
"""测试通过用户名查找用户"""
repo = UserRepository(db_session)

# 创建用户
repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})

# 查找用户
user = repo.find_by_username("testuser")

assert user is not None
assert user.email == "[email protected]"

def test_find_nonexistent_user(self, db_session):
"""测试查找不存在的用户"""
repo = UserRepository(db_session)

user = repo.find_by_username("nonexistent")

assert user is None

def test_update_user(self, db_session):
"""测试更新用户"""
repo = UserRepository(db_session)

# 创建用户
user = repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})

# 更新用户
updated = repo.update(user.id, {"email": "[email protected]"})

assert updated.email == "[email protected]"

def test_delete_user(self, db_session):
"""测试删除用户"""
repo = UserRepository(db_session)

# 创建用户
user = repo.create({
"username": "testuser",
"email": "[email protected]",
"password_hash": "hashed_password"
})

# 删除用户
repo.delete(user.id)

# 验证已删除
deleted = repo.find_by_id(user.id)
assert deleted is None

3. 消息队列集成测试

测试异步消息处理,验证消息的发送和消费。

import pytest
import json
import pika
from unittest.mock import Mock

class TestMessageQueue:
"""消息队列集成测试"""

@pytest.fixture
def rabbitmq_connection(self):
"""创建 RabbitMQ 连接"""
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', port=5672)
)
yield connection
connection.close()
except pika.exceptions.AMQPConnectionError:
pytest.skip("RabbitMQ not available")

def test_order_created_event(self, rabbitmq_connection):
"""测试订单创建事件"""
channel = rabbitmq_connection.channel()

# 声明测试队列
queue_name = 'test_orders'
channel.queue_declare(queue=queue_name, durable=True)

# 发送消息
event = {
"event_type": "order_created",
"order_id": "ORD-001",
"user_id": "USER-001",
"amount": 100.00,
"timestamp": "2024-01-01T12:00:00Z"
}

channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(event),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
)

# 消费消息
method, properties, body = channel.basic_get(queue_name, auto_ack=True)

assert method is not None
received_event = json.loads(body)
assert received_event["order_id"] == "ORD-001"
assert received_event["event_type"] == "order_created"

4. 外部服务集成测试

测试与第三方服务的集成,使用 Mock 或实际调用。

import pytest
import responses
import requests

class TestExternalService:
"""外部服务集成测试"""

@responses.activate
def test_payment_gateway_success(self):
"""测试支付网关成功响应"""
# Mock 外部服务响应
responses.add(
responses.POST,
"https://api.payment.com/v1/charges",
json={
"id": "charge_123",
"status": "succeeded",
"amount": 1000
},
status=200
)

# 调用支付服务
payment_service = PaymentService(api_key="test_key")
result = payment_service.charge(1000, "tok_visa")

assert result["status"] == "succeeded"
assert result["amount"] == 1000

@responses.activate
def test_payment_gateway_failure(self):
"""测试支付网关失败响应"""
responses.add(
responses.POST,
"https://api.payment.com/v1/charges",
json={
"error": {
"type": "card_error",
"message": "Your card was declined"
}
},
status=402
)

payment_service = PaymentService(api_key="test_key")

with pytest.raises(PaymentError) as exc_info:
payment_service.charge(1000, "tok_declined")

assert "declined" in str(exc_info.value).lower()

@responses.activate
def test_external_api_timeout(self):
"""测试外部 API 超时"""
responses.add(
responses.GET,
"https://api.external.com/data",
body=responses.ConnectionError("Connection timeout")
)

with pytest.raises(ConnectionError):
requests.get("https://api.external.com/data")

Test Containers

使用 Docker 容器进行集成测试,提供真实的环境,避免环境配置问题。

安装

pip install testcontainers

数据库测试容器

import pytest
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine, text

@pytest.fixture(scope="module")
def postgres_container():
"""PostgreSQL 测试容器"""
with PostgresContainer("postgres:15") as postgres:
yield postgres

@pytest.fixture
def db_engine(postgres_container):
"""创建数据库引擎"""
connection_url = postgres_container.get_connection_url()
engine = create_engine(connection_url)

# 创建测试表
with engine.connect() as conn:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL
)
"""))
conn.commit()

yield engine
engine.dispose()

def test_with_postgres(db_engine):
"""使用 PostgreSQL 容器进行测试"""
with db_engine.connect() as conn:
# 插入数据
conn.execute(
text("INSERT INTO users (username, email) VALUES (:username, :email)"),
{"username": "testuser", "email": "[email protected]"}
)
conn.commit()

# 查询数据
result = conn.execute(
text("SELECT * FROM users WHERE username = :username"),
{"username": "testuser"}
)
user = result.fetchone()

assert user is not None
assert user.username == "testuser"

Redis 测试容器

import pytest
from testcontainers.redis import RedisContainer
import redis

@pytest.fixture(scope="module")
def redis_container():
"""Redis 测试容器"""
with RedisContainer("redis:7") as container:
yield container

@pytest.fixture
def redis_client(redis_container):
"""创建 Redis 客户端"""
host = redis_container.get_container_host_ip()
port = redis_container.get_exposed_port(6379)
client = redis.Redis(host=host, port=port, decode_responses=True)
yield client
client.close()

def test_cache_operations(redis_client):
"""测试缓存操作"""
cache = CacheService(redis_client)

# 设置缓存
cache.set("user:1", {"name": "张三", "email": "[email protected]"}, ttl=300)

# 获取缓存
value = cache.get("user:1")
assert value["name"] == "张三"

# 删除缓存
cache.delete("user:1")
assert cache.get("user:1") is None

def test_cache_expiration(redis_client):
"""测试缓存过期"""
import time

cache = CacheService(redis_client)
cache.set("temp", "value", ttl=1)

assert cache.get("temp") == "value"
time.sleep(2)
assert cache.get("temp") is None

MySQL 测试容器

import pytest
from testcontainers.mysql import MySqlContainer
from sqlalchemy import create_engine

@pytest.fixture(scope="module")
def mysql_container():
"""MySQL 测试容器"""
with MySqlContainer("mysql:8.0") as mysql:
yield mysql

@pytest.fixture
def db_engine(mysql_container):
"""创建数据库引擎"""
connection_url = mysql_container.get_connection_url()
engine = create_engine(connection_url)
yield engine
engine.dispose()

def test_mysql_connection(db_engine):
"""测试 MySQL 连接"""
with db_engine.connect() as conn:
result = conn.execute("SELECT 1")
assert result.scalar() == 1

完整示例:Web 应用集成测试

import pytest
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
import requests
import time

class TestWebApplication:
"""Web 应用完整集成测试"""

@pytest.fixture(scope="module")
def infrastructure(self):
"""启动测试基础设施"""
with PostgresContainer("postgres:15") as postgres, \
RedisContainer("redis:7") as redis_container:

# 获取连接信息
postgres_url = postgres.get_connection_url()
redis_host = redis_container.get_container_host_ip()
redis_port = redis_container.get_exposed_port(6379)

# 配置应用(假设应用支持环境变量配置)
import os
os.environ["DATABASE_URL"] = postgres_url
os.environ["REDIS_HOST"] = redis_host
os.environ["REDIS_PORT"] = str(redis_port)

# 启动应用
from myapp import create_app
app = create_app()

yield {
"app": app,
"base_url": "http://localhost:8000",
"postgres": postgres,
"redis": redis_container
}

def test_user_registration_flow(self, infrastructure):
"""测试用户注册完整流程"""
base_url = infrastructure["base_url"]

# 1. 注册用户
register_resp = requests.post(
f"{base_url}/api/register",
json={
"username": "newuser",
"email": "[email protected]",
"password": "Password123!"
}
)
assert register_resp.status_code == 201

# 2. 登录
login_resp = requests.post(
f"{base_url}/api/login",
json={
"username": "newuser",
"password": "Password123!"
}
)
assert login_resp.status_code == 200
token = login_resp.json()["token"]

# 3. 获取用户信息
profile_resp = requests.get(
f"{base_url}/api/profile",
headers={"Authorization": f"Bearer {token}"}
)
assert profile_resp.status_code == 200
assert profile_resp.json()["username"] == "newuser"

def test_order_creation_flow(self, infrastructure):
"""测试订单创建完整流程"""
base_url = infrastructure["base_url"]

# 登录
login_resp = requests.post(
f"{base_url}/api/login",
json={"username": "testuser", "password": "Password123!"}
)
token = login_resp.json()["token"]
headers = {"Authorization": f"Bearer {token}"}

# 创建订单
order_resp = requests.post(
f"{base_url}/api/orders",
headers=headers,
json={
"items": [
{"product_id": 1, "quantity": 2},
{"product_id": 2, "quantity": 1}
]
}
)
assert order_resp.status_code == 201
order_id = order_resp.json()["id"]

# 查询订单
get_order_resp = requests.get(
f"{base_url}/api/orders/{order_id}",
headers=headers
)
assert get_order_resp.status_code == 200
assert len(get_order_resp.json()["items"]) == 2

契约测试

验证服务间的契约(接口约定),确保服务提供者和消费者之间的接口一致。

Pact 契约测试

from pact import Consumer, Provider
import pytest

@pytest.fixture
def pact():
"""创建 Pact 契约"""
pact = Consumer('user-service').has_pact_with(
Provider('order-service'),
pact_dir='./pacts'
)
pact.start_service()
yield pact
pact.stop_service()

def test_get_user_orders(pact):
"""测试获取用户订单契约"""
expected = {
"orders": [
{"id": 1, "total": 100.00, "status": "completed"},
{"id": 2, "total": 200.00, "status": "pending"}
],
"total": 2
}

(pact
.given('user has orders')
.upon_receiving('a request for user orders')
.with_request('GET', '/users/1/orders')
.will_respond_with(200, body=expected))

with pact:
result = user_service.get_user_orders(1)
assert len(result["orders"]) == 2

def test_create_order_contract(pact):
"""测试创建订单契约"""
order_request = {
"user_id": 1,
"items": [
{"product_id": 1, "quantity": 2}
]
}

order_response = {
"id": 3,
"status": "created",
"total": 200.00
}

(pact
.given('products are available')
.upon_receiving('a request to create an order')
.with_request('POST', '/orders', body=order_request)
.will_respond_with(201, body=order_response))

with pact:
result = order_service.create_order(order_request)
assert result["status"] == "created"

集成测试最佳实践

1. 测试数据管理

使用工厂模式创建测试数据,提高可维护性:

import factory
from faker import Faker

fake = Faker()

class UserFactory(factory.Factory):
"""用户数据工厂"""

class Meta:
model = User

username = factory.LazyAttribute(lambda x: fake.user_name())
email = factory.LazyAttribute(lambda x: fake.email())
password_hash = factory.LazyAttribute(lambda x: fake.sha256())

class Admin:
"""管理员用户"""
is_admin = True

# 在测试中使用
def test_with_factory(db_session):
user = UserFactory.create(username="testuser")

assert user.id is not None
assert user.username == "testuser"

2. 测试隔离

确保每个测试独立运行,互不影响:

@pytest.fixture(autouse=True)
def cleanup_database(db_session):
"""每个测试后清理数据库"""
yield
db_session.rollback()
# 清理所有表数据
for table in reversed(Base.metadata.sorted_tables):
db_session.execute(table.delete())
db_session.commit()

@pytest.fixture
def isolated_test_data(db_session):
"""隔离的测试数据"""
user = UserFactory.create()
order = OrderFactory.create(user=user)

yield {
"user": user,
"order": order
}

# 清理
db_session.delete(order)
db_session.delete(user)
db_session.commit()

3. 配置管理

使用独立的测试配置:

# config/test.py
class TestConfig:
"""测试环境配置"""
TESTING = True
DEBUG = True

# 使用内存数据库
SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"

# 使用独立的 Redis 数据库
REDIS_URL = "redis://localhost:6379/15"

# 禁用 CSRF
WTF_CSRF_ENABLED = False

# 使用简单的缓存
CACHE_TYPE = "simple"

# 外部服务使用 Mock
EXTERNAL_API_URL = "http://mock-api.test"

4. 并行测试

使用并行执行加速测试:

# pytest-xdist 并行执行
pip install pytest-xdist

# 自动检测 CPU 核心数并并行执行
pytest -n auto

# 指定并行数
pytest -n 4

# 按模块分发
pytest -n auto --dist loadscope

集成测试 vs 单元测试选择

场景推荐测试类型原因
复杂业务逻辑单元测试快速、精准定位问题
数据访问层集成测试需要验证 SQL 和数据库交互
API 端点集成测试需要验证请求/响应完整流程
外部服务调用集成测试(Mock)验证集成点,避免依赖外部
消息队列集成测试需要验证消息格式和处理
用户完整流程E2E测试模拟真实用户场景

常见问题

1. 集成测试太慢怎么办?

解决方案

  • 使用内存数据库(SQLite)进行简单测试
  • 并行执行测试
  • 只测试关键路径
  • 使用 Test Containers 的复用功能
# 使用内存数据库加速测试
@pytest.fixture
def fast_db():
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
yield engine
engine.dispose()

2. 如何处理测试数据依赖?

解决方案

  • 使用工厂模式创建数据
  • 每个测试独立准备数据
  • 使用事务回滚清理数据
  • 避免测试之间的依赖关系

3. 外部服务不稳定怎么办?

解决方案

  • 使用 Mock/Stub 替代不稳定的外部服务
  • 实现熔断和重试机制
  • 使用契约测试验证接口
@responses.activate
def test_with_mocked_external_service():
responses.add(
responses.GET,
"https://api.external.com/data",
json={"data": "test"},
status=200
)

# 测试代码...

练习

  1. 为 REST API 编写集成测试,覆盖 CRUD 操作
  2. 使用 Test Containers 测试数据库访问层
  3. 实现消息队列的集成测试
  4. 编写契约测试验证服务间接口
  5. 配置并行测试执行,优化测试速度

参考资源