Skip to content

Chapter 12: Snapshots for High-Volume Accounts

A corporate trading account has accumulated 50,000 events over the past year. Every time the system loads this account, it must replay all 50,000 events through the @apply handlers. This takes seconds — unacceptable for real-time operations.

Snapshots solve this. A snapshot captures the full aggregate state at a point in time. When loading, Protean reads the snapshot plus only the events that occurred after it, reducing replay from 50,000 events to a handful.

How Snapshots Work

Events:  [e0] [e1] [e2] ... [e49999] [e50000] [e50001] [e50002]
                                 ↑
                            Snapshot
                          (full state)

Without snapshot: replay e0 → e50002 (50,003 events)
With snapshot:    load snapshot + replay e50001 → e50002 (2 events)

Snapshots are written to a separate stream in the event store:

  • Events: fidelis::account-{id}
  • Snapshot: fidelis::account:snapshot-{id}

Configuring the Threshold

Add to your domain.toml:

snapshot_threshold = 50

With this setting, Protean automatically creates a snapshot when load_aggregate() finds more than 50 events since the last snapshot.

For development, keep the threshold low (10–50). For production, set it based on your aggregate's event rate and acceptable load latency.

Manual Snapshot Creation

You can create snapshots on demand — useful for maintenance, after migrations, or to optimize specific aggregates:

with domain.domain_context():
    # Snapshot a single account
    domain.create_snapshot(Account, "acc-corporate-001")

    # Snapshot all accounts
    domain.create_snapshots(Account)

CLI Commands

# Snapshot a specific account
$ protean snapshot create --aggregate=Account --identifier=acc-corporate-001 --domain=fidelis
Snapshot created for Account(acc-corporate-001).

# Snapshot all accounts
$ protean snapshot create --aggregate=Account --domain=fidelis
Created 47 snapshot(s) for Account.

# Snapshot all aggregates in the domain
$ protean snapshot create --domain=fidelis
  Account: 47 snapshot(s)
  Transfer: 12 snapshot(s)
Created 59 snapshot(s) across 2 aggregate(s).

What Happens During Load

When repo.get(account_id) is called:

  1. Check for a snapshot in fidelis::account:snapshot-{id}
  2. If found, deserialize the snapshot into an aggregate instance
  3. Read events after the snapshot's version from the event stream
  4. Replay only those new events through @apply handlers
  5. If the number of new events exceeds snapshot_threshold, create a fresh snapshot

If no snapshot exists, all events are replayed from the beginning — the same behavior as before.

Snapshots Are Not Source of Truth

Snapshots are an optimization, not a replacement for events:

  • Events remain the source of truth. Snapshots can always be regenerated by replaying events.
  • Deleting a snapshot does not lose data — the next repo.get() will replay all events.
  • Snapshots are created transparently. Your domain code does not need to know about them.

When to Snapshot

Scenario Threshold
Low-activity aggregates (< 100 events/year) No snapshot needed
Moderate activity (100–1,000 events/year) 100–200
High activity (1,000+ events/year) 50–100
Extreme activity (trading accounts) 25–50

The right threshold balances write overhead (creating snapshots) against read performance (replaying fewer events).

What We Built

  • Automatic snapshots via snapshot_threshold configuration.
  • Manual snapshots via domain.create_snapshot() and domain.create_snapshots().
  • CLI tools via protean snapshot create.
  • An understanding that snapshots are an optimization — events remain the source of truth.

In the next chapter, a regulatory inquiry arrives that requires looking at the past — we will use temporal queries to reconstruct the account at any historical point.

Full Source

"""Chapter 12: Snapshots

Demonstrates how snapshots optimize event-sourced aggregate loading.
Instead of replaying every event from the beginning of time, Protean
can periodically capture a snapshot of the aggregate's state.  Subsequent
loads replay only the events *after* the snapshot.

Snapshots are mostly a configuration and operational concern — the domain
code itself stays unchanged.
"""

from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class AccountClosed:
    account_id: Identifier(required=True)
    reason: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @invariant.post
    def closed_account_must_have_zero_balance(self):
        if self.status == "CLOSED" and self.balance != 0:
            raise ValidationError(
                {"status": ["Cannot close account with non-zero balance"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def close(self, reason: str = None) -> None:
        self.raise_(
            AccountClosed(
                account_id=str(self.id),
                reason=reason,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

    @apply
    def on_account_closed(self, event: AccountClosed):
        self.status = "CLOSED"


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)


# Snapshot threshold is configured in domain.toml or pyproject.toml:
#
#   [tool.protean]
#   snapshot_threshold = 10
#
# This means a snapshot is automatically considered after every 10 events.
# You can also trigger snapshots programmatically or via the CLI:
#
#   CLI:
#     protean snapshot --domain path.to.domain --aggregate Account
#
#   Programmatic:
#     domain.create_snapshot(Account, account_id)     # Single aggregate
#     domain.create_snapshots(Account)                 # All instances
#     domain.create_all_snapshots()                    # All aggregates
domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"

# Set a low snapshot threshold for demonstration
domain.config["snapshot_threshold"] = 5


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make several deposits to build up event history
        for i in range(1, 8):
            domain.process(
                MakeDeposit(
                    account_id=account_id,
                    amount=100.00,
                    reference=f"deposit-{i}",
                )
            )
        print("Made 7 deposits of $100 each")

        # At this point, 8 events exist (1 open + 7 deposits).
        # With snapshot_threshold=5, a snapshot can be created.

        # Create a snapshot programmatically
        created = domain.create_snapshot(Account, account_id)
        print(f"\nSnapshot created: {created}")

        # Now when we load the account, Protean starts from the snapshot
        # and only replays events written after the snapshot position —
        # much faster than replaying all 8 events from scratch.
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nAccount holder: {account.holder_name}")
        print(f"Balance: ${account.balance:.2f}")
        print(f"Version: {account._version}")

        assert account.balance == 1700.00  # 1000 + (7 * 100)
        print("\nAll checks passed!")

Next

Chapter 13: Looking Back in Time →