Event Sourcing with Protean
Builds on CQRS
Event Sourcing extends the CQRS pathway with event replay, temporal queries, and full audit trails. If you haven't read the CQRS pathway yet, start there — Event Sourcing builds on those concepts.
Overview
In Event Sourcing, aggregate state is not stored as a snapshot in a database. Instead, every state change is captured as an event, and the aggregate's current state is reconstructed by replaying its event history. This gives you a complete, immutable audit trail and the ability to answer "what was the state at time T?" for any aggregate.
In Protean, you enable Event Sourcing on a per-aggregate basis by marking
it with is_event_sourced=True. The aggregate uses @apply methods to
define how each event type mutates state, and an Event Sourced Repository
handles persistence to the event store.
Request Flow
sequenceDiagram
autonumber
participant API as API Layer
participant D as domain.process()
participant CH as Command Handler
participant ESR as ES Repository
participant ES as Event Store
participant Agg as Aggregate
participant P as Projector
participant Proj as Projection
API->>D: Submit Command
D->>CH: Dispatch to handler
CH->>ESR: Load aggregate
ESR->>ES: Fetch events for stream
ES-->>ESR: Event history
ESR->>Agg: Replay events to rebuild state
CH->>Agg: Invoke domain method
Agg->>Agg: Raise new event(s)
CH->>ESR: Persist aggregate
ESR->>ES: Append new events
ES-->>P: Events dispatched
P->>Proj: Update read model
- The API layer submits a Command via
domain.process() - The Command Handler asks the Event Sourced Repository to load the aggregate
- The repository fetches all events for that aggregate from the Event Store
- It replays them through the aggregate's
@applymethods to rebuild current state - The handler invokes the domain method, which raises new events
- The repository appends the new events to the Event Store
- Events flow to Projectors that update read-optimized Projections
What Event Sourcing Adds to CQRS
| CQRS Element | Event Sourcing Change |
|---|---|
@domain.aggregate |
Add is_event_sourced=True option |
| Aggregate methods | Use @apply decorator for event application |
@domain.repository |
Replace with @domain.event_sourced_repository |
| Persistence Store | Replace with Event Store (e.g., Message DB) |
| Everything else | Commands, Command Handlers, Projections, Projectors — unchanged |
Elements You'll Use
Everything from the CQRS pathway, with these changes:
| Element | Purpose |
|---|---|
| Event Sourced Aggregates | Aggregates with is_event_sourced=True that derive state from events |
@apply decorator |
Methods that define how each event type mutates aggregate state |
| Event Sourced Repository | Persists events and reconstructs aggregates from event streams |
| Fact Events | Auto-generated snapshot events capturing full aggregate state |
| Event Store adapter | Infrastructure for storing and retrieving events (e.g., Message DB) |
Guided Reading Order
If you've already worked through the CQRS pathway, pick up from step 1 below. These guides cover the Event Sourcing-specific concepts:
| Step | Guide | What You'll Learn |
|---|---|---|
| 1 | Event Sourcing Concepts | Theory and workflow of event sourcing |
| 2 | Events | Delta events vs. fact events |
| 3 | Raising Events | How aggregates raise events and the @apply decorator |
| 4 | Persist Aggregates | Event sourced repositories |
| 5 | Stream Categories | How events are organized into streams |
| 6 | Projections | Build read models from event streams |
| 7 | Server | Process events asynchronously |
| 8 | Event Store adapters | Configure Message DB or other event stores |
| 9 | Architecture Decision | When to use ES vs. CQRS per aggregate |
| 10 | Testing | Test event-sourced aggregates and projections |
Key Concepts
The @apply Decorator
In event-sourced aggregates, state changes are expressed through events.
The @apply decorator marks methods that define how each event type
mutates the aggregate's state:
@domain.aggregate(is_event_sourced=True)
class Order:
status = String(default="draft")
total = Float(default=0.0)
@apply
def placed(self, event: OrderPlaced):
self.status = "placed"
self.total = event.total
@apply
def cancelled(self, event: OrderCancelled):
self.status = "cancelled"
Fact Events
When fact_events=True is set on an aggregate, Protean auto-generates a
snapshot event after every state change. This captures the complete current
state of the aggregate, making it easy for downstream consumers to build
projections without replaying the full event history.
Mixing Patterns Per Aggregate
Protean allows mixing CQRS and Event Sourcing at the aggregate level within the same domain. Some aggregates can use standard repositories while others use event sourcing — the decision is explicit and per-aggregate:
@domain.aggregate # Standard CQRS — state stored as snapshots
class Product:
...
@domain.aggregate(is_event_sourced=True) # Event Sourced — state from events
class Order:
...
See the Architecture Decision guide for criteria on choosing the right pattern per aggregate.