Chapter 12: Snapshots for High-Volume Accounts
A corporate trading account has accumulated 50,000 events over the past
year. Every time the system loads this account, it must replay all
50,000 events through the @apply handlers. This takes seconds —
unacceptable for real-time operations.
Snapshots solve this. A snapshot captures the full aggregate state at a point in time. When loading, Protean reads the snapshot plus only the events that occurred after it, reducing replay from 50,000 events to a handful.
How Snapshots Work
Events: [e0] [e1] [e2] ... [e49999] [e50000] [e50001] [e50002]
↑
Snapshot
(full state)
Without snapshot: replay e0 → e50002 (50,003 events)
With snapshot: load snapshot + replay e50001 → e50002 (2 events)
Snapshots are written to a separate stream in the event store:
- Events:
fidelis::account-{id} - Snapshot:
fidelis::account:snapshot-{id}
Configuring the Threshold
Add to your domain.toml:
snapshot_threshold = 50
With this setting, Protean automatically creates a snapshot when
load_aggregate() finds more than 50 events since the last snapshot.
For development, keep the threshold low (10–50). For production, set it based on your aggregate's event rate and acceptable load latency.
Manual Snapshot Creation
You can create snapshots on demand — useful for maintenance, after migrations, or to optimize specific aggregates:
with domain.domain_context():
# Snapshot a single account
domain.create_snapshot(Account, "acc-corporate-001")
# Snapshot all accounts
domain.create_snapshots(Account)
CLI Commands
# Snapshot a specific account
$ protean snapshot create --aggregate=Account --identifier=acc-corporate-001 --domain=fidelis
Snapshot created for Account(acc-corporate-001).
# Snapshot all accounts
$ protean snapshot create --aggregate=Account --domain=fidelis
Created 47 snapshot(s) for Account.
# Snapshot all aggregates in the domain
$ protean snapshot create --domain=fidelis
Account: 47 snapshot(s)
Transfer: 12 snapshot(s)
Created 59 snapshot(s) across 2 aggregate(s).
What Happens During Load
When repo.get(account_id) is called:
- Check for a snapshot in
fidelis::account:snapshot-{id} - If found, deserialize the snapshot into an aggregate instance
- Read events after the snapshot's version from the event stream
- Replay only those new events through
@applyhandlers - If the number of new events exceeds
snapshot_threshold, create a fresh snapshot
If no snapshot exists, all events are replayed from the beginning — the same behavior as before.
Snapshots Are Not Source of Truth
Snapshots are an optimization, not a replacement for events:
- Events remain the source of truth. Snapshots can always be regenerated by replaying events.
- Deleting a snapshot does not lose data — the next
repo.get()will replay all events. - Snapshots are created transparently. Your domain code does not need to know about them.
When to Snapshot
| Scenario | Threshold |
|---|---|
| Low-activity aggregates (< 100 events/year) | No snapshot needed |
| Moderate activity (100–1,000 events/year) | 100–200 |
| High activity (1,000+ events/year) | 50–100 |
| Extreme activity (trading accounts) | 25–50 |
The right threshold balances write overhead (creating snapshots) against read performance (replaying fewer events).
What We Built
- Automatic snapshots via
snapshot_thresholdconfiguration. - Manual snapshots via
domain.create_snapshot()anddomain.create_snapshots(). - CLI tools via
protean snapshot create. - An understanding that snapshots are an optimization — events remain the source of truth.
In the next chapter, a regulatory inquiry arrives that requires looking at the past — we will use temporal queries to reconstruct the account at any historical point.
Full Source
"""Chapter 12: Snapshots
Demonstrates how snapshots optimize event-sourced aggregate loading.
Instead of replaying every event from the beginning of time, Protean
can periodically capture a snapshot of the aggregate's state. Subsequent
loads replay only the events *after* the snapshot.
Snapshots are mostly a configuration and operational concern — the domain
code itself stays unchanged.
"""
from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain
domain = Domain("fidelis")
@domain.event(part_of="Account")
class AccountOpened:
account_id: Identifier(required=True)
account_number: String(required=True)
holder_name: String(required=True)
opening_deposit: Float(required=True)
@domain.event(part_of="Account")
class DepositMade:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.event(part_of="Account")
class WithdrawalMade:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.event(part_of="Account")
class AccountClosed:
account_id: Identifier(required=True)
reason: String()
@domain.aggregate(is_event_sourced=True)
class Account:
account_number: String(max_length=20, required=True)
holder_name: String(max_length=100, required=True)
balance: Float(default=0.0)
status: String(max_length=20, default="ACTIVE")
@invariant.post
def balance_must_not_be_negative(self):
if self.balance is not None and self.balance < 0:
raise ValidationError(
{"balance": ["Insufficient funds: balance cannot be negative"]}
)
@invariant.post
def closed_account_must_have_zero_balance(self):
if self.status == "CLOSED" and self.balance != 0:
raise ValidationError(
{"status": ["Cannot close account with non-zero balance"]}
)
@classmethod
def open(cls, account_number: str, holder_name: str, opening_deposit: float):
account = cls._create_new()
account.raise_(
AccountOpened(
account_id=str(account.id),
account_number=account_number,
holder_name=holder_name,
opening_deposit=opening_deposit,
)
)
return account
def deposit(self, amount: float, reference: str = None) -> None:
if amount <= 0:
raise ValidationError({"amount": ["Deposit amount must be positive"]})
self.raise_(
DepositMade(
account_id=str(self.id),
amount=amount,
reference=reference,
)
)
def withdraw(self, amount: float, reference: str = None) -> None:
if amount <= 0:
raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
self.raise_(
WithdrawalMade(
account_id=str(self.id),
amount=amount,
reference=reference,
)
)
def close(self, reason: str = None) -> None:
self.raise_(
AccountClosed(
account_id=str(self.id),
reason=reason,
)
)
@apply
def on_account_opened(self, event: AccountOpened):
self.id = event.account_id
self.account_number = event.account_number
self.holder_name = event.holder_name
self.balance = event.opening_deposit
self.status = "ACTIVE"
@apply
def on_deposit_made(self, event: DepositMade):
self.balance += event.amount
@apply
def on_withdrawal_made(self, event: WithdrawalMade):
self.balance -= event.amount
@apply
def on_account_closed(self, event: AccountClosed):
self.status = "CLOSED"
@domain.command(part_of=Account)
class OpenAccount:
account_number: String(required=True)
holder_name: String(required=True)
opening_deposit: Float(required=True)
@domain.command(part_of=Account)
class MakeDeposit:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.command_handler(part_of=Account)
class AccountCommandHandler:
@handle(OpenAccount)
def handle_open_account(self, command: OpenAccount):
account = Account.open(
account_number=command.account_number,
holder_name=command.holder_name,
opening_deposit=command.opening_deposit,
)
current_domain.repository_for(Account).add(account)
return str(account.id)
@handle(MakeDeposit)
def handle_make_deposit(self, command: MakeDeposit):
repo = current_domain.repository_for(Account)
account = repo.get(command.account_id)
account.deposit(command.amount, reference=command.reference)
repo.add(account)
# Snapshot threshold is configured in domain.toml or pyproject.toml:
#
# [tool.protean]
# snapshot_threshold = 10
#
# This means a snapshot is automatically considered after every 10 events.
# You can also trigger snapshots programmatically or via the CLI:
#
# CLI:
# protean snapshot --domain path.to.domain --aggregate Account
#
# Programmatic:
# domain.create_snapshot(Account, account_id) # Single aggregate
# domain.create_snapshots(Account) # All instances
# domain.create_all_snapshots() # All aggregates
domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"
# Set a low snapshot threshold for demonstration
domain.config["snapshot_threshold"] = 5
if __name__ == "__main__":
with domain.domain_context():
# Open an account
account_id = domain.process(
OpenAccount(
account_number="ACC-001",
holder_name="Alice Johnson",
opening_deposit=1000.00,
)
)
print(f"Account opened: {account_id}")
# Make several deposits to build up event history
for i in range(1, 8):
domain.process(
MakeDeposit(
account_id=account_id,
amount=100.00,
reference=f"deposit-{i}",
)
)
print("Made 7 deposits of $100 each")
# At this point, 8 events exist (1 open + 7 deposits).
# With snapshot_threshold=5, a snapshot can be created.
# Create a snapshot programmatically
created = domain.create_snapshot(Account, account_id)
print(f"\nSnapshot created: {created}")
# Now when we load the account, Protean starts from the snapshot
# and only replays events written after the snapshot position —
# much faster than replaying all 8 events from scratch.
repo = current_domain.repository_for(Account)
account = repo.get(account_id)
print(f"\nAccount holder: {account.holder_name}")
print(f"Balance: ${account.balance:.2f}")
print(f"Version: {account._version}")
assert account.balance == 1700.00 # 1000 + (7 * 100)
print("\nAll checks passed!")