Building an Event Sourced System in Python
Event sourcing is a powerful architectural pattern where all changes to application state are stored as a sequence of immutable events. This challenge asks you to implement a basic event sourcing system in Python, allowing you to manage and replay state changes for a simple domain. This pattern is useful for auditing, debugging, and enabling complex features like temporal queries.
Problem Description
You need to create a Python class that simulates an event sourced system. This system will manage a collection of "aggregates" (e.g., bank accounts, user profiles). Each aggregate will have its own stream of events. The core functionality will involve:
- Defining Events: Create a way to represent different types of events that can occur within the system.
- Storing Events: Implement a mechanism to store these events in an append-only log, typically associated with a specific aggregate ID.
- Applying Events: Develop a method to apply a sequence of events to an aggregate to reconstruct its current state.
- Committing Events: Provide a way to append new events to an aggregate's stream.
Your implementation should be able to:
- Persist events for a given aggregate ID.
- Retrieve all events for a specific aggregate ID.
- Load an aggregate's state by replaying its event history.
- Add new events to an aggregate's history and persist them.
Consider how to handle the initial creation of an aggregate and subsequent modifications.
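One way to approach creation versus modification is to let the existing event stream decide which operations are legal: creating is valid only when the stream is empty, and modifying is valid only when it is not. The sketch below illustrates that idea under stated assumptions: the helper names open_account and deposit, the module-level log dictionary, and the positive-amount check are all hypothetical and not required by the challenge.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccountCreated:
    account_id: str
    initial_balance: float


@dataclass(frozen=True)
class Deposited:
    account_id: str
    amount: float


# Hypothetical in-memory log: {aggregate_id: [event, ...]}
log = {}


def open_account(account_id: str, initial_balance: float) -> None:
    """Creation is only legal when the aggregate has no history yet."""
    if log.get(account_id):
        raise ValueError(f"account {account_id} already exists")
    log[account_id] = [AccountCreated(account_id, initial_balance)]


def deposit(account_id: str, amount: float) -> None:
    """Modification is only legal once a creation event has been stored."""
    if not log.get(account_id):
        raise ValueError(f"account {account_id} does not exist")
    if amount <= 0:
        raise ValueError("deposit amount must be positive")
    log[account_id].append(Deposited(account_id, amount))


open_account("acc_123", 100.0)
deposit("acc_123", 50.0)
print(len(log["acc_123"]))  # 2
```

Rejecting invalid operations before an event is written keeps the log free of events that could never be replayed sensibly.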
Examples
Example 1: Bank Account Creation and Deposit
Let's imagine a simple BankAccount aggregate.
Events:
- AccountCreated(account_id: str, initial_balance: float)
- Deposited(account_id: str, amount: float)
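These two events could be represented as frozen dataclasses, which makes each event immutable once it has been created. This is only one possible representation (the challenge leaves the choice open), and the field names simply mirror the signatures listed here.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class AccountCreated:
    account_id: str
    initial_balance: float


@dataclass(frozen=True)
class Deposited:
    account_id: str
    amount: float


event = Deposited(account_id="acc_123", amount=50.0)

# Frozen dataclasses reject attribute assignment, so a stored event
# cannot be altered by accident after the fact.
try:
    event.amount = 75.0
except FrozenInstanceError:
    print("events are immutable")
```

Dataclasses also generate equality methods, so two events with the same field values compare equal, which is convenient in tests.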
# Conceptual representation of an aggregate
class BankAccount:
    def __init__(self):
        self.balance = 0.0
        self.account_id = None

    def apply_event(self, event):
        if isinstance(event, AccountCreated):
            self.account_id = event.account_id
            self.balance = event.initial_balance
        elif isinstance(event, Deposited):
            self.balance += event.amount
# Conceptual event store
class EventStore:
    def __init__(self):
        self.events = {}  # {account_id: [event1, event2, ...]}

    def append_events(self, account_id: str, new_events: list):
        if account_id not in self.events:
            self.events[account_id] = []
        self.events[account_id].extend(new_events)

    def get_events(self, account_id: str) -> list:
        return self.events.get(account_id, [])
# --- Scenario ---
event_store = EventStore()
# 1. Create account
account_id_1 = "acc_123"
initial_deposit = 100.0
create_event = AccountCreated(account_id=account_id_1, initial_balance=initial_deposit)
event_store.append_events(account_id_1, [create_event])
# 2. Deposit more
deposit_amount = 50.0
deposit_event = Deposited(account_id=account_id_1, amount=deposit_amount)
event_store.append_events(account_id_1, [deposit_event])
# 3. Load and get state
account_events = event_store.get_events(account_id_1)
account = BankAccount()
for event in account_events:
    account.apply_event(event)
print(f"Account ID: {account.account_id}")
print(f"Final Balance: {account.balance}")
Expected Output (for Example 1):
Account ID: acc_123
Final Balance: 150.0
Explanation:
The EventStore stores the AccountCreated and Deposited events. When get_events is called for acc_123, it retrieves both events. The BankAccount then iterates through these events, applying each one to reconstruct its state. The initial balance is set, and then the deposit amount is added.
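The replay step described above can be wrapped in a small function. Because state is derived purely from events, replaying only a prefix of the stream yields the state at an earlier point in the history, which is the basis of the temporal queries mentioned in the introduction. The function name load_account and its upto parameter are hypothetical, chosen only for this sketch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AccountCreated:
    account_id: str
    initial_balance: float


@dataclass(frozen=True)
class Deposited:
    account_id: str
    amount: float


class BankAccount:
    def __init__(self):
        self.account_id = None
        self.balance = 0.0

    def apply_event(self, event):
        if isinstance(event, AccountCreated):
            self.account_id = event.account_id
            self.balance = event.initial_balance
        elif isinstance(event, Deposited):
            self.balance += event.amount


def load_account(events: list, upto: Optional[int] = None) -> BankAccount:
    """Replay events in order; `upto` limits the replay to the first N events."""
    account = BankAccount()
    for event in events[:upto]:
        account.apply_event(event)
    return account


history = [
    AccountCreated("acc_123", 100.0),
    Deposited("acc_123", 50.0),
]

print(load_account(history).balance)          # 150.0
print(load_account(history, upto=1).balance)  # 100.0
```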
Example 2: Handling a Non-Existent Aggregate
# Using the same EventStore and BankAccount from Example 1
# --- Scenario ---
event_store = EventStore()
account_id_2 = "acc_456"
# Attempt to get events for a non-existent account
account_events_nonexistent = event_store.get_events(account_id_2)
print(f"Events for {account_id_2}: {account_events_nonexistent}")
# Attempt to load state for a non-existent account
account_nonexistent = BankAccount()
for event in account_events_nonexistent:
    account_nonexistent.apply_event(event)
print(f"Account ID (non-existent): {account_nonexistent.account_id}")
print(f"Balance (non-existent): {account_nonexistent.balance}")
Expected Output (for Example 2):
Events for acc_456: []
Account ID (non-existent): None
Balance (non-existent): 0.0
Explanation:
The EventStore.get_events method returns an empty list for an account_id that has no events. Because there are no events to apply, the BankAccount remains in its initial state (i.e., account_id=None, balance=0.0).
Constraints
- Your EventStore should be able to handle at least 1,000,000 events per aggregate ID.
- Event objects should be serializable (e.g., convertible to JSON or a dictionary for storage). You do not need to implement serialization or deserialization for this challenge; assume it is handled.
- The apply_event method within an aggregate must deterministically produce the same state given the same sequence of events.
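The serialization and determinism constraints can be checked together with a JSON round trip: if events survive being written and read back, and replaying the restored events gives the same state, both properties hold for that history. The helpers to_json, from_json, and replay_balance and the EVENT_TYPES registry are hypothetical names introduced for this sketch, and the challenge does not require you to implement them.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class AccountCreated:
    account_id: str
    initial_balance: float


@dataclass(frozen=True)
class Deposited:
    account_id: str
    amount: float


# Hypothetical registry mapping a stored type name back to its class.
EVENT_TYPES = {cls.__name__: cls for cls in (AccountCreated, Deposited)}


def to_json(event) -> str:
    """Store the event's type name alongside its fields."""
    return json.dumps({"type": type(event).__name__, "data": asdict(event)})


def from_json(raw: str):
    """Rebuild the event object from its stored type name and fields."""
    record = json.loads(raw)
    return EVENT_TYPES[record["type"]](**record["data"])


def replay_balance(events) -> float:
    """Pure function of the event sequence: no clocks, randomness, or I/O."""
    balance = 0.0
    for event in events:
        if isinstance(event, AccountCreated):
            balance = event.initial_balance
        elif isinstance(event, Deposited):
            balance += event.amount
    return balance


history = [AccountCreated("acc_123", 100.0), Deposited("acc_123", 50.0)]
restored = [from_json(to_json(event)) for event in history]

print(restored == history)                                   # True
print(replay_balance(restored) == replay_balance(history))   # True
```

Keeping the replay logic free of external inputs such as the current time is what makes the result repeatable.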
Notes
- This challenge focuses on the core principles of event sourcing: event definition, storage, and state reconstruction.
- You are free to choose how to represent your events (e.g., simple classes, Pydantic models, dataclasses).
- The EventStore in this challenge can be an in-memory dictionary for simplicity. In a real-world scenario, this would be a persistent database (e.g., PostgreSQL, Kafka, specialized event databases).
- Think about how to version your events in a more complex system, though not strictly required for this challenge.
- Consider designing your EventStore and aggregate interaction to be thread-safe if multiple threads were to access it concurrently (again, not a strict requirement for this challenge, but good to keep in mind).
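For the thread-safety note, one possible approach is to guard the in-memory dictionary with a lock and let writers optionally state which stream length they expect, so that a stale writer is rejected instead of silently interleaving. The class name ThreadSafeEventStore, the ConcurrencyError exception, and the expected_version parameter are assumptions made for this sketch, and the tuples stand in for real event objects.

```python
import threading


class ConcurrencyError(Exception):
    """Raised when a writer's view of the stream is out of date."""


class ThreadSafeEventStore:
    def __init__(self):
        self._events = {}  # {aggregate_id: [event, ...]}
        self._lock = threading.Lock()

    def append_events(self, aggregate_id, new_events, expected_version=None):
        """Append atomically; optionally require the current stream length."""
        with self._lock:
            stream = self._events.setdefault(aggregate_id, [])
            if expected_version is not None and len(stream) != expected_version:
                raise ConcurrencyError(
                    f"expected version {expected_version}, found {len(stream)}"
                )
            stream.extend(new_events)
            return len(stream)

    def get_events(self, aggregate_id):
        """Return a copy so callers cannot mutate the stored stream."""
        with self._lock:
            return list(self._events.get(aggregate_id, []))


store = ThreadSafeEventStore()


def worker(worker_id):
    # Placeholder events: tuples instead of real event classes.
    for i in range(100):
        store.append_events("acc_123", [("Deposited", worker_id, i)])


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(store.get_events("acc_123")))  # 400
```

A single lock is the simplest choice here; a per-aggregate lock would reduce contention at the cost of more bookkeeping.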