# Event Store Setup
This guide covers choosing, configuring, and operating an event store for event-sourced aggregates. For the conceptual background, see Event Sourcing.
## Choosing a provider
| Provider | Use case | External dependency |
|---|---|---|
| `memory` | Development, testing, prototyping | None |
| `message_db` | Production, durable storage | PostgreSQL + Message DB |
Both providers implement the same interface -- your domain code is identical regardless of provider.
## Configuration
### In-memory (default)
No configuration needed. This is the default when no `[event_store]` section is present:

```toml
[event_store]
provider = "memory"
```
### Message DB (production)
Message DB runs on PostgreSQL. Install it with Docker:

```shell
docker run -d -p 5433:5432 ethangarofolo/message-db:1.2.6
```
Then configure:

```toml
[event_store]
provider = "message_db"
database_uri = "postgresql://message_store@localhost:5433/message_store"
```
Use environment variable substitution for production:

```toml
[production.event_store]
provider = "message_db"
database_uri = "${MESSAGEDB_URL}"
```
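The variable is then set in the deployment environment before the application starts; a minimal sketch (the hostname is a placeholder, not a real endpoint):

```shell
# Hypothetical connection string; substitute your own host and database
export MESSAGEDB_URL="postgresql://message_store@db.internal:5432/message_store"
```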
## Marking aggregates as event-sourced
Only aggregates marked with `is_event_sourced=True` use the event store for persistence:

```python
@domain.aggregate(is_event_sourced=True)
class Account:
    balance = Float(default=0.0)

    def deposit(self, amount):
        self.raise_(Deposited(amount=amount))

    @apply(Deposited)
    def on_deposited(self, event):
        self.balance += event.amount
```
Non-event-sourced aggregates continue to use the database provider as usual. You can mix both patterns in the same domain.
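A mixed domain can therefore carry both a database configuration and an event store configuration side by side. A sketch of such a combined file, assuming a PostgreSQL database provider; the `[databases.default]` section name is an assumption based on common Protean configuration layout and should be checked against your version:

```toml
# Used by regular (non-event-sourced) aggregates
[databases.default]
provider = "postgresql"
database_uri = "postgresql://app@localhost:5432/app"

# Used by aggregates marked with is_event_sourced=True
[event_store]
provider = "message_db"
database_uri = "postgresql://message_store@localhost:5433/message_store"
```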
## Reading events
### CLI
```shell
# Read events from a specific aggregate instance
protean events read "myapp::account-acc-001" --domain=myapp

# Read from a category stream (all accounts)
protean events read "myapp::account" --limit=10 --domain=myapp

# Include event payloads
protean events read "myapp::account-acc-001" --data --domain=myapp

# Domain-wide statistics
protean events stats --domain=myapp

# Search by event type
protean events search --type=Deposited --domain=myapp
```
### Programmatic
```python
store = domain.event_store.store

# Read from the beginning of a stream
messages = store.read("myapp::account-acc-001")

# Read from a specific position
messages = store.read("myapp::account-acc-001", position=5)

# Read the last message
last = store.read_last_message("myapp::account-acc-001")
```
## Stream naming conventions
Protean generates stream names automatically:
| Stream type | Pattern | Example |
|---|---|---|
| Instance | `{domain}::{category}-{id}` | `myapp::account-acc-001` |
| Category | `{domain}::{category}` | `myapp::account` |
| Command | `{domain}::{category}:command-{id}` | `myapp::account:command-acc-001` |
| Snapshot | `{domain}::{category}:snapshot-{id}` | `myapp::account:snapshot-acc-001` |
The category is derived from the aggregate class name (lowercased, underscored).
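The naming rule can be illustrated with a small, hypothetical helper; this mirrors the convention described above and is not Protean's actual implementation:

```python
import re

def category_for(class_name: str) -> str:
    # Lowercase the CamelCase class name, inserting underscores
    # between words: "OrderItem" -> "order_item".
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()

# Stream names built from the derived category (illustrative only)
category = category_for("Account")                      # "account"
instance_stream = f"myapp::{category}-acc-001"          # "myapp::account-acc-001"
snapshot_stream = f"myapp::{category}:snapshot-acc-001"
```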
## Temporal queries

Event-sourced aggregates support time-travel queries:

```python
from datetime import datetime

repo = domain.repository_for(Account)

# Load at a specific version
account = repo.get(account_id, at_version=5)

# Load state as of a point in time
account = repo.get(account_id, as_of=datetime(2024, 6, 15, 12, 0))
```
See Temporal Queries for the full guide.
## Snapshots
For aggregates with many events, snapshots optimize load times by storing periodic state checkpoints. See Snapshots.
## See also
- Event Store Reference -- Provider configuration details.
- Message DB Reference -- Message DB-specific setup and options.
- Causation Tracing -- Tracing causal chains through the event store.