跳到主要内容

CQRS 架构

CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种将读操作和写操作分离的架构模式,通过使用不同的模型来处理命令(写)和查询(读),从而优化系统性能和可扩展性。

什么是 CQRS?

CQRS 的核心思想是将系统的读操作和写操作分离到不同的模型中:

  • 命令端(Command Side):处理数据的创建、更新、删除操作
  • 查询端(Query Side):处理数据的读取操作
┌─────────────────────────────────────────────────────────────────────────────┐
│ CQRS 架构示意图 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 命令端 (Write Side) 查询端 (Read Side) │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Command │ │ Query │ │
│ │ Controller │ │ Controller │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Command │ │ Query │ │
│ │ Handler │ │ Handler │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ 事件同步 ┌─────────────┐ │
│ │ Write DB │ ──────────────────────> │ Read DB │ │
│ │ (关系型DB) │ (Event/消息队列) │ (优化读取) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

为什么使用 CQRS?

传统方式的局限

在传统的 CRUD 架构中,同一个模型既要处理写操作又要处理读操作:

// 传统方式:同一个实体处理读写
@Entity
public class Order {
@Id
private Long id;
private String orderNumber;
private Long userId;
private BigDecimal totalAmount;
private OrderStatus status;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;

// 写操作需要的复杂业务逻辑
// 读操作需要的各种关联查询
// 导致实体变得臃肿
}

问题:

  • 读操作和写操作的优化目标冲突
  • 复杂的查询需要关联多张表
  • 领域模型被查询需求污染

CQRS 的优势

优势说明
性能优化读模型可以针对查询优化,写模型针对事务优化
可扩展性读写可以独立扩展
模型简化命令模型和查询模型各司其职,更加专注
灵活性可以使用不同的技术栈处理读写
安全性读写分离天然提供了安全边界

CQRS 实现模式

模式一:单数据库 CQRS

最简单的 CQRS 实现,使用同一个数据库,但使用不同的模型。

// 命令模型 - 关注业务规则
@Entity
@Table(name = "orders")
public class Order {
@Id
private String id;
private String userId;
private BigDecimal totalAmount;
private OrderStatus status;

// 业务方法
public void confirm() {
if (status != OrderStatus.PENDING) {
throw new IllegalStateException("只能确认待处理订单");
}
this.status = OrderStatus.CONFIRMED;
}
}

// 查询模型 - 针对读取优化
public class OrderSummary {
private String orderId;
private String userName; // 直接包含用户名,避免关联查询
private BigDecimal totalAmount;
private String status;
private LocalDateTime createdAt;
private int itemCount; // 直接包含统计信息

// 只有 getter,没有业务逻辑
}

// 查询仓库使用自定义 SQL 优化读取
@Repository
public interface OrderQueryRepository extends JpaRepository<OrderSummary, String> {

@Query("""
SELECT new com.example.OrderSummary(
o.id, u.username, o.totalAmount, o.status, o.createdAt, COUNT(oi)
)
FROM Order o
JOIN User u ON o.userId = u.id
LEFT JOIN OrderItem oi ON o.id = oi.orderId
WHERE o.status = :status
GROUP BY o.id
""")
List<OrderSummary> findByStatusWithSummary(@Param("status") String status);
}

模式二:事件溯源 CQRS

使用事件溯源作为写模型,投影到优化的读模型。

// 写模型 - 事件溯源
public class Order extends EventSourcedAggregate {
private String userId;
private List<OrderItem> items;
private OrderStatus status;

public static Order create(String userId, List<OrderItem> items) {
Order order = new Order();
order.raiseEvent(new OrderCreatedEvent(UUID.randomUUID().toString(),
userId, items));
return order;
}

public void confirm() {
if (status != OrderStatus.PENDING) {
throw new IllegalStateException("只能确认待处理订单");
}
raiseEvent(new OrderConfirmedEvent(this.id));
}

@Override
protected void handleEvent(DomainEvent event) {
switch (event.getEventType()) {
case "ORDER_CREATED" -> handleOrderCreated((OrderCreatedEvent) event);
case "ORDER_CONFIRMED" -> handleOrderConfirmed((OrderConfirmedEvent) event);
}
}
}

// 读模型 - 针对查询优化的投影
@Document(indexName = "orders")
public class OrderDocument {
@Id
private String orderId;
private String userId;
private String userName;
private List<OrderItemView> items;
private BigDecimal totalAmount;
private String status;
private LocalDateTime createdAt;
private List<String> tags;
private Map<String, Object> metadata;

// 为搜索优化的字段
private String searchText;
}

