Skip to content

Chapter 13: Looking Back in Time

A regulatory inquiry arrives: "What was account ACC-7742's balance on March 15th at 4:00 PM?" This is not about the current balance — it is about the balance at a specific moment in the past.

Event Sourcing makes this possible. Because we have the complete event history, we can reconstruct the aggregate at any historical point.

Querying by Version

Use at_version to reconstruct the aggregate after a specific event:

with domain.domain_context():
    repo = domain.repository_for(Account)

    # What was the state after the 5th event?
    account_v5 = repo.get("acc-7742", at_version=5)
    print(f"Balance at version 5: ${account_v5.balance:.2f}")

Versions are 0-indexed. at_version=0 gives you the state after the first event (the creation event). at_version=5 gives you the state after the sixth event.

Querying by Timestamp

Use as_of to reconstruct the aggregate at a specific point in time:

from datetime import datetime, timezone

march_15 = datetime(2025, 3, 15, 16, 0, tzinfo=timezone.utc)
account_march = repo.get("acc-7742", as_of=march_15)
print(f"Balance on March 15: ${account_march.balance:.2f}")

Protean filters events by their timestamp, including only events written on or before the requested time.

Note

at_version and as_of are mutually exclusive. You cannot use both in the same query.

Temporal Aggregates Are Read-Only

Historical state must not be modified. Protean enforces this:

try:
    account_march.raise_(DepositMade(
        account_id="acc-7742", amount=100.00
    ))
except IncorrectUsageError as e:
    print(e)
    # "Cannot raise events on a temporally-loaded aggregate..."

When an aggregate is loaded with at_version or as_of, it is marked as temporal. Any attempt to call raise_() raises IncorrectUsageError. This is a safety mechanism — you should never raise new events on a historical view.

Interaction with Snapshots

Temporal queries work with snapshots:

  • at_version: If a snapshot exists before the target version, Protean loads the snapshot and replays only the events between the snapshot and the target.
  • as_of: Snapshots are not used (they may contain events after the target timestamp). Events are replayed from the beginning.

Exploring Event History with the CLI

The protean events history command shows the complete timeline for an aggregate:

$ protean events history --aggregate=Account --id=acc-7742 --domain=fidelis
                        Account (acc-7742)
 Version  Type                      Time
 0        AccountOpened              2025-01-10 09:15:00
 1        DepositMade                2025-01-10 10:30:00
 2        DepositMade                2025-02-01 14:00:00
 3        WithdrawalMade             2025-03-01 09:00:00
 4        DepositMade                2025-03-15 11:30:00
 ...

Add --data to see full event payloads:

$ protean events history --aggregate=Account --id=acc-7742 --data --domain=fidelis

This is invaluable for debugging — you can see exactly what happened to an aggregate and when.

What We Built

  • repo.get(id, at_version=N) — reconstruct at a specific version.
  • repo.get(id, as_of=datetime) — reconstruct at a specific timestamp.
  • Read-only temporal aggregates that prevent accidental mutations.
  • Event history CLI for exploring an aggregate's timeline.

Temporal queries are a superpower unique to Event Sourcing. Traditional CRUD systems cannot answer "what was the state at time T?" without complex auditing infrastructure. With Event Sourcing, it is a one-line query.

Full Source

"""Chapter 13: Temporal Queries

Demonstrates how to load an event-sourced aggregate at a specific point
in time or at a specific version.  Temporal aggregates are read-only —
they represent a historical snapshot and cannot raise new events.
"""

from datetime import datetime, timezone

from protean import Domain, apply, handle, invariant
from protean.exceptions import IncorrectUsageError, ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class AccountClosed:
    account_id: Identifier(required=True)
    reason: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @invariant.post
    def closed_account_must_have_zero_balance(self):
        if self.status == "CLOSED" and self.balance != 0:
            raise ValidationError(
                {"status": ["Cannot close account with non-zero balance"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def close(self, reason: str = None) -> None:
        self.raise_(
            AccountClosed(
                account_id=str(self.id),
                reason=reason,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

    @apply
    def on_account_closed(self, event: AccountClosed):
        self.status = "CLOSED"


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Build up a transaction history
        # Event 0: AccountOpened  -> balance $1000
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Event 1: DepositMade   -> balance $1500
        domain.process(
            MakeDeposit(account_id=account_id, amount=500.00, reference="paycheck")
        )

        # Event 2: DepositMade   -> balance $1700
        domain.process(
            MakeDeposit(account_id=account_id, amount=200.00, reference="refund")
        )

        # Capture a timestamp between events for as_of queries
        midpoint = datetime.now(timezone.utc)

        # Event 3: WithdrawalMade -> balance $1200
        domain.process(
            MakeWithdrawal(account_id=account_id, amount=500.00, reference="rent")
        )

        # Event 4: DepositMade    -> balance $1500
        domain.process(
            MakeDeposit(account_id=account_id, amount=300.00, reference="freelance")
        )

        repo = current_domain.repository_for(Account)

        # --- Current state ---
        current = repo.get(account_id)
        print(f"\nCurrent balance: ${current.balance:.2f} (version {current._version})")
        assert current.balance == 1500.00

        # --- Temporal query: at_version ---
        # at_version=2 replays events 0, 1, and 2 (the first 3 events)
        historical = repo.get(account_id, at_version=2)
        print(
            f"Balance at version 2: ${historical.balance:.2f} "
            f"(version {historical._version})"
        )
        assert historical.balance == 1700.00  # 1000 + 500 + 200

        # --- Temporal query: as_of ---
        # Load the aggregate as it was at the midpoint timestamp
        snapshot_in_time = repo.get(account_id, as_of=midpoint)
        print(
            f"Balance at midpoint: ${snapshot_in_time.balance:.2f} "
            f"(version {snapshot_in_time._version})"
        )
        assert snapshot_in_time.balance == 1700.00  # Before rent withdrawal

        # --- Temporal aggregates are read-only ---
        try:
            historical.raise_(
                DepositMade(
                    account_id=str(historical.id),
                    amount=100.00,
                    reference="should-fail",
                )
            )
        except IncorrectUsageError as e:
            print(f"\nRead-only guard: {e.args[0]}")

        print("\nAll checks passed!")

Next

Chapter 14: Connecting to the Outside World →