跳到主要内容

事件驱动架构

事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心进行系统设计和通信的架构风格。在这种架构中,组件通过异步消息传递进行通信,实现高度解耦和可扩展性。

什么是事件驱动架构?

事件驱动架构是一种软件架构模式,其中系统的行为由事件的接收和发送来驱动。事件表示系统中发生了某些有意义的事情,如用户操作、数据变更或系统状态转换。

┌─────────────────────────────────────────────────────────────────────────────┐
│ 事件驱动架构示意图 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ 事件 ┌──────────┐ 事件 ┌──────────┐ │
│ │ 事件源 │ ──────────>│ 事件总线 │ ──────────>│ 事件处理器│ │
│ │ (Source) │ │ (Bus) │ │(Handler) │ │
│ └──────────┘ └────┬─────┘ └──────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 库存服务 │ │ 通知服务 │ │ 分析服务 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

核心概念

1. 事件(Event)

事件表示系统中发生的某种状态变化或动作,是不可变的、已发生的事实。

// 基础事件接口
public interface DomainEvent {
String getEventId();
String getEventType();
LocalDateTime getOccurredOn();
String getAggregateId();
}

// 订单创建事件
public class OrderCreatedEvent implements DomainEvent {
private final String eventId;
private final String orderId;
private final String userId;
private final BigDecimal totalAmount;
private final List<OrderItem> items;
private final LocalDateTime occurredOn;

public OrderCreatedEvent(String orderId, String userId,
BigDecimal totalAmount, List<OrderItem> items) {
this.eventId = UUID.randomUUID().toString();
this.orderId = orderId;
this.userId = userId;
this.totalAmount = totalAmount;
this.items = new ArrayList<>(items);
this.occurredOn = LocalDateTime.now();
}

@Override
public String getEventId() {
return eventId;
}

@Override
public String getEventType() {
return "ORDER_CREATED";
}

@Override
public String getAggregateId() {
return orderId;
}

// getters...
}

// 订单支付完成事件
public class OrderPaidEvent implements DomainEvent {
private final String eventId;
private final String orderId;
private final String paymentId;
private final BigDecimal amount;
private final LocalDateTime paidAt;
private final LocalDateTime occurredOn;

public OrderPaidEvent(String orderId, String paymentId,
BigDecimal amount, LocalDateTime paidAt) {
this.eventId = UUID.randomUUID().toString();
this.orderId = orderId;
this.paymentId = paymentId;
this.amount = amount;
this.paidAt = paidAt;
this.occurredOn = LocalDateTime.now();
}

@Override
public String getEventType() {
return "ORDER_PAID";
}

@Override
public String getAggregateId() {
return orderId;
}

// getters...
}

2. 事件生产者(Producer)

产生事件的组件或服务。

@Service
public class OrderService {

private final EventPublisher eventPublisher;
private final OrderRepository orderRepository;

public Order createOrder(CreateOrderCommand command) {
// 创建订单
Order order = Order.create(command);
orderRepository.save(order);

// 发布订单创建事件
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getUserId(),
order.getTotalAmount(),
order.getItems()
);
eventPublisher.publish(event);

return order;
}

public void payOrder(String orderId, PaymentInfo paymentInfo) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));

order.pay(paymentInfo);
orderRepository.save(order);

// 发布订单支付事件
OrderPaidEvent event = new OrderPaidEvent(
orderId,
paymentInfo.getPaymentId(),
paymentInfo.getAmount(),
LocalDateTime.now()
);
eventPublisher.publish(event);
}
}

3. 事件消费者(Consumer)

接收并处理事件的组件。

@Component
public class InventoryEventHandler {

private final InventoryService inventoryService;

@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
// 预留库存
for (OrderItem item : event.getItems()) {
inventoryService.reserveStock(
event.getOrderId(),
item.getProductId(),
item.getQuantity()
);
}
}

@EventListener
public void handleOrderCancelled(OrderCancelledEvent event) {
// 释放库存
inventoryService.releaseStock(event.getOrderId());
}
}

@Component
public class NotificationEventHandler {

private final EmailService emailService;
private final SmsService smsService;

@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
// 发送订单确认邮件
emailService.sendOrderConfirmation(event.getUserId(), event.getOrderId());
}

@EventListener
public void handleOrderPaid(OrderPaidEvent event) {
// 发送支付成功通知
smsService.sendPaymentSuccessNotification(
event.getAggregateId(),
event.getAmount()
);
}
}

事件驱动架构模式

1. 发布-订阅模式(Pub/Sub)

