Javaにおけるイベントソーシングパターン:堅牢なシステムのための不変の履歴レコードの構築
別名
- イベントロギング
- イベントストリーミング
イベントソーシングデザインパターンの目的
イベントソーシングは、状態の変化をイベントのシーケンスとして保存することを推奨するデザインパターンです。データベース内のレコードを更新する代わりに、すべての変更が個々のイベントとして保存され、再生すると、任意の時点でのアプリケーションの状態を再現できます。
実際の例を用いたイベントソーシングパターンの詳細な説明
実際の例
ユーザーアカウントのすべてのトランザクションを追跡する銀行アプリケーションを考えてみましょう。このシステムでは、すべての預金、引き出し、および送金は、イベントログに個々のイベントとして記録されます。単に現在の口座残高を更新する代わりに、各トランザクションは個別のイベントとして保存されます。このアプローチにより、銀行はすべての口座活動の完全で不変の履歴を維持できます。不一致が発生した場合、銀行はイベントのシーケンスを再生して、任意の時点での口座状態を再構築できます。これにより、堅牢な監査証跡が提供され、デバッグが容易になり、トランザクションのロールバックや履歴データ分析などの機能がサポートされます。
分かりやすく言うと
イベントソーシングは、信頼性の高い状態の再構築と監査可能性を確保するために、すべての状態の変更を不変のイベントのシーケンスとして記録します。
Microsoftのドキュメントによると
イベントソーシングパターンは、追加専用のストアに記録される一連のイベントによって駆動される、データに対する操作を処理するためのアプローチを定義します。アプリケーションコードは、データに対して発生した各アクションを命令的に記述する一連のイベントをイベントストアに送信し、そこで永続化されます。各イベントは、データに対する一連の変更(AddedItemToOrderなど)を表します。
Javaにおけるイベントソーシングパターンのプログラム例
プログラム例では、銀行口座間でお金を送金します。
Event
クラスは、イベントのキューを管理し、非同期処理のためのスレッド操作を制御します。各イベントは、システムの状態に影響を与える状態変更と見なすことができます。
public class Event {
private static final Event INSTANCE = new Event();
private static final int MAX_PENDING = 16;
private int headIndex;
private int tailIndex;
private volatile Thread updateThread = null;
private final EventMessage[] pendingEvents = new EventMessage[MAX_PENDING];
Event() {}
public static Event getInstance() {
return INSTANCE;
}
}
triggerEvent
メソッドは、イベントが作成される場所です。イベントがトリガーされるたびに、イベントが作成され、キューに追加されます。このイベントには、状態変更の詳細が含まれています。
public void triggerEvent(EventMessage eventMessage) {
init();
for(var i = headIndex; i != tailIndex; i = (i + 1) % MAX_PENDING) {
var pendingEvent = getPendingEvents()[i];
if(pendingEvent.equals(eventMessage)) {
return;
}
}
getPendingEvents()[tailIndex] = eventMessage;
tailIndex = (tailIndex + 1) % MAX_PENDING;
}
init
メソッドとstartThread
メソッドは、スレッドが適切に初期化され、実行されていることを保証します。 stopService
メソッドは、不要になったときにスレッドを停止するために使用されます。これらのメソッドは、イベントの処理に使用されるスレッドのライフサイクルを管理します。
public synchronized void stopService() throws InterruptedException {
if(updateThread != null) {
updateThread.interrupt();
updateThread.join();
updateThread = null;
}
}
public synchronized boolean isServiceRunning() {
return updateThread != null && updateThread.isAlive();
}
public void init() {
if(updateThread == null) {
updateThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
update();
}
});
startThread();
}
}
private synchronized void startThread() {
if (!updateThread.isAlive()) {
updateThread.start();
headIndex = 0;
tailIndex = 0;
}
}
この例は、App
クラスとそのmain
メソッドによって駆動されます。
@Slf4j
public class App {
public static final int ACCOUNT_OF_DAENERYS = 1;
public static final int ACCOUNT_OF_JON = 2;
public static void main(String[] args) {
var eventProcessor = new DomainEventProcessor(new JsonFileJournal());
LOGGER.info("Running the system first time............");
eventProcessor.reset();
LOGGER.info("Creating the accounts............");
eventProcessor.process(new AccountCreateEvent(
0, new Date().getTime(), ACCOUNT_OF_DAENERYS, "Daenerys Targaryen"));
eventProcessor.process(new AccountCreateEvent(
1, new Date().getTime(), ACCOUNT_OF_JON, "Jon Snow"));
LOGGER.info("Do some money operations............");
eventProcessor.process(new MoneyDepositEvent(
2, new Date().getTime(), ACCOUNT_OF_DAENERYS, new BigDecimal("100000")));
eventProcessor.process(new MoneyDepositEvent(
3, new Date().getTime(), ACCOUNT_OF_JON, new BigDecimal("100")));
eventProcessor.process(new MoneyTransferEvent(
4, new Date().getTime(), new BigDecimal("10000"), ACCOUNT_OF_DAENERYS,
ACCOUNT_OF_JON));
LOGGER.info("...............State:............");
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());
LOGGER.info("At that point system had a shut down, state in memory is cleared............");
AccountAggregate.resetState();
LOGGER.info("Recover the system by the events in journal file............");
eventProcessor = new DomainEventProcessor(new JsonFileJournal());
eventProcessor.recover();
LOGGER.info("...............Recovered State:............");
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
LOGGER.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());
}
}
例を実行すると、次のコンソール出力が生成されます。
22:40:47.982 [main] INFO com.iluwatar.event.sourcing.app.App -- Running the system first time............
22:40:47.984 [main] INFO com.iluwatar.event.sourcing.app.App -- Creating the accounts............
22:40:47.985 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.089 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.090 [main] INFO com.iluwatar.event.sourcing.app.App -- Do some money operations............
22:40:48.090 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.095 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.099 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.099 [main] INFO com.iluwatar.event.sourcing.domain.Account -- Some external api for only realtime execution could be called here.
22:40:48.101 [main] INFO com.iluwatar.event.sourcing.app.App -- ...............State:............
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=1, owner='Daenerys Targaryen', money=90000}
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=2, owner='Jon Snow', money=10100}
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- At that point system had a shut down, state in memory is cleared............
22:40:48.104 [main] INFO com.iluwatar.event.sourcing.app.App -- Recover the system by the events in journal file............
22:40:48.124 [main] INFO com.iluwatar.event.sourcing.app.App -- ...............Recovered State:............
22:40:48.124 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=1, owner='Daenerys Targaryen', money=90000}
22:40:48.124 [main] INFO com.iluwatar.event.sourcing.app.App -- Account{accountNo=2, owner='Jon Snow', money=10100}
この例では、キュー内のイベントを再生することにより、システムの状態を任意の時点で再作成できます。これは、イベントソーシングパターンの重要な機能です。
Javaでイベントソーシングパターンを使用する場合
- 完全な監査証跡と履歴の変更が重要なシステムにおいて。
- アプリケーションの状態が一連の変更から派生する複雑なドメインにおいて。
- イベントソーシングが分散システムに自然に適しているため、高可用性とスケーラビリティの恩恵を受けるシステムにおいて。
Javaにおけるイベントソーシングパターンの実際のアプリケーション
- トランザクションと口座残高を経時的に追跡するための金融システム。
- 注文と在庫管理のためのeコマースアプリケーション。
- イベントの整合性と再現性が重要なリアルタイムデータ処理システム。
- LMAXアーキテクチャ
イベントソーシングパターンの利点とトレードオフ
利点
- 監査可能性:状態への各変更が記録されるため、包括的な監査が可能です。
- 再現性:イベントを再処理して、履歴状態を再作成したり、新しい状態に移行したりできます。
- スケーラビリティ:イベントは非同期で並列に処理できます。
トレードオフ
- 複雑さ:イベントソーシングシステムの実装と保守は、追加の複雑さを招く可能性があります。
- イベントストアのサイズ:すべての状態変更を保存すると、大量のデータが発生する可能性があります。
- イベントのバージョン管理:時間の経過に伴うイベント構造の変更には、システムの整合性を確保するための慎重な処理が必要です。
関連するJavaデザインパターン
- コマンドクエリ責務分離(CQRS):読み取りと書き込みの責務を分離してパフォーマンスとスケーラビリティを向上させるために、イベントソーシングと併用されることがよくあります。
- スナップショット:長いイベントシーケンスの再生を回避するために、現在の状態を定期的に保存することで、イベントソーシングシステムを最適化するために使用されます。