// 投影处理器
@Component
public class OrderProjectionHandler {

private final OrderReadRepository readRepository;
private final UserService userService;

@EventListener
public void onOrderCreated(OrderCreatedEvent event) {
UserDTO user = userService.getUser(event.getUserId());

OrderDocument document = new OrderDocument();
document.setOrderId(event.getOrderId());
document.setUserId(event.getUserId());
document.setUserName(user.getName());
document.setItems(event.getItems().stream()
.map(this::toItemView)
.collect(Collectors.toList()));
document.setTotalAmount(calculateTotal(event.getItems()));
document.setStatus("PENDING");
document.setCreatedAt(event.getOccurredOn());
document.setSearchText(buildSearchText(event, user));

readRepository.save(document);
}

@EventListener
public void onOrderConfirmed(OrderConfirmedEvent event) {
readRepository.updateStatus(event.getOrderId(), "CONFIRMED");
}
}

模式三:分离数据库 CQRS

读写使用完全不同的数据库技术。

// 写端 - 使用关系型数据库保证事务
@Service
@Transactional
public class OrderCommandService {

private final OrderRepository orderRepository;
private final EventPublisher eventPublisher;

public String createOrder(CreateOrderCommand command) {
Order order = Order.create(command.getUserId(), command.getItems());
orderRepository.save(order);

// 发布事件通知读端更新
eventPublisher.publish(new OrderCreatedEvent(order));

return order.getId();
}
}

// 读端 - 使用 Elasticsearch 优化搜索
@Service
public class OrderQueryService {

private final OrderSearchRepository searchRepository;

public List<OrderSummary> searchOrders(OrderSearchCriteria criteria) {
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("userName", criteria.getUserName()))
.filter(QueryBuilders.rangeQuery("createdAt")
.from(criteria.getStartDate())
.to(criteria.getEndDate())))
.withPageable(PageRequest.of(criteria.getPage(), criteria.getSize()))
.build();

return searchRepository.search(query)
.stream()
.map(this::toSummary)
.collect(Collectors.toList());
}

public Optional<OrderDetail> getOrderDetail(String orderId) {
return searchRepository.findById(orderId)
.map(this::toDetail);
}
}

// 事件同步机制
@Component
public class OrderSyncHandler {

private final OrderSearchRepository searchRepository;
private final EventStore eventStore;

@Scheduled(fixedDelay = 1000)
public void syncEvents() {
List<DomainEvent> events = eventStore.getUnprocessedEvents("order-sync");

for (DomainEvent event : events) {
try {
processEvent(event);
eventStore.markProcessed(event.getEventId());
} catch (Exception e) {
log.error("Failed to process event: {}", event.getEventId(), e);
}
}
}
}

CQRS 的适用场景

CQRS 并非适用于所有场景。根据 Martin Fowler 的观点,CQRS 应该谨慎使用:

适合使用 CQRS 的场景

场景说明
复杂领域模型读写模型差异大,分离后各自简化
读写负载差异大读操作远多于写操作,或反之
性能要求高需要对读写分别进行性能优化
团队分工明确读端和写端由不同团队维护
需要独立扩展读写端需要独立水平扩展

不适合使用 CQRS 的场景

场景说明
简单 CRUD 应用读写模型高度重合,分离增加复杂度
领域逻辑简单单一模型足够应对
团队规模小额外的复杂度会拖慢开发速度
一致性要求高CQRS 通常采用最终一致性

CQRS 的优缺点

优点

职责分离,模型简化

// 写模型 - 关注业务规则和数据完整性
public class Order {
// 只包含业务逻辑和状态变更方法
public void confirm() { ... }
public void ship() { ... }
public void cancel() { ... }
}

// 读模型 - 关注数据展示和查询效率
public class OrderListDTO {
// 只包含展示所需字段,可直接来自多个表的 JOIN
private String orderId;
private String customerName; // 来自用户表
private int itemCount; // 聚合计算
private String statusLabel; // 转换后的状态文本
}

独立优化

读端可以使用缓存、搜索引擎、物化视图等技术优化查询性能,而不影响写端的事务完整性。

独立扩展

当读操作量远大于写操作时,可以单独扩展读端实例数量,节省资源成本。

缺点

复杂度增加

  • 需要维护两套模型
  • 需要处理数据同步
  • 调试和排查问题更困难

最终一致性

写操作成功 → 事件发布 → 读模型更新
↓ ↓
数据已持久化 可能存在短暂延迟