多个消费者可以订阅同一个事件主题,实现一对多通信。

// 使用 Kafka 实现发布-订阅
@Service
public class KafkaEventPublisher implements EventPublisher {

private final KafkaTemplate<String, DomainEvent> kafkaTemplate;

@Override
public void publish(DomainEvent event) {
String topic = getTopicForEvent(event);
kafkaTemplate.send(topic, event.getAggregateId(), event);
}

private String getTopicForEvent(DomainEvent event) {
// 根据事件类型路由到不同主题
return event.getEventType().toLowerCase().replace("_", "-");
}
}

@Component
public class KafkaEventConsumer {

private final InventoryEventHandler inventoryHandler;
private final NotificationEventHandler notificationHandler;

@KafkaListener(topics = "order-created", groupId = "inventory-service")
public void consumeOrderCreatedForInventory(OrderCreatedEvent event) {
inventoryHandler.handleOrderCreated(event);
}

@KafkaListener(topics = "order-created", groupId = "notification-service")
public void consumeOrderCreatedForNotification(OrderCreatedEvent event) {
notificationHandler.handleOrderCreated(event);
}
}

2. 事件溯源(Event Sourcing)

不存储当前状态,而是存储导致状态变化的所有事件,通过重放事件重建状态。

// 事件存储
public interface EventStore {
void appendEvents(String aggregateId, List<DomainEvent> events);
List<DomainEvent> getEvents(String aggregateId);
List<DomainEvent> getEvents(String aggregateId, long fromVersion);
}

// 事件溯源聚合根
public abstract class EventSourcedAggregate {
protected String id;
protected long version = 0;
protected List<DomainEvent> uncommittedEvents = new ArrayList<>();

public void apply(DomainEvent event) {
// 应用事件到聚合根
handleEvent(event);
version++;
}

protected void raiseEvent(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}

protected abstract void handleEvent(DomainEvent event);

public List<DomainEvent> getUncommittedEvents() {
return new ArrayList<>(uncommittedEvents);
}

public void markCommitted() {
uncommittedEvents.clear();
}
}

// 订单聚合根(事件溯源实现)
public class Order extends EventSourcedAggregate {
private String userId;
private OrderStatus status;
private List<OrderItem> items;
private BigDecimal totalAmount;

// 用于重建聚合根的工厂方法
public static Order reconstitute(String orderId, List<DomainEvent> events) {
Order order = new Order();
order.id = orderId;
for (DomainEvent event : events) {
order.apply(event);
}
return order;
}

// 创建新订单
public static Order create(String userId, List<OrderItem> items) {
Order order = new Order();
String orderId = UUID.randomUUID().toString();
BigDecimal total = items.stream()
.map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);

order.raiseEvent(new OrderCreatedEvent(orderId, userId, total, items));
return order;
}

// 支付订单
public void pay(PaymentInfo paymentInfo) {
if (status != OrderStatus.PENDING) {
throw new IllegalStateException("只能支付待处理订单");
}
raiseEvent(new OrderPaidEvent(id, paymentInfo.getPaymentId(),
totalAmount, LocalDateTime.now()));
}

@Override
protected void handleEvent(DomainEvent event) {
switch (event.getEventType()) {
case "ORDER_CREATED":
handleOrderCreated((OrderCreatedEvent) event);
break;
case "ORDER_PAID":
handleOrderPaid((OrderPaidEvent) event);
break;
// ... 其他事件处理
}
}

private void handleOrderCreated(OrderCreatedEvent event) {
this.id = event.getAggregateId();
this.userId = event.getUserId();
this.items = event.getItems();
this.totalAmount = event.getTotalAmount();
this.status = OrderStatus.PENDING;
}

private void handleOrderPaid(OrderPaidEvent event) {
this.status = OrderStatus.PAID;
}
}

// 事件溯源仓库
@Service
public class EventSourcedOrderRepository {

private final EventStore eventStore;

public Order findById(String orderId) {
List<DomainEvent> events = eventStore.getEvents(orderId);
if (events.isEmpty()) {
return null;
}
return Order.reconstitute(orderId, events);
}

public void save(Order order) {
eventStore.appendEvents(order.getId(), order.getUncommittedEvents());
order.markCommitted();
}
}

3. CQRS(命令查询职责分离)

将读操作和写操作分离,使用不同的模型处理命令和查询。

