
Design an Event Sourcing & CQRS Framework

Problem Statement

Design an Event Sourcing & CQRS (Command Query Responsibility Segregation) framework. The framework must separate write operations (Commands) from read operations (Queries), record every state change as an immutable event in an append-only Event Store, update read projections in real time, and support reconstructing the current state of an object (Aggregate) by replaying its historical events.

Asked In Companies
Uber, Netflix, Microsoft

Design Decisions & Patterns Used

Traditional database architectures store only the current state of an object, discarding the historical changes that led to it. Event Sourcing solves this by storing state changes as an immutable sequence of events. CQRS complements this by separating the write-side model (optimized for validation and business logic) from the read-side model (optimized for fast queries).

We will utilize the following Design Patterns:

  • Observer Pattern: Registering projection handlers that listen to event streams and update read-side indexes dynamically.
  • Mediator Pattern: Routing commands from clients to their corresponding write-side aggregates.
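  • Memento Pattern (in spirit): Restoring an aggregate's state from stored history. Here the stored history is the event log itself rather than a snapshot object, so state is rebuilt by replaying events.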

Functional Requirements

  • Separate state modification triggers (Commands) from read operations (Queries).
  • Store all state transitions as an append-only stream of immutable events (Event Store).
  • Support state reconstruction: rebuild an aggregate's current state by replaying its event stream.
  • Update a read-side model projection in real time when new events are committed.

Objects Required

  • Command & Event (Abstract base classes: a Command is a request to change state, an Event is a record of a change that has happened)
  • AccountAggregate (Write-side domain model processing commands and tracking current state)
  • EventStore (Append-only registry storing event streams)
  • AccountReadModel (Read-side database projection optimized for queries)
  • EventBus (Message broker routing events from the write-side to the read-side)

Command and Event Base Classes

These abstract classes represent the operations in the framework. A Command is a request to change state, and an Event represents a completed state change.


public abstract class Command {
    private final String aggregateId;

    public Command(String aggregateId) {
        this.aggregateId = aggregateId;
    }

    public String getAggregateId() { return aggregateId; }
}

Let's define the base Event class:


public abstract class Event {
    private final String aggregateId;
    private final long timestamp;
    private int version;

    public Event(String aggregateId) {
        this.aggregateId = aggregateId;
        this.timestamp = System.currentTimeMillis();
    }

    public String getAggregateId() { return aggregateId; }
    public long getTimestamp() { return timestamp; }
    
    public int getVersion() { return version; }
    public void setVersion(int version) { this.version = version; }
}

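Each event carries a timestamp and a version. The aggregate assigns the version when it raises the event, and the EventStore compares versions for its optimistic concurrency check. Replay itself does not verify ordering; it assumes the stream is already stored in version order.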
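Because replay trusts the stored order, a guard could run before replay to reject a stream with gaps or out-of-order versions. The following is a minimal sketch, not part of the design above: ReplayOrderCheck and VersionedEvent are hypothetical names, and the rule that versions start at 1 and increase by 1 is an assumption that mirrors how the aggregate in this post numbers its events.

```java
import java.util.Arrays;
import java.util.List;

public class ReplayOrderCheck {
    // Hypothetical stand-in for Event: only the fields needed for the check.
    public static class VersionedEvent {
        final String aggregateId;
        final int version;

        public VersionedEvent(String aggregateId, int version) {
            this.aggregateId = aggregateId;
            this.version = version;
        }
    }

    // Throws if the stream does not carry versions 1, 2, 3, ... in that order.
    public static void requireContiguous(List<VersionedEvent> history) {
        int expected = 1;
        for (VersionedEvent event : history) {
            if (event.version != expected) {
                throw new IllegalStateException(
                    "Expected version " + expected + " but found " + event.version);
            }
            expected++;
        }
    }

    public static void main(String[] args) {
        List<VersionedEvent> ordered = Arrays.asList(
            new VersionedEvent("ACC-99", 1), new VersionedEvent("ACC-99", 2));
        requireContiguous(ordered); // passes silently

        List<VersionedEvent> withGap = Arrays.asList(
            new VersionedEvent("ACC-99", 1), new VersionedEvent("ACC-99", 3));
        try {
            requireContiguous(withGap);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

A check like this is cheap relative to replay and turns a silently wrong balance into an explicit failure.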


Concrete Commands and Events

We implement concrete commands and events to model a simple bank account service.


public class CreateAccountCommand extends Command {
    private final String holderName;

    public CreateAccountCommand(String aggregateId, String holderName) {
        super(aggregateId);
        this.holderName = holderName;
    }

    public String getHolderName() { return holderName; }
}

public class DepositMoneyCommand extends Command {
    private final double amount;

    public DepositMoneyCommand(String aggregateId, double amount) {
        super(aggregateId);
        this.amount = amount;
    }

    public double getAmount() { return amount; }
}

Let's define the corresponding event implementations:


public class AccountCreatedEvent extends Event {
    private final String holderName;

    public AccountCreatedEvent(String aggregateId, String holderName) {
        super(aggregateId);
        this.holderName = holderName;
    }

    public String getHolderName() { return holderName; }
}

public class MoneyDepositedEvent extends Event {
    private final double amount;

    public MoneyDepositedEvent(String aggregateId, double amount) {
        super(aggregateId);
        this.amount = amount;
    }

    public double getAmount() { return amount; }
}

These events serve as immutable records of completed state changes.


AccountAggregate Class (Write-Side Model)

The AccountAggregate processes commands, validates business logic, and mutates its state by applying events.


import java.util.ArrayList;
import java.util.List;

public class AccountAggregate {
    private final String id;
    private String holderName;
    private double balance;
    private int version;
    private final List<Event> uncommittedChanges;

    public AccountAggregate(String id) {
        this.id = id;
        this.version = 0;
        this.uncommittedChanges = new ArrayList<>();
    }

    // --- Command Handlers ---
    
    public void handle(CreateAccountCommand command) {
        // Business Rule Validation: Account name must not be empty
        if (command.getHolderName() == null || command.getHolderName().isEmpty()) {
            throw new IllegalArgumentException("Account holder name cannot be empty.");
        }
        applyNewEvent(new AccountCreatedEvent(command.getAggregateId(), command.getHolderName()));
    }

    public void handle(DepositMoneyCommand command) {
        if (command.getAmount() <= 0) {
            throw new IllegalArgumentException("Deposit amount must be positive.");
        }
        applyNewEvent(new MoneyDepositedEvent(command.getAggregateId(), command.getAmount()));
    }

    // --- State Mutation ---
    
    private void applyNewEvent(Event event) {
        event.setVersion(++version);
        apply(event);
        uncommittedChanges.add(event); // Track changes to commit to the Event Store later
    }

    public void apply(Event event) {
        if (event instanceof AccountCreatedEvent) {
            this.holderName = ((AccountCreatedEvent) event).getHolderName();
            this.balance = 0.0;
        } else if (event instanceof MoneyDepositedEvent) {
            this.balance += ((MoneyDepositedEvent) event).getAmount();
        }
        this.version = event.getVersion();
    }

    public void replay(List<Event> history) {
        for (Event event : history) {
            apply(event);
        }
    }

    public String getId() { return id; }
    public String getHolderName() { return holderName; }
    public double getBalance() { return balance; }
    public int getVersion() { return version; }
    public List<Event> getUncommittedChanges() { return uncommittedChanges; }
    public void markChangesAsCommitted() { uncommittedChanges.clear(); }
}

Here is an explanation of the core operations in the AccountAggregate class:

  • handle() processes commands. It validates business rules and generates a new event if the command is valid.
  • apply() mutates state variables based on the event type. It is called both when applying new local events and when replaying historical event streams.
  • replay() iterates through a list of historical events, applying each to reconstruct the aggregate's state.
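The handlers above validate only the fields of the incoming command. An aggregate keeps state precisely so that it can also validate a command against what has already happened, for example rejecting a deposit into an account that was never created. The sketch below illustrates that idea under simplifying assumptions: GuardedAccountSketch is a hypothetical class, and it mutates fields directly instead of raising events, which the real aggregate would do through applyNewEvent().

```java
public class GuardedAccountSketch {
    private boolean created = false;
    private double balance = 0.0;

    // State-dependent rule: an account can only be created once.
    public void create() {
        if (created) {
            throw new IllegalStateException("Account already exists.");
        }
        created = true; // in the full design, apply(AccountCreatedEvent) would set this
    }

    // State-dependent rule first, then the field-level rule from the post.
    public void deposit(double amount) {
        if (!created) {
            throw new IllegalStateException("Account does not exist.");
        }
        if (amount <= 0) {
            throw new IllegalArgumentException("Deposit amount must be positive.");
        }
        balance += amount;
    }

    public double getBalance() { return balance; }

    public static void main(String[] args) {
        GuardedAccountSketch account = new GuardedAccountSketch();
        try {
            account.deposit(50.0);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        account.create();
        account.deposit(50.0);
        System.out.println("Balance: " + account.getBalance());
    }
}
```

Because the flag is derived from applied events in the full design, the same guard would hold after a replay as well as after live commands.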

EventStore & EventBus Classes

The EventStore holds each aggregate's event stream as an append-only list (in memory, for this exercise). The EventBus broadcasts newly saved events so that read projections update in real time.


import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EventStore {
    private final Map<String, List<Event>> streams = new ConcurrentHashMap<>();
    private final EventBus eventBus;

    public EventStore(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    public synchronized void save(String aggregateId, List<Event> newEvents, int expectedVersion) {
        List<Event> stream = streams.computeIfAbsent(aggregateId, k -> new ArrayList<>());

        // Optimistic Concurrency Control Check
        int currentVersion = stream.isEmpty() ? 0 : stream.get(stream.size() - 1).getVersion();
        if (currentVersion != expectedVersion) {
            throw new ConcurrentModificationException("Optimistic lock exception: version mismatch.");
        }

        // Append the whole batch first so a failing subscriber cannot leave it half-written
        stream.addAll(newEvents);
        for (Event event : newEvents) {
            eventBus.publish(event); // Broadcast to update read projections
        }
    }

    public synchronized List<Event> getEventStream(String aggregateId) {
        List<Event> stream = streams.get(aggregateId);
        if (stream == null) return new ArrayList<>();
        return new ArrayList<>(stream);
    }
}

The save() method implements optimistic concurrency control: it compares the version of the last stored event with the expectedVersion supplied by the caller and rejects the write on a mismatch. If the versions match, it appends the events and publishes them to the EventBus.
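To see what the version check protects against, consider two writers that both load the stream at the same version and then try to save. The sketch below reproduces that race sequentially. It is an illustration under stated assumptions: OptimisticConflictDemo and VersionedStream are hypothetical, and the stream stores only version numbers rather than real events.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class OptimisticConflictDemo {
    // Hypothetical single-aggregate stream that keeps only version numbers.
    public static class VersionedStream {
        private final List<Integer> versions = new ArrayList<>();

        public synchronized int currentVersion() {
            return versions.isEmpty() ? 0 : versions.get(versions.size() - 1);
        }

        // Same rule as save(): append only if the caller saw the latest version.
        public synchronized void append(int expectedVersion) {
            if (currentVersion() != expectedVersion) {
                throw new ConcurrentModificationException(
                    "Expected version " + expectedVersion + " but stream is at " + currentVersion());
            }
            versions.add(expectedVersion + 1);
        }
    }

    // Returns true if the append was accepted, false if it lost the race.
    public static boolean tryAppend(VersionedStream stream, int expectedVersion) {
        try {
            stream.append(expectedVersion);
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        VersionedStream stream = new VersionedStream();
        int seenByA = stream.currentVersion(); // both writers read version 0
        int seenByB = stream.currentVersion();

        System.out.println("Writer A accepted: " + tryAppend(stream, seenByA));
        System.out.println("Writer B accepted: " + tryAppend(stream, seenByB));

        // Writer B recovers by re-reading the version (and, in a real system,
        // reloading the aggregate and re-running the command) before retrying.
        System.out.println("Writer B retry accepted: " + tryAppend(stream, stream.currentVersion()));
    }
}
```

The losing writer never overwrites anything; it has to reload and decide again against the newer state, which is the point of the check.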


import java.util.ArrayList;
import java.util.List;

public class EventBus {
    private final List<AccountReadModel> subscribers = new ArrayList<>();

    public void subscribe(AccountReadModel subscriber) {
        subscribers.add(subscriber);
    }

    public void publish(Event event) {
        for (AccountReadModel subscriber : subscribers) {
            subscriber.onEvent(event);
        }
    }
}

The EventBus acts as a simple pub-sub broker. The publish() method dispatches events to all registered read-side models.
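The bus above accepts only AccountReadModel subscribers, which ties the broker to one projection. One possible variation, shown here as a sketch rather than as the design this post uses, is to subscribe any callback. GenericEventBusSketch is a hypothetical name, and the choice of java.util.function.Consumer and CopyOnWriteArrayList is an assumption made for brevity and for safe iteration while subscribers are being added.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class GenericEventBusSketch<E> {
    // CopyOnWriteArrayList allows subscribe() to run while publish() is iterating.
    private final List<Consumer<E>> subscribers = new CopyOnWriteArrayList<>();

    public void subscribe(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the event to every subscriber, in subscription order.
    public void publish(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    public static void main(String[] args) {
        GenericEventBusSketch<String> bus = new GenericEventBusSketch<>();
        List<String> auditLog = new ArrayList<>();

        bus.subscribe(auditLog::add); // first observer: keeps an audit trail
        bus.subscribe(e -> System.out.println("[Projection] saw " + e)); // second observer

        bus.publish("AccountCreated");
        bus.publish("MoneyDeposited");
        System.out.println(auditLog.size() + " events audited");
    }
}
```

With this shape, a second projection or an audit logger can be attached without changing the bus or the event store.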


AccountReadModel Class (Read-Side Projection)

The AccountReadModel is optimized for read queries. It subscribes to the event bus and updates its internal query cache in real time.


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AccountReadModel {
    private final Map<String, Double> accountBalances = new ConcurrentHashMap<>();

    public void onEvent(Event event) {
        if (event instanceof AccountCreatedEvent) {
            accountBalances.put(event.getAggregateId(), 0.0);
            System.out.println("[Read Projection] Initialized read record for " + event.getAggregateId());
        } else if (event instanceof MoneyDepositedEvent) {
            double deposit = ((MoneyDepositedEvent) event).getAmount();
            accountBalances.compute(event.getAggregateId(), (id, balance) -> (balance == null ? 0.0 : balance) + deposit);
            System.out.println("[Read Projection] Updated balance for " + event.getAggregateId() + " to " + accountBalances.get(event.getAggregateId()));
        }
    }

    public Double getBalance(String id) {
        return accountBalances.get(id);
    }
}

The onEvent() method handles incoming events and updates the read-side cache (accountBalances) directly, so a balance query is a single map lookup and never has to replay events.
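Since the read model is derived entirely from events, it can also be rebuilt from scratch by folding over the stored stream, for instance after a restart or when a new projection is introduced. The following sketch shows that fold. It is an assumption-laden illustration: ProjectionRebuildSketch and StoredEvent are hypothetical, and events are flattened to a kind string plus an amount instead of using the Event subclasses from this post.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProjectionRebuildSketch {
    // Hypothetical flattened event: kind is "CREATED" or "DEPOSITED".
    public static class StoredEvent {
        final String aggregateId;
        final String kind;
        final double amount;

        public StoredEvent(String aggregateId, String kind, double amount) {
            this.aggregateId = aggregateId;
            this.kind = kind;
            this.amount = amount;
        }
    }

    // Folds the full history into a fresh balance map, applying the same
    // rules the live projection applies one event at a time.
    public static Map<String, Double> rebuildBalances(List<StoredEvent> events) {
        Map<String, Double> balances = new HashMap<>();
        for (StoredEvent event : events) {
            if ("CREATED".equals(event.kind)) {
                balances.put(event.aggregateId, 0.0);
            } else if ("DEPOSITED".equals(event.kind)) {
                balances.merge(event.aggregateId, event.amount, Double::sum);
            }
        }
        return balances;
    }

    public static void main(String[] args) {
        List<StoredEvent> history = Arrays.asList(
            new StoredEvent("ACC-99", "CREATED", 0.0),
            new StoredEvent("ACC-99", "DEPOSITED", 500.0),
            new StoredEvent("ACC-99", "DEPOSITED", 200.0));

        Map<String, Double> balances = rebuildBalances(history);
        System.out.println(balances.get("ACC-99")); // prints 700.0
    }
}
```

This is why the read side can be treated as disposable: the event store remains the source of truth, and any projection can be regenerated from it.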


Main Driver Class

This class tests our Event Sourcing and CQRS framework. It dispatches commands, verifies read projections, and reconstructs state from the event store.


import java.util.List;

public class Main {
    public static void main(String[] args) {
        EventBus eventBus = new EventBus();
        EventStore eventStore = new EventStore(eventBus);
        AccountReadModel readModel = new AccountReadModel();

        // Subscribe read projection to event bus
        eventBus.subscribe(readModel);

        String accountId = "ACC-99";
        System.out.println("==========================================");
        System.out.println("Scenario 1: Executing Commands and Updating Read Projection");
        System.out.println("==========================================");

        // Step 1: Create Account
        AccountAggregate aggregate = new AccountAggregate(accountId);
        aggregate.handle(new CreateAccountCommand(accountId, "Alice"));
        eventStore.save(accountId, aggregate.getUncommittedChanges(), 0);
        aggregate.markChangesAsCommitted();

        // Step 2: Deposit Money
        aggregate.handle(new DepositMoneyCommand(accountId, 500.00));
        eventStore.save(accountId, aggregate.getUncommittedChanges(), 1);
        aggregate.markChangesAsCommitted();

        // Step 3: Deposit More Money
        aggregate.handle(new DepositMoneyCommand(accountId, 200.00));
        eventStore.save(accountId, aggregate.getUncommittedChanges(), 2);
        aggregate.markChangesAsCommitted();

        // Query the read model
        System.out.println("\nQuerying Read Model for ACC-99 balance: $" + readModel.getBalance(accountId));

        System.out.println("\n==========================================");
        System.out.println("Scenario 2: Reconstructing State from Event Stream");
        System.out.println("==========================================");

        // Fetch historical event stream from event store
        List<Event> history = eventStore.getEventStream(accountId);
        System.out.println("Fetched " + history.size() + " historical events from Event Store.");

        // Instantiate a new aggregate instance and replay history to reconstruct its state
        AccountAggregate reconstructed = new AccountAggregate(accountId);
        reconstructed.replay(history);

        System.out.println("Reconstructed Account Holder: " + reconstructed.getHolderName());
        System.out.println("Reconstructed Balance: $" + reconstructed.getBalance());
        System.out.println("Reconstructed Version: " + reconstructed.getVersion());
    }
}

The main() driver configures the CQRS framework, dispatches commands to create and fund a mock account, queries the read projection, and verifies that replaying the event stream successfully reconstructs the aggregate's state.