读端数据可能存在延迟,需要业务上接受最终一致性。

学习成本

团队需要理解新的架构模式,学习曲线较陡。

CQRS 与其他模式的关系

CQRS + Event Sourcing

事件溯源与 CQRS 天然契合,事件作为写模型的状态变更记录,可以投影到任意读模型。

┌─────────────────────────────────────────────────────────────────────────────┐
│ CQRS + Event Sourcing 架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 写端 读端 │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Command │ │ Query │ │
│ │ Handler │ │ Handler │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ 事件流 ┌─────────────┐ │
│ │ 聚合根 │ ──────────────────────> │ 投影器 │ │
│ │ (Aggregate) │ │ (Projector) │ │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Event Store │ │ Read DB │ │
│ │ (事件存储) │ │ (读模型存储) │ │
│ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

CQRS + 领域驱动设计

CQRS 适合应用于 DDD 中的特定限界上下文(Bounded Context),而非整个系统。每个限界上下文可以根据自身特点决定是否采用 CQRS。

最佳实践

1. 从简单开始

// 先使用单数据库 CQRS
// 确认收益后再考虑分离数据库

@Service
public class OrderService {

private final OrderRepository writeRepository;
private final OrderQueryRepository readRepository;

// 写操作
@Transactional
public Order createOrder(CreateOrderCommand command) {
Order order = Order.create(command);
return writeRepository.save(order);
}

// 读操作 - 使用优化的查询
public List<OrderSummary> getOrderSummaries(String userId) {
return readRepository.findSummariesByUserId(userId);
}
}

2. 使用 Mediator 模式

// 命令总线
public interface CommandBus {
<T> T dispatch(Command<T> command);
}

// 查询总线
public interface QueryBus {
<T> T execute(Query<T> query);
}

// 使用示例
@Service
public class OrderController {

private final CommandBus commandBus;
private final QueryBus queryBus;

@PostMapping
public OrderDTO createOrder(@RequestBody CreateOrderRequest request) {
CreateOrderCommand command = new CreateOrderCommand(request);
return commandBus.dispatch(command);
}

@GetMapping("/{id}")
public OrderDTO getOrder(@PathVariable String id) {
GetOrderQuery query = new GetOrderQuery(id);
return queryBus.execute(query);
}
}

3. 处理最终一致性

// 版本号机制
public class OrderDocument {
private String orderId;
private long version; // 版本号
private String status;
// ...
}

// 客户端检查版本
@GetMapping("/{id}")
public ResponseEntity<OrderDTO> getOrder(
@PathVariable String id,
@RequestHeader(value = "If-Match", required = false) Long expectedVersion) {

OrderDTO order = queryService.getOrder(id);

// 如果客户端版本比服务端新,说明数据还在同步中
if (expectedVersion != null && expectedVersion > order.getVersion()) {
return ResponseEntity.status(HttpStatus.CONFLICT).build();
}

return ResponseEntity.ok()
.eTag(String.valueOf(order.getVersion()))
.body(order);
}

4. 监控数据同步延迟

@Component
public class SyncMonitor {

private final MeterRegistry meterRegistry;

public void recordSyncLatency(DomainEvent event, Instant processedAt) {
Duration latency = Duration.between(event.getOccurredOn(), processedAt);
meterRegistry.timer("cqrs.sync.latency").record(latency);

if (latency.toSeconds() > 5) {
log.warn("High sync latency detected: {} ms", latency.toMillis());
}
}
}

常见误区

误区一:到处使用 CQRS

CQRS 不应该在整个系统中使用,而应该只在特定的限界上下文中使用。大多数业务场景使用单一模型就足够了。

误区二:读写必须使用不同数据库

CQRS 的核心是模型的分离,而不是数据库的分离。可以先使用同一个数据库,当有明确的性能需求时再考虑分离。

误区三:CQRS 就是微服务

CQRS 和微服务是两个独立的概念。一个微服务内部可以使用 CQRS,一个 CQRS 系统也可以不使用微服务。

总结

CQRS 是一个强大的架构模式,但它也带来了额外的复杂度。关键要点:

  1. 谨慎使用:只在确实需要时才引入 CQRS
  2. 局部应用:在特定限界上下文中使用,而非整个系统
  3. 接受最终一致性:理解并处理数据同步延迟
  4. 监控同步状态:确保读写数据的一致性

"CQRS is a pattern that is good to have in the toolbox, but beware that it is difficult to use well and you can easily chop off important bits if you mishandle it." —— Martin Fowler

延伸阅读

参考资料