事件驱动架构
事件驱动架构(Event-Driven Architecture,EDA)是一种以事件为核心进行系统设计和通信的架构风格。在这种架构中,组件通过异步消息传递进行通信,实现高度解耦和可扩展性。
什么是事件驱动架构?
事件驱动架构是一种软件架构模式,其中系统的行为由事件的接收和发送来驱动。事件表示系统中发生了某些有意义的事情,如用户操作、数据变更或系统状态转换。
┌─────────────────────────────────────────────────────────────────────────────┐
│ 事件驱动架构示意图 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ 事件 ┌──────────┐ 事件 ┌──────────┐ │
│ │ 事件源 │ ──────────>│ 事件总线 │ ──────────>│ 事件处理器│ │
│ │ (Source) │ │ (Bus) │ │(Handler) │ │
│ └──────────┘ └────┬─────┘ └──────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 库存服务 │ │ 通知服务 │ │ 分析服务 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心概念
1. 事件(Event)
事件表示系统中发生的某种状态变化或动作,是不可变的、已发生的事实。
// 基础事件接口
public interface DomainEvent {
String getEventId();
String getEventType();
LocalDateTime getOccurredOn();
String getAggregateId();
}
// 订单创建事件
public class OrderCreatedEvent implements DomainEvent {
private final String eventId;
private final String orderId;
private final String userId;
private final BigDecimal totalAmount;
private final List<OrderItem> items;
private final LocalDateTime occurredOn;
public OrderCreatedEvent(String orderId, String userId,
BigDecimal totalAmount, List<OrderItem> items) {
this.eventId = UUID.randomUUID().toString();
this.orderId = orderId;
this.userId = userId;
this.totalAmount = totalAmount;
this.items = new ArrayList<>(items);
this.occurredOn = LocalDateTime.now();
}
@Override
public String getEventId() {
return eventId;
}
@Override
public String getEventType() {
return "ORDER_CREATED";
}
@Override
public String getAggregateId() {
return orderId;
}
// getters...
}
// 订单支付完成事件
public class OrderPaidEvent implements DomainEvent {
private final String eventId;
private final String orderId;
private final String paymentId;
private final BigDecimal amount;
private final LocalDateTime paidAt;
private final LocalDateTime occurredOn;
public OrderPaidEvent(String orderId, String paymentId,
BigDecimal amount, LocalDateTime paidAt) {
this.eventId = UUID.randomUUID().toString();
this.orderId = orderId;
this.paymentId = paymentId;
this.amount = amount;
this.paidAt = paidAt;
this.occurredOn = LocalDateTime.now();
}
@Override
public String getEventType() {
return "ORDER_PAID";
}
@Override
public String getAggregateId() {
return orderId;
}
// getters...
}
2. 事件生产者(Producer)
产生事件的组件或服务。
@Service
public class OrderService {
private final EventPublisher eventPublisher;
private final OrderRepository orderRepository;
public Order createOrder(CreateOrderCommand command) {
// 创建订单
Order order = Order.create(command);
orderRepository.save(order);
// 发布订单创建事件
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getUserId(),
order.getTotalAmount(),
order.getItems()
);
eventPublisher.publish(event);
return order;
}
public void payOrder(String orderId, PaymentInfo paymentInfo) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.pay(paymentInfo);
orderRepository.save(order);
// 发布订单支付事件
OrderPaidEvent event = new OrderPaidEvent(
orderId,
paymentInfo.getPaymentId(),
paymentInfo.getAmount(),
LocalDateTime.now()
);
eventPublisher.publish(event);
}
}
3. 事件消费者(Consumer)
接收并处理事件的组件。
@Component
public class InventoryEventHandler {
private final InventoryService inventoryService;
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
// 预留库存
for (OrderItem item : event.getItems()) {
inventoryService.reserveStock(
event.getOrderId(),
item.getProductId(),
item.getQuantity()
);
}
}
@EventListener
public void handleOrderCancelled(OrderCancelledEvent event) {
// 释放库存
inventoryService.releaseStock(event.getOrderId());
}
}
@Component
public class NotificationEventHandler {
private final EmailService emailService;
private final SmsService smsService;
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
// 发送订单确认邮件
emailService.sendOrderConfirmation(event.getUserId(), event.getOrderId());
}
@EventListener
public void handleOrderPaid(OrderPaidEvent event) {
// 发送支付成功通知
smsService.sendPaymentSuccessNotification(
event.getAggregateId(),
event.getAmount()
);
}
}
事件驱动架构模式
1. 发布-订阅模式(Pub/Sub)
多个消费者可以订阅同一个事件主题,实现一对多通信。
// 使用 Kafka 实现发布-订阅
@Service
public class KafkaEventPublisher implements EventPublisher {
private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
@Override
public void publish(DomainEvent event) {
String topic = getTopicForEvent(event);
kafkaTemplate.send(topic, event.getAggregateId(), event);
}
private String getTopicForEvent(DomainEvent event) {
// 根据事件类型路由到不同主题
return event.getEventType().toLowerCase().replace("_", "-");
}
}
@Component
public class KafkaEventConsumer {
private final InventoryEventHandler inventoryHandler;
private final NotificationEventHandler notificationHandler;
@KafkaListener(topics = "order-created", groupId = "inventory-service")
public void consumeOrderCreatedForInventory(OrderCreatedEvent event) {
inventoryHandler.handleOrderCreated(event);
}
@KafkaListener(topics = "order-created", groupId = "notification-service")
public void consumeOrderCreatedForNotification(OrderCreatedEvent event) {
notificationHandler.handleOrderCreated(event);
}
}
2. 事件溯源(Event Sourcing)
不存储当前状态,而是存储导致状态变化的所有事件,通过重放事件重建状态。
// 事件存储
public interface EventStore {
void appendEvents(String aggregateId, List<DomainEvent> events);
List<DomainEvent> getEvents(String aggregateId);
List<DomainEvent> getEvents(String aggregateId, long fromVersion);
}
// 事件溯源聚合根
public abstract class EventSourcedAggregate {
protected String id;
protected long version = 0;
protected List<DomainEvent> uncommittedEvents = new ArrayList<>();
public void apply(DomainEvent event) {
// 应用事件到聚合根
handleEvent(event);
version++;
}
protected void raiseEvent(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
protected abstract void handleEvent(DomainEvent event);
public List<DomainEvent> getUncommittedEvents() {
return new ArrayList<>(uncommittedEvents);
}
public void markCommitted() {
uncommittedEvents.clear();
}
}
// 订单聚合根(事件溯源实现)
public class Order extends EventSourcedAggregate {
private String userId;
private OrderStatus status;
private List<OrderItem> items;
private BigDecimal totalAmount;
// 用于重建聚合根的工厂方法
public static Order reconstitute(String orderId, List<DomainEvent> events) {
Order order = new Order();
order.id = orderId;
for (DomainEvent event : events) {
order.apply(event);
}
return order;
}
// 创建新订单
public static Order create(String userId, List<OrderItem> items) {
Order order = new Order();
String orderId = UUID.randomUUID().toString();
BigDecimal total = items.stream()
.map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
order.raiseEvent(new OrderCreatedEvent(orderId, userId, total, items));
return order;
}
// 支付订单
public void pay(PaymentInfo paymentInfo) {
if (status != OrderStatus.PENDING) {
throw new IllegalStateException("只能支付待处理订单");
}
raiseEvent(new OrderPaidEvent(id, paymentInfo.getPaymentId(),
totalAmount, LocalDateTime.now()));
}
@Override
protected void handleEvent(DomainEvent event) {
switch (event.getEventType()) {
case "ORDER_CREATED":
handleOrderCreated((OrderCreatedEvent) event);
break;
case "ORDER_PAID":
handleOrderPaid((OrderPaidEvent) event);
break;
// ... 其他事件处理
}
}
private void handleOrderCreated(OrderCreatedEvent event) {
this.id = event.getAggregateId();
this.userId = event.getUserId();
this.items = event.getItems();
this.totalAmount = event.getTotalAmount();
this.status = OrderStatus.PENDING;
}
private void handleOrderPaid(OrderPaidEvent event) {
this.status = OrderStatus.PAID;
}
}
// 事件溯源仓库
@Service
public class EventSourcedOrderRepository {
private final EventStore eventStore;
public Order findById(String orderId) {
List<DomainEvent> events = eventStore.getEvents(orderId);
if (events.isEmpty()) {
return null;
}
return Order.reconstitute(orderId, events);
}
public void save(Order order) {
eventStore.appendEvents(order.getId(), order.getUncommittedEvents());
order.markCommitted();
}
}
3. CQRS(命令查询职责分离)
将读操作和写操作分离,使用不同的模型处理命令和查询。
// 命令端(写模型)
@Service
public class OrderCommandService {
private final EventSourcedOrderRepository orderRepository;
private final EventPublisher eventPublisher;
@Transactional
public String createOrder(CreateOrderCommand command) {
Order order = Order.create(command.getUserId(), command.getItems());
orderRepository.save(order);
// 发布事件,用于更新读模型
order.getUncommittedEvents().forEach(eventPublisher::publish);
return order.getId();
}
@Transactional
public void payOrder(String orderId, PaymentInfo paymentInfo) {
Order order = orderRepository.findById(orderId);
if (order == null) {
throw new OrderNotFoundException(orderId);
}
order.pay(paymentInfo);
orderRepository.save(order);
order.getUncommittedEvents().forEach(eventPublisher::publish);
}
}
// 查询端(读模型)
@Service
public class OrderQueryService {
private final OrderReadRepository readRepository;
public OrderDTO getOrder(String orderId) {
return readRepository.findById(orderId);
}
public List<OrderSummary> getUserOrders(String userId, Pageable pageable) {
return readRepository.findByUserId(userId, pageable);
}
public OrderStatistics getOrderStatistics(LocalDate from, LocalDate to) {
return readRepository.getStatistics(from, to);
}
}
// 读模型投影处理器
@Component
public class OrderProjectionHandler {
private final OrderReadRepository readRepository;
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
OrderDTO dto = new OrderDTO();
dto.setId(event.getAggregateId());
dto.setUserId(event.getUserId());
dto.setTotalAmount(event.getTotalAmount());
dto.setStatus("PENDING");
dto.setCreatedAt(event.getOccurredOn());
readRepository.save(dto);
}
@EventListener
public void handleOrderPaid(OrderPaidEvent event) {
readRepository.updateStatus(event.getAggregateId(), "PAID");
}
}
事件驱动架构的优缺点
优点
| 优点 | 说明 |
|---|---|
| 高度解耦 | 生产者和消费者互不了解,通过事件间接通信 |
| 可扩展性 | 可以独立扩展事件生产者和消费者 |
| 弹性 | 消费者故障不影响生产者,消息可持久化重试 |
| 实时性 | 事件实时传播,支持实时处理场景 |
| 审计追踪 | 事件日志天然形成完整的变更历史 |
缺点
| 缺点 | 说明 |
|---|---|
| 最终一致性 | 数据在不同组件间可能存在延迟 |
| 复杂性 | 需要处理消息顺序、重复、丢失等问题 |
| 调试困难 | 异步流程难以追踪和调试 |
| 数据一致性 | 跨服务数据一致性处理复杂 |
最佳实践
1. 事件设计
// 好的事件设计:包含足够上下文信息
public class OrderShippedEvent implements DomainEvent {
private final String orderId;
private final String trackingNumber;
private final String carrier;
private final List<ShippedItem> items; // 包含明细
private final Address shippingAddress; // 包含地址信息
private final LocalDateTime shippedAt;
private final String shippedBy; // 操作人
// 消费者不需要额外查询就能处理事件
}
// 避免的事件设计:信息不足
public class BadOrderShippedEvent implements DomainEvent {
private final String orderId; // 只有ID,消费者需要额外查询
private final LocalDateTime shippedAt;
}
2. 错误处理
@Component
public class ReliableEventHandler {
private final DeadLetterQueue dlq;
private final RetryTemplate retryTemplate;
@KafkaListener(topics = "order-events")
public void handleEvent(DomainEvent event) {
try {
retryTemplate.execute(context -> {
processEvent(event);
return null;
});
} catch (Exception e) {
// 超过重试次数,发送到死信队列
dlq.send(event, e);
log.error("处理事件失败,已发送到DLQ", e);
}
}
private void processEvent(DomainEvent event) {
// 业务处理逻辑
}
}
3. 事件版本管理
public class VersionedEvent {
private final String eventType;
private final int version; // 事件版本
private final String payload;
// 支持向后兼容的事件升级
}
// 事件升级处理器
@Component
public class EventUpgrader {
public DomainEvent upgrade(DomainEvent oldEvent) {
if (oldEvent.getVersion() < 2) {
return upgradeToV2(oldEvent);
}
return oldEvent;
}
}
适用场景
- 实时数据处理:日志分析、实时监控
- 异步流程:订单处理、工作流编排
- 系统解耦:微服务间通信
- 审计追踪:合规要求、操作日志
- 复杂事件处理:规则引擎、风控系统
总结
事件驱动架构提供了强大的解耦和扩展能力,但也带来了复杂性。关键要点:
- 事件设计:包含足够上下文,避免消费者额外查询
- 可靠性:实现重试、死信队列等机制
- 监控:完善的链路追踪和监控告警
- 一致性:理解并接受最终一致性
"事件驱动架构让系统像生态系统一样运作,组件通过事件自然协作,而不是紧密耦合。" —— Martin Fowler