Problem Statement
Design an Event Sourcing & CQRS (Command Query Responsibility Segregation) framework. The framework must separate write operations (Commands) from read operations (Queries), log all state updates as an append-only sequence of immutable events in an Event Store, update read projections in real-time, and support reconstructing the current state of an object (Aggregate) by replaying its historical events.
Design Decisions & Patterns Used
Traditional database architectures store only the current state of an object, discarding the historical changes that led to it. Event Sourcing solves this by storing state changes as an immutable sequence of events. CQRS complements this by separating the write-side model (optimized for validation and business logic) from the read-side model (optimized for fast queries).
We will utilize the following Design Patterns:
- Observer Pattern: Registering projection handlers that listen to event streams and update read-side indexes dynamically.
- Mediator Pattern: Routing commands from clients to their corresponding write-side aggregates.
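- Memento Pattern (loosely applied): Restoring the state of an aggregate from its stored history. Unlike a classic memento, which captures a snapshot of state, the aggregate here is rebuilt by replaying its event log.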
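The listings in this article call the aggregate's handlers directly, so the Mediator role is never shown in code. As a minimal sketch of what command routing could look like, the block below registers one handler per command type and dispatches by class. CommandBusSketch, CommandBus, register, dispatch, OpenAccount, and Deposit are hypothetical names introduced for illustration and are not part of the original design.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: every name in this class is an assumption made for illustration.
class CommandBusSketch {

    // Marker type for anything that can be dispatched.
    interface Command {}

    static final class OpenAccount implements Command {
        final String accountId;
        OpenAccount(String accountId) { this.accountId = accountId; }
    }

    static final class Deposit implements Command {
        final String accountId;
        final double amount;
        Deposit(String accountId, double amount) {
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    // The mediator: callers hand it a command and it looks up the registered handler.
    static class CommandBus {
        private final Map<Class<? extends Command>, Consumer<Command>> handlers = new HashMap<>();

        <C extends Command> void register(Class<C> type, Consumer<C> handler) {
            // The cast is safe because dispatch() looks handlers up by the command's exact class.
            handlers.put(type, command -> handler.accept(type.cast(command)));
        }

        void dispatch(Command command) {
            Consumer<Command> handler = handlers.get(command.getClass());
            if (handler == null) {
                throw new IllegalStateException(
                        "No handler registered for " + command.getClass().getSimpleName());
            }
            handler.accept(command);
        }
    }

    public static void main(String[] args) {
        Map<String, Double> balances = new HashMap<>();
        CommandBus bus = new CommandBus();
        bus.register(OpenAccount.class, c -> balances.put(c.accountId, 0.0));
        bus.register(Deposit.class, c -> balances.merge(c.accountId, c.amount, Double::sum));

        bus.dispatch(new OpenAccount("ACC-1"));
        bus.dispatch(new Deposit("ACC-1", 25.0));
        System.out.println("Balance after dispatch: " + balances.get("ACC-1"));
    }
}
```

Callers depend only on the bus and the command types, which is the decoupling the Mediator pattern is meant to provide. In a fuller version, each handler would load an aggregate and call its handle() method instead of updating a map.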
Functional Requirements
- Separate state modification triggers (Commands) from read operations (Queries).
- Store all state transitions as an append-only stream of immutable events (Event Store).
- Support state reconstruction: rebuild an aggregate's current state by replaying its event stream.
- Update a read-side model projection in real-time when new events are committed.
Objects Required
- Command and Event: Abstract base classes representing transactions.
- AccountAggregate: Write-side domain model processing commands and tracking current state.
- EventStore: Append-only registry storing event streams.
- AccountReadModel: Read-side database projection optimized for queries.
- EventBus: Message broker routing events from the write-side to the read-side.
Command and Event Base Classes
These abstract classes represent the operations in the framework. A Command is a request to change state, and an Event represents a completed state change.
public abstract class Command {
    private final String aggregateId;

    public Command(String aggregateId) {
        this.aggregateId = aggregateId;
    }

    public String getAggregateId() { return aggregateId; }
}
Let's define the base Event class:
public abstract class Event {
    private final String aggregateId;
    private final long timestamp;
    private int version;

    public Event(String aggregateId) {
        this.aggregateId = aggregateId;
        this.timestamp = System.currentTimeMillis();
    }

    public String getAggregateId() { return aggregateId; }
    public long getTimestamp() { return timestamp; }
    public int getVersion() { return version; }
    public void setVersion(int version) { this.version = version; }
}
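Each event carries a timestamp and a version. The version records the event's position in its aggregate's stream: the EventStore reads it for the optimistic concurrency check, and the aggregate restores it during replay. Note that the replay code in this article trusts the stored order and does not itself verify it.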
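If an explicit order check before replay is wanted, one possible shape is sketched below. It assumes versions start at 1 and increase by exactly 1, which matches how the aggregate later in this article assigns them. StreamOrderSketch and requireContiguousVersions are hypothetical names, and the input is a plain list of version numbers (for example, collected by calling getVersion() on each event) to keep the sketch self-contained.

```java
import java.util.List;

// Hypothetical helper: not part of the original framework.
class StreamOrderSketch {

    // Returns normally if the versions are 1, 2, 3, ... in list order; throws otherwise.
    static void requireContiguousVersions(List<Integer> versions) {
        int expected = 1;
        for (int version : versions) {
            if (version != expected) {
                throw new IllegalStateException(
                        "Expected version " + expected + " but found " + version);
            }
            expected++;
        }
    }

    public static void main(String[] args) {
        requireContiguousVersions(List.of(1, 2, 3)); // passes silently

        try {
            requireContiguousVersions(List.of(1, 3)); // version 2 is missing
        } catch (IllegalStateException e) {
            System.out.println("Rejected stream: " + e.getMessage());
        }
    }
}
```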
Concrete Commands and Events
We implement concrete commands and events to model a simple bank account service.
public class CreateAccountCommand extends Command {
    private final String holderName;

    public CreateAccountCommand(String aggregateId, String holderName) {
        super(aggregateId);
        this.holderName = holderName;
    }

    public String getHolderName() { return holderName; }
}

public class DepositMoneyCommand extends Command {
    private final double amount;

    public DepositMoneyCommand(String aggregateId, double amount) {
        super(aggregateId);
        this.amount = amount;
    }

    public double getAmount() { return amount; }
}
Let's define the corresponding event implementations:
public class AccountCreatedEvent extends Event {
    private final String holderName;

    public AccountCreatedEvent(String aggregateId, String holderName) {
        super(aggregateId);
        this.holderName = holderName;
    }

    public String getHolderName() { return holderName; }
}

public class MoneyDepositedEvent extends Event {
    private final double amount;

    public MoneyDepositedEvent(String aggregateId, double amount) {
        super(aggregateId);
        this.amount = amount;
    }

    public double getAmount() { return amount; }
}
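These events serve as immutable records of completed state changes. Their payload fields are final; the only mutable field is the version, which the aggregate assigns once before the event is stored.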
AccountAggregate Class (Write-Side Model)
The AccountAggregate processes commands, validates business logic, and mutates its state by applying events.
import java.util.ArrayList;
import java.util.List;

public class AccountAggregate {
    private final String id;
    private String holderName;
    private double balance;
    private int version;
    private final List<Event> uncommittedChanges;

    public AccountAggregate(String id) {
        this.id = id;
        this.version = 0;
        this.uncommittedChanges = new ArrayList<>();
    }

    // --- Command Handlers ---
    public void handle(CreateAccountCommand command) {
        // Business Rule Validation: Account name must not be empty
        if (command.getHolderName() == null || command.getHolderName().isEmpty()) {
            throw new IllegalArgumentException("Account holder name cannot be empty.");
        }
        applyNewEvent(new AccountCreatedEvent(command.getAggregateId(), command.getHolderName()));
    }

    public void handle(DepositMoneyCommand command) {
        if (command.getAmount() <= 0) {
            throw new IllegalArgumentException("Deposit amount must be positive.");
        }
        applyNewEvent(new MoneyDepositedEvent(command.getAggregateId(), command.getAmount()));
    }

    // --- State Mutation ---
    private void applyNewEvent(Event event) {
        event.setVersion(++version);
        apply(event);
        uncommittedChanges.add(event); // Track changes to commit to the Event Store later
    }

    public void apply(Event event) {
        if (event instanceof AccountCreatedEvent) {
            this.holderName = ((AccountCreatedEvent) event).getHolderName();
            this.balance = 0.0;
        } else if (event instanceof MoneyDepositedEvent) {
            this.balance += ((MoneyDepositedEvent) event).getAmount();
        }
        this.version = event.getVersion();
    }

    public void replay(List<Event> history) {
        for (Event event : history) {
            apply(event);
        }
    }

    public String getId() { return id; }
    public String getHolderName() { return holderName; }
    public double getBalance() { return balance; }
    public int getVersion() { return version; }
    public List<Event> getUncommittedChanges() { return uncommittedChanges; }
    public void markChangesAsCommitted() { uncommittedChanges.clear(); }
}
Here is an explanation of the core operations in the AccountAggregate class:
- handle() processes commands. It validates business rules and generates a new event if the command is valid.
- apply() mutates state variables based on the event type. It is called both when applying new local events and when replaying historical event streams.
- replay() iterates through a list of historical events, applying each to reconstruct the aggregate's state.
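Because state is derived purely from the event list, replaying only a prefix of the stream yields the state as it stood at an earlier version. The sketch below illustrates this with a simplified stand-in for deposit events. ReplaySketch, Deposited, and balanceAtVersion are hypothetical names, and the sketch assumes the history is already sorted by version.

```java
import java.util.List;

// Hypothetical sketch: a cut-down model of replay, not the article's AccountAggregate.
class ReplaySketch {

    // Minimal stand-in for a deposit event: a version number and an amount.
    static final class Deposited {
        final int version;
        final double amount;
        Deposited(int version, double amount) {
            this.version = version;
            this.amount = amount;
        }
    }

    // Rebuilds the balance as it stood right after the event with the given version.
    // Assumes history is ordered by ascending version.
    static double balanceAtVersion(List<Deposited> history, int upToVersion) {
        double balance = 0.0;
        for (Deposited event : history) {
            if (event.version > upToVersion) {
                break; // later events are ignored
            }
            balance += event.amount;
        }
        return balance;
    }

    public static void main(String[] args) {
        List<Deposited> history = List.of(
                new Deposited(1, 500.0),
                new Deposited(2, 200.0),
                new Deposited(3, 50.0));

        System.out.println(balanceAtVersion(history, 2)); // 700.0
        System.out.println(balanceAtVersion(history, 3)); // 750.0
    }
}
```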
EventStore & EventBus Classes
The EventStore provides append-only persistence for event streams. The EventBus broadcasts each stored event to the subscribed read projections; in this implementation, publishing happens synchronously inside save().
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EventStore {
    private final Map<String, List<Event>> streams = new ConcurrentHashMap<>();
    private final EventBus eventBus;

    public EventStore(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    public synchronized void save(String aggregateId, List<Event> newEvents, int expectedVersion) {
        List<Event> stream = streams.computeIfAbsent(aggregateId, k -> new ArrayList<>());
        // Optimistic Concurrency Control Check
        int currentVersion = stream.isEmpty() ? 0 : stream.get(stream.size() - 1).getVersion();
        if (currentVersion != expectedVersion) {
            throw new ConcurrentModificationException("Optimistic lock exception: version mismatch.");
        }
        for (Event event : newEvents) {
            stream.add(event);
            eventBus.publish(event); // Broadcast to update read projections
        }
    }

    public synchronized List<Event> getEventStream(String aggregateId) {
        List<Event> stream = streams.get(aggregateId);
        if (stream == null) return new ArrayList<>();
        return new ArrayList<>(stream); // Defensive copy so callers cannot mutate the stored stream
    }
}
The save() method implements optimistic concurrency control by verifying the aggregate's expected version before appending events. If the version matches, it appends the events and publishes them to the EventBus.
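To make the conflict case concrete, here is a minimal sketch in which two writers read the same version and only the first append succeeds. It uses a simplified single-stream store rather than the EventStore above. OptimisticLockSketch, SingleStreamStore, append, and currentVersion are hypothetical names, and the sketch assumes the version equals the number of stored entries.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical sketch of the version check; not the article's EventStore.
class OptimisticLockSketch {

    // Simplified store holding a single stream; its version is the number of entries.
    static class SingleStreamStore {
        private final List<String> entries = new ArrayList<>();

        synchronized int currentVersion() {
            return entries.size();
        }

        synchronized void append(String entry, int expectedVersion) {
            if (entries.size() != expectedVersion) {
                throw new ConcurrentModificationException(
                        "Expected version " + expectedVersion + " but stream is at " + entries.size());
            }
            entries.add(entry);
        }
    }

    public static void main(String[] args) {
        SingleStreamStore store = new SingleStreamStore();
        store.append("AccountCreated", 0);

        // Both writers read the stream while it is at version 1.
        int seenByA = store.currentVersion();
        int seenByB = store.currentVersion();

        store.append("MoneyDeposited 500 (writer A)", seenByA); // succeeds; stream moves to version 2

        try {
            store.append("MoneyDeposited 200 (writer B)", seenByB); // stale expected version
        } catch (ConcurrentModificationException e) {
            System.out.println("Writer B rejected: " + e.getMessage());
        }
    }
}
```

A rejected writer would typically reload the stream, rebuild the aggregate, re-validate its command, and try again.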
import java.util.ArrayList;
import java.util.List;

public class EventBus {
    private final List<AccountReadModel> subscribers = new ArrayList<>();

    public void subscribe(AccountReadModel subscriber) {
        subscribers.add(subscriber);
    }

    public void publish(Event event) {
        for (AccountReadModel subscriber : subscribers) {
            subscriber.onEvent(event);
        }
    }
}
The EventBus acts as a simple pub-sub broker. The publish() method dispatches events to all registered read-side models.
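The EventBus above accepts only AccountReadModel subscribers. One possible way to let other projections subscribe, closer to the Observer pattern named earlier, is to depend on a small listener interface instead. The following is a sketch under that assumption. ListenerBusSketch, EventSubscriber, and Bus are hypothetical names, and the event type is left generic so the block stands alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: a bus that depends on an interface rather than a concrete read model.
class ListenerBusSketch {

    // Anything that wants to receive events implements this single method.
    interface EventSubscriber<E> {
        void onEvent(E event);
    }

    static class Bus<E> {
        // CopyOnWriteArrayList lets subscribers be added while another thread is publishing.
        private final List<EventSubscriber<E>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(EventSubscriber<E> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(E event) {
            for (EventSubscriber<E> subscriber : subscribers) {
                subscriber.onEvent(event);
            }
        }
    }

    public static void main(String[] args) {
        Bus<String> bus = new Bus<>();
        List<String> auditLog = new ArrayList<>();

        bus.subscribe(auditLog::add);                                        // one projection
        bus.subscribe(event -> System.out.println("Projected: " + event));   // another projection

        bus.publish("AccountCreated");
        bus.publish("MoneyDeposited");
        System.out.println("Audit log size: " + auditLog.size());
    }
}
```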
AccountReadModel Class (Read-Side Projection)
The AccountReadModel is optimized for read queries. It subscribes to the event bus and updates its internal query cache in real-time.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AccountReadModel {
    private final Map<String, Double> accountBalances = new ConcurrentHashMap<>();

    public void onEvent(Event event) {
        if (event instanceof AccountCreatedEvent) {
            accountBalances.put(event.getAggregateId(), 0.0);
            System.out.println("[Read Projection] Initialized read record for " + event.getAggregateId());
        } else if (event instanceof MoneyDepositedEvent) {
            double deposit = ((MoneyDepositedEvent) event).getAmount();
            accountBalances.compute(event.getAggregateId(), (id, balance) -> (balance == null ? 0.0 : balance) + deposit);
            System.out.println("[Read Projection] Updated balance for " + event.getAggregateId() + " to " + accountBalances.get(event.getAggregateId()));
        }
    }

    public Double getBalance(String id) {
        return accountBalances.get(id);
    }
}
The onEvent() method handles incoming events, updating the read-side cache (accountBalances) directly to ensure fast queries.
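Since the read model is derived entirely from events, it can be discarded and rebuilt by feeding the stored events into a fresh projection. The block below is a minimal sketch of that idea using a simplified deposit event. ProjectionRebuildSketch, Deposited, BalanceProjection, and rebuild are hypothetical names rather than parts of the framework above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rebuilding a read projection from stored events.
class ProjectionRebuildSketch {

    // Minimal stand-in for a stored deposit event.
    static final class Deposited {
        final String accountId;
        final double amount;
        Deposited(String accountId, double amount) {
            this.accountId = accountId;
            this.amount = amount;
        }
    }

    static class BalanceProjection {
        private final Map<String, Double> balances = new HashMap<>();

        void onEvent(Deposited event) {
            balances.merge(event.accountId, event.amount, Double::sum);
        }

        // Returns null when no events have been seen for the account.
        Double getBalance(String accountId) {
            return balances.get(accountId);
        }
    }

    // Creates an empty projection and replays every stored event into it, in order.
    static BalanceProjection rebuild(List<Deposited> storedEvents) {
        BalanceProjection fresh = new BalanceProjection();
        for (Deposited event : storedEvents) {
            fresh.onEvent(event);
        }
        return fresh;
    }

    public static void main(String[] args) {
        List<Deposited> storedEvents = List.of(
                new Deposited("ACC-99", 500.0),
                new Deposited("ACC-99", 200.0));

        BalanceProjection projection = rebuild(storedEvents);
        System.out.println(projection.getBalance("ACC-99")); // 700.0
    }
}
```

The same mechanism allows a new kind of projection to be introduced later and populated from the existing history.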
Main Driver Class
This class tests our Event Sourcing and CQRS framework. It dispatches commands, verifies read projections, and reconstructs state from the event store.
import java.util.List;

public class Main {
    public static void main(String[] args) {
        EventBus eventBus = new EventBus();
        EventStore eventStore = new EventStore(eventBus);
        AccountReadModel readModel = new AccountReadModel();

        // Subscribe read projection to event bus
        eventBus.subscribe(readModel);

        String accountId = "ACC-99";

        System.out.println("==========================================");
        System.out.println("Scenario 1: Executing Commands and Updating Read Projection");
        System.out.println("==========================================");

        // Step 1: Create Account
        AccountAggregate aggregate = new AccountAggregate(accountId);
        aggregate.handle(new CreateAccountCommand(accountId, "Alice"));
        eventStore.save(accountId, aggregate.getUncommittedChanges(), 0);
        aggregate.markChangesAsCommitted();

        // Step 2: Deposit Money
        aggregate.handle(new DepositMoneyCommand(accountId, 500.00));
        eventStore.save(accountId, aggregate.getUncommittedChanges(), 1);
        aggregate.markChangesAsCommitted();

        // Step 3: Deposit More Money
        aggregate.handle(new DepositMoneyCommand(accountId, 200.00));
        eventStore.save(accountId, aggregate.getUncommittedChanges(), 2);
        aggregate.markChangesAsCommitted();

        // Query the read model
        System.out.println("\nQuerying Read Model for ACC-99 balance: $" + readModel.getBalance(accountId));

        System.out.println("\n==========================================");
        System.out.println("Scenario 2: Reconstructing State from Event Stream");
        System.out.println("==========================================");

        // Fetch historical event stream from event store
        List<Event> history = eventStore.getEventStream(accountId);
        System.out.println("Fetched " + history.size() + " historical events from Event Store.");

        // Instantiate a new aggregate instance and replay history to reconstruct its state
        AccountAggregate reconstructed = new AccountAggregate(accountId);
        reconstructed.replay(history);

        System.out.println("Reconstructed Account Holder: " + reconstructed.getHolderName());
        System.out.println("Reconstructed Balance: $" + reconstructed.getBalance());
        System.out.println("Reconstructed Version: " + reconstructed.getVersion());
    }
}
The main() driver configures the CQRS framework, dispatches commands to create and fund a mock account, queries the read projection, and verifies that replaying the event stream successfully reconstructs the aggregate's state.
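The driver keeps one aggregate in memory and passes hard-coded expected versions (0, 1, 2). A common alternative is to reload the stream for every command and use the version just read as the expected version. The sketch below shows that load, replay, validate, append cycle with a deliberately simplified store. LoadHandleSaveSketch, Store, load, append, and deposit are hypothetical names, and the stream is reduced to a list of deposit amounts whose size serves as the version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-command load/replay/append cycle.
class LoadHandleSaveSketch {

    // Simplified store: one list of deposit amounts per account; the list size is the version.
    static class Store {
        private final Map<String, List<Double>> streams = new HashMap<>();

        synchronized List<Double> load(String accountId) {
            // Copy so callers cannot mutate the stored stream.
            return new ArrayList<>(streams.getOrDefault(accountId, List.of()));
        }

        synchronized void append(String accountId, double amount, int expectedVersion) {
            List<Double> stream = streams.computeIfAbsent(accountId, k -> new ArrayList<>());
            if (stream.size() != expectedVersion) {
                throw new IllegalStateException("Version mismatch for " + accountId);
            }
            stream.add(amount);
        }
    }

    // Handles one deposit: validate, load history, replay it, then append
    // using the version that was just read. Returns the new balance.
    static double deposit(Store store, String accountId, double amount) {
        if (amount <= 0) {
            throw new IllegalArgumentException("Deposit amount must be positive.");
        }
        List<Double> history = store.load(accountId);
        double balance = 0.0;
        for (double past : history) {
            balance += past; // replay
        }
        store.append(accountId, amount, history.size()); // expected version comes from the load
        return balance + amount;
    }

    public static void main(String[] args) {
        Store store = new Store();
        deposit(store, "ACC-99", 500.0);
        double balance = deposit(store, "ACC-99", 200.0);
        System.out.println("Balance after two deposits: " + balance);
    }
}
```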