【アーキテクチャ】イベント駆動アーキテクチャ - j-komatsu/myCheatSheet GitHub Wiki
イベント駆動アーキテクチャ(Event-Driven Architecture, EDA) は、システム内のコンポーネント間をイベントを介して疎結合に連携させるアーキテクチャパターンです。
| 要素 | 説明 |
|---|---|
| イベント | システム内で発生した事実・状態変化を表すメッセージ |
| イベント発行者(Producer) | イベントを生成・発行するコンポーネント |
| イベント購読者(Consumer) | イベントを受信・処理するコンポーネント |
| イベントブローカー | イベントの配信を仲介する基盤(Kafka, RabbitMQ等) |
graph LR
subgraph "同期通信(Request/Response)"
A1[Service A] -->|HTTP Request| B1[Service B]
B1 -->|HTTP Response| A1
end
subgraph "イベント駆動(非同期)"
A2[Service A] -->|Event| EB[Event Broker]
EB -->|Event| B2[Service B]
EB -->|Event| C2[Service C]
end
| メリット | 詳細 |
|---|---|
| 疎結合 | 発行者と購読者が互いを知る必要がない |
| スケーラビリティ | 購読者を独立してスケール可能 |
| 柔軟性 | 新しい購読者を既存システムに影響なく追加 |
| 耐障害性 | 一部のコンポーネント障害が全体に波及しない |
| リアルタイム性 | イベント発生時に即座に処理開始 |
| 課題 | 対策 |
|---|---|
| 複雑性の増加 | イベントフローの可視化ツール導入 |
| デバッグ困難 | 分散トレーシング(Zipkin, Jaeger等) |
| データ整合性 | Saga パターン、補償トランザクション |
| イベント順序保証 | パーティションキーの適切な設計 |
| 重複処理 | べき等性の実装 |
最もシンプルなパターン。イベント発生を通知するだけで、詳細データは含まない。
// イベント定義
public class OrderCreatedEvent {
private final Long orderId;
private final Instant occurredAt;
public OrderCreatedEvent(Long orderId) {
this.orderId = orderId;
this.occurredAt = Instant.now();
}
// getters...
}
// イベント発行者
@Service
public class OrderService {
private final ApplicationEventPublisher eventPublisher;
public void createOrder(CreateOrderCommand command) {
Order order = new Order(command);
orderRepository.save(order);
// イベント発行
eventPublisher.publishEvent(new OrderCreatedEvent(order.getId()));
}
}
// イベント購読者
@Component
public class NotificationService {
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
// 注文IDをもとに詳細データを取得
Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
sendNotification(order);
}
}すべての状態変化をイベントとして永続化し、イベントの再生で現在状態を復元。
// イベントストア
public interface EventStore {
void save(DomainEvent event);
List<DomainEvent> getEvents(UUID aggregateId);
}
// 集約ルート
public class Order {
private UUID id;
private OrderStatus status;
private List<DomainEvent> uncommittedEvents = new ArrayList<>();
// イベントを適用
public void apply(OrderCreatedEvent event) {
this.id = event.getOrderId();
this.status = OrderStatus.CREATED;
}
public void apply(OrderConfirmedEvent event) {
this.status = OrderStatus.CONFIRMED;
}
// イベントを生成
public void confirm() {
if (this.status != OrderStatus.CREATED) {
throw new IllegalStateException("Cannot confirm order");
}
var event = new OrderConfirmedEvent(this.id);
apply(event);
uncommittedEvents.add(event);
}
// イベントから復元
public static Order fromEvents(List<DomainEvent> events) {
Order order = new Order();
events.forEach(order::apply);
return order;
}
}イベントを使って読み取りモデルを更新。
@Component
public class OrderReadModelUpdater {
private final OrderReadRepository readRepository;
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
OrderReadModel readModel = new OrderReadModel(
event.getOrderId(),
event.getCustomerId(),
"CREATED"
);
readRepository.save(readModel);
}
@EventListener
public void handleOrderConfirmed(OrderConfirmedEvent event) {
readRepository.updateStatus(event.getOrderId(), "CONFIRMED");
}
}graph TD
A[Order Service] -->|OrderCreatedEvent| EB[Event Broker<br/>Kafka/RabbitMQ]
B[Payment Service] -->|PaymentCompletedEvent| EB
C[Inventory Service] -->|StockReservedEvent| EB
EB -->|Subscribe| D[Notification Service]
EB -->|Subscribe| E[Analytics Service]
EB -->|Subscribe| F[Audit Service]
style EB fill:#f9f,stroke:#333,stroke-width:4px
sequenceDiagram
participant OS as Order Service
participant K as Kafka Topic
participant PS as Payment Service
participant NS as Notification Service
OS->>K: Publish OrderCreatedEvent
K->>PS: Consume Event
K->>NS: Consume Event
PS->>PS: Process Payment
PS->>K: Publish PaymentCompletedEvent
NS->>NS: Send Email
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: order-service-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"@Service
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
private static final String TOPIC = "order-events";
public void publishOrderCreated(Order order) {
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getTotalAmount(),
Instant.now()
);
kafkaTemplate.send(TOPIC, order.getId().toString(), event)
.addCallback(
result -> log.info("Event published: {}", event),
ex -> log.error("Failed to publish event", ex)
);
}
}@Component
public class PaymentEventConsumer {
@KafkaListener(topics = "order-events", groupId = "payment-service-group")
public void handleOrderEvent(OrderCreatedEvent event) {
log.info("Received OrderCreatedEvent: {}", event.getOrderId());
try {
// 支払い処理
processPayment(event);
} catch (Exception e) {
// エラーハンドリング(リトライ、DLQへ送信等)
log.error("Payment processing failed", e);
throw e; // Kafkaがリトライ
}
}
private void processPayment(OrderCreatedEvent event) {
// 支払いロジック実装
}
}@Component
public class IdempotentEventConsumer {
private final ProcessedEventRepository processedEventRepo;
@KafkaListener(topics = "order-events")
@Transactional
public void handleEvent(OrderCreatedEvent event) {
String eventId = event.getEventId();
// 既に処理済みかチェック
if (processedEventRepo.existsById(eventId)) {
log.info("Event already processed: {}", eventId);
return;
}
// ビジネスロジック実行
processOrder(event);
// 処理済みマーク
processedEventRepo.save(new ProcessedEvent(eventId, Instant.now()));
}
}sequenceDiagram
participant OS as Order Service
participant PS as Payment Service
participant IS as Inventory Service
participant K as Kafka
OS->>K: OrderCreated
K->>PS: Process Payment
PS->>K: PaymentCompleted
K->>IS: Reserve Stock
IS--xIS: Stock Unavailable
IS->>K: StockReservationFailed
K->>PS: Compensate (Refund)
PS->>K: PaymentRefunded
K->>OS: OrderCancelled
@Component
public class OrderSagaOrchestrator {
@KafkaListener(topics = "payment-events")
public void handlePaymentCompleted(PaymentCompletedEvent event) {
// 次のステップ:在庫確保
inventoryService.reserveStock(event.getOrderId());
}
@KafkaListener(topics = "inventory-events")
public void handleStockReservationFailed(StockReservationFailedEvent event) {
// 補償トランザクション:支払い取り消し
paymentService.refund(event.getOrderId());
}
}❌ 問題
- 同一エンティティに対するイベントが順不同で処理される
✅ 解決策
// Kafkaのパーティションキーを使用
kafkaTemplate.send(
"order-events",
order.getId().toString(), // ← 同じorderIdは同じパーティションへ
event
);✅ 解決策
// 分散トレーシングID(Correlation ID)の付与
public class OrderCreatedEvent {
private String eventId;
private String correlationId; // ← リクエスト全体を追跡
private Long orderId;
private Instant timestamp;
}
// ログ出力時に常にcorrelationIdを含める
MDC.put("correlationId", event.getCorrelationId());
log.info("Processing event: {}", event);✅ 解決策
public class OrderCreatedEvent {
private String eventType = "OrderCreated";
private int version = 2; // ← イベントスキーマのバージョン
private Long orderId;
private String customerId;
private BigDecimal amount; // v2で追加
// 古いバージョンからの変換
public static OrderCreatedEvent fromV1(OrderCreatedEventV1 oldEvent) {
return new OrderCreatedEvent(
oldEvent.getOrderId(),
oldEvent.getCustomerId(),
BigDecimal.ZERO // デフォルト値
);
}
}@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory() {
factory.setCommonErrorHandler(new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new FixedBackOff(1000L, 3L) // 1秒間隔で3回リトライ
));
return factory;
}| ブローカー | 特徴 | 適用シーン |
|---|---|---|
| Apache Kafka | 高スループット、永続化、順序保証 | 大量イベント、イベントソーシング |
| RabbitMQ | 柔軟なルーティング、AMQP対応 | 複雑なメッセージングパターン |
| AWS SNS/SQS | マネージドサービス、スケーラブル | AWS環境、運用負荷軽減 |
| Google Pub/Sub | マネージドサービス、グローバル配信 | GCP環境、リアルタイム分析 |
| Azure Service Bus | エンタープライズメッセージング | Azure環境、トランザクションサポート |
-
イベント設計は慎重に
- イベント名は過去形(OrderCreated, PaymentCompleted)
- イベントは不変(Immutable)
- 必要最小限の情報のみ含める
-
べき等性を必ず実装
- 同じイベントが複数回処理されても安全に
-
モニタリングとアラート
- イベント処理の遅延を監視
- DLQの溜まり具合をチェック
-
段階的な導入
同期通信 → イベント通知 → イベントソーシング
❌ 避けるべきこと
- イベントに大量のデータを詰め込む(参照IDのみ推奨)
- イベント購読者が発行者に同期的に依存
- イベントの過度な細分化(イベントストームに注意)
- Event Sourcing(イベントソーシング): すべての状態変化をイベントとして記録
- CQRS: コマンドとクエリの責任分離
- Saga Pattern: 分散トランザクションを複数のローカルトランザクションで実現
- Idempotency(べき等性): 同じ処理を複数回実行しても結果が同じ
- Dead Letter Queue(DLQ): 処理失敗イベントを格納するキュー
- 📖 Martin Fowler - Event-Driven Architecture
- 📖 Chris Richardson著『マイクロサービスパターン』
- 🌐 Confluent - Event-Driven Microservices
- 🌐 AWS - Event-Driven Architecture
課題1: 簡単なECサイトで「注文 → 支払い → 発送」のイベントフローを実装してみましょう
課題2: Sagaパターンを使って、在庫不足時に支払いをロールバックする仕組みを実装してみましょう
課題3: KafkaとSpring Bootを使って、べき等性を保証したイベント購読者を実装してみましょう
- 【アーキテクチャ】CQRS(コマンドクエリ責任分離) - イベント駆動と相性の良いパターン
- 【アーキテクチャ】マイクロサービスアーキテクチャ - イベント駆動の主要な適用先
- 【アーキテクチャ】モジュラモノリス - イベント駆動をモノリス内で実現
- 【アーキテクチャ】ドメイン駆動設計(DDD) - ドメインイベントの設計理論
- 【データベース】データベースの概要と主要概念 - イベントストアの実装
📝 最終更新: 2025-10-25