// 命令端(写模型)
@Service
public class OrderCommandService {

private final EventSourcedOrderRepository orderRepository;
private final EventPublisher eventPublisher;

@Transactional
public String createOrder(CreateOrderCommand command) {
Order order = Order.create(command.getUserId(), command.getItems());
orderRepository.save(order);

// 发布事件,用于更新读模型
order.getUncommittedEvents().forEach(eventPublisher::publish);

return order.getId();
}

@Transactional
public void payOrder(String orderId, PaymentInfo paymentInfo) {
Order order = orderRepository.findById(orderId);
if (order == null) {
throw new OrderNotFoundException(orderId);
}
order.pay(paymentInfo);
orderRepository.save(order);

order.getUncommittedEvents().forEach(eventPublisher::publish);
}
}

// 查询端(读模型)
@Service
public class OrderQueryService {

private final OrderReadRepository readRepository;

public OrderDTO getOrder(String orderId) {
return readRepository.findById(orderId);
}

public List<OrderSummary> getUserOrders(String userId, Pageable pageable) {
return readRepository.findByUserId(userId, pageable);
}

public OrderStatistics getOrderStatistics(LocalDate from, LocalDate to) {
return readRepository.getStatistics(from, to);
}
}

// 读模型投影处理器
@Component
public class OrderProjectionHandler {

private final OrderReadRepository readRepository;

@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
OrderDTO dto = new OrderDTO();
dto.setId(event.getAggregateId());
dto.setUserId(event.getUserId());
dto.setTotalAmount(event.getTotalAmount());
dto.setStatus("PENDING");
dto.setCreatedAt(event.getOccurredOn());

readRepository.save(dto);
}

@EventListener
public void handleOrderPaid(OrderPaidEvent event) {
readRepository.updateStatus(event.getAggregateId(), "PAID");
}
}

事件驱动架构的优缺点

优点

优点说明
高度解耦生产者和消费者互不了解,通过事件间接通信
可扩展性可以独立扩展事件生产者和消费者
弹性消费者故障不影响生产者,消息可持久化重试
实时性事件实时传播,支持实时处理场景
审计追踪事件日志天然形成完整的变更历史

缺点

缺点说明
最终一致性数据在不同组件间可能存在延迟
复杂性需要处理消息顺序、重复、丢失等问题
调试困难异步流程难以追踪和调试
数据一致性跨服务数据一致性处理复杂

最佳实践

1. 事件设计

// 好的事件设计:包含足够上下文信息
public class OrderShippedEvent implements DomainEvent {
private final String orderId;
private final String trackingNumber;
private final String carrier;
private final List<ShippedItem> items; // 包含明细
private final Address shippingAddress; // 包含地址信息
private final LocalDateTime shippedAt;
private final String shippedBy; // 操作人

// 消费者不需要额外查询就能处理事件
}

// 避免的事件设计:信息不足
public class BadOrderShippedEvent implements DomainEvent {
private final String orderId; // 只有ID,消费者需要额外查询
private final LocalDateTime shippedAt;
}

2. 错误处理

@Component
public class ReliableEventHandler {

private final DeadLetterQueue dlq;
private final RetryTemplate retryTemplate;

@KafkaListener(topics = "order-events")
public void handleEvent(DomainEvent event) {
try {
retryTemplate.execute(context -> {
processEvent(event);
return null;
});
} catch (Exception e) {
// 超过重试次数,发送到死信队列
dlq.send(event, e);
log.error("处理事件失败,已发送到DLQ", e);
}
}

private void processEvent(DomainEvent event) {
// 业务处理逻辑
}
}

3. 事件版本管理

public class VersionedEvent {
private final String eventType;
private final int version; // 事件版本
private final String payload;

// 支持向后兼容的事件升级
}

// 事件升级处理器
@Component
public class EventUpgrader {

public DomainEvent upgrade(DomainEvent oldEvent) {
if (oldEvent.getVersion() < 2) {
return upgradeToV2(oldEvent);
}
return oldEvent;
}
}

适用场景

  • 实时数据处理:日志分析、实时监控
  • 异步流程:订单处理、工作流编排
  • 系统解耦:微服务间通信
  • 审计追踪:合规要求、操作日志
  • 复杂事件处理:规则引擎、风控系统

总结

事件驱动架构提供了强大的解耦和扩展能力,但也带来了复杂性。关键要点:

  1. 事件设计:包含足够上下文,避免消费者额外查询
  2. 可靠性:实现重试、死信队列等机制
  3. 监控:完善的链路追踪和监控告警
  4. 一致性:理解并接受最终一致性

"事件驱动架构让系统像生态系统一样运作,组件通过事件自然协作,而不是紧密耦合。" —— Martin Fowler

延伸阅读