Skip to content

Chapter 11: When Requirements Change — Event Upcasting

Six months after launch, new anti-money-laundering regulations require that every deposit record its source type — whether the funds came from cash, wire transfer, ACH, or check. The DepositMade event needs a new source_type field.

But the event store contains hundreds of thousands of historical DepositMade events that lack this field. Rewriting history is not an option — that defeats the purpose of Event Sourcing.

The answer is upcasting: transparently transforming old event payloads into the current schema during deserialization. The old events are never rewritten — they are upgraded on the fly whenever they are read.

Versioning the Event

First, bump the event to v2 and add the new field:

@domain.event(part_of="Account")
class DepositMade:
    """v2 adds a source_type field to track how the deposit originated."""

    __version__ = "v2"

    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()
    source_type: String(default="unknown")

The __version__ = "v2" attribute tells Protean this is the current schema. Historical events stored as "v1" will need transformation.

Writing the Upcaster

@domain.upcaster(event_type=DepositMade, from_version="v1", to_version="v2")
class UpcastDepositV1ToV2(BaseUpcaster):
    """Transform v1 DepositMade events to v2 by adding source_type.

    Old v1 events stored in the event store lack the source_type field.
    This upcaster ensures they are transparently upgraded when replayed,
    defaulting source_type to "unknown" since the original source is lost.
    """

    def upcast(self, data: dict) -> dict:
        data["source_type"] = "unknown"
        return data

The upcaster is a simple class:

  • event_type=DepositMade — which event this upcaster handles.
  • from_version="v1", to_version="v2" — the version transition.
  • upcast(self, data: dict) -> dict — transforms the old payload. Here, we add source_type = "unknown" since we cannot determine the source for historical deposits.

How It Works

When the event store reads a message with type Fidelis.DepositMade.v1:

  1. Protean looks up the upcaster chain for DepositMade
  2. Finds v1 → v2 upcaster
  3. Calls upcast() to transform the data
  4. Instantiates the current DepositMade (v2) class with the transformed data

The historical event in the store is untouched. The transformation happens at read time, lazily.

Chaining Upcasters

Later, regulations add a currency field. Bump to v3:

@domain.event(part_of=Account)
class DepositMade:
    __version__ = "v3"

    account_id: Identifier(required=True)
    amount: Float(required=True)
    source_type: String(default="unknown")
    currency: String(default="USD")  # NEW in v3
    reference: String()


@domain.upcaster(event_type=DepositMade, from_version="v2", to_version="v3")
class UpcastDepositV2ToV3(BaseUpcaster):
    def upcast(self, data: dict) -> dict:
        data["currency"] = "USD"
        return data

Protean automatically chains: v1 → v2 → v3. A historical v1 event passes through both upcasters before reaching the handler.

Startup Validation

During domain.init(), Protean validates upcaster chains:

  • No duplicate registrations — two upcasters for the same from_version → to_version transition
  • No cycles — v1 → v2 → v1 would loop forever
  • Convergent chains — all paths must reach the same terminal version (the current __version__)
  • No gaps — v1 → v3 without a v2 intermediate is invalid if v2 exists

If any validation fails, domain.init() raises an error immediately.

Performance

  • Current events (v2 reading v2): Zero overhead. Direct type lookup, no upcasting.
  • Old events (v1 reading v2): O(N) where N is the number of version hops. For v1 → v3 with two upcasters, N = 2. This is negligible.
  • Never rewrite events: The event store is append-only. Upcasting respects this fundamental invariant.

What We Built

  • Event versioning with __version__ = "v2" on the event class.
  • An upcaster with @domain.upcaster() that transforms old payloads.
  • Upcaster chains that automatically compose (v1 → v2 → v3).
  • Startup validation that catches broken chains at init time.
  • Lazy migration — events are never rewritten, only transformed at read time.

This is the event-sourcing answer to database migrations. No downtime, no data rewriting, no migration scripts.

Full Source

"""Chapter 11: Event Upcasting

Demonstrates how to evolve event schemas over time using upcasters.
When a stored event's version no longer matches the current event class,
an upcaster transforms the old payload into the new shape during replay.
"""

from protean import Domain, apply, handle, invariant
from protean.core.upcaster import BaseUpcaster
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    """v2 adds a source_type field to track how the deposit originated."""

    __version__ = "v2"

    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()
    source_type: String(default="unknown")


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class AccountClosed:
    account_id: Identifier(required=True)
    reason: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @invariant.post
    def closed_account_must_have_zero_balance(self):
        if self.status == "CLOSED" and self.balance != 0:
            raise ValidationError(
                {"status": ["Cannot close account with non-zero balance"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(
        self, amount: float, reference: str = None, source_type: str = "manual"
    ) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
                source_type=source_type,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def close(self, reason: str = None) -> None:
        self.raise_(
            AccountClosed(
                account_id=str(self.id),
                reason=reason,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

    @apply
    def on_account_closed(self, event: AccountClosed):
        self.status = "CLOSED"


@domain.upcaster(event_type=DepositMade, from_version="v1", to_version="v2")
class UpcastDepositV1ToV2(BaseUpcaster):
    """Transform v1 DepositMade events to v2 by adding source_type.

    Old v1 events stored in the event store lack the source_type field.
    This upcaster ensures they are transparently upgraded when replayed,
    defaulting source_type to "unknown" since the original source is lost.
    """

    def upcast(self, data: dict) -> dict:
        data["source_type"] = "unknown"
        return data


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()
    source_type: String(default="manual")


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class CloseAccount:
    account_id: Identifier(required=True)
    reason: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(
            command.amount,
            reference=command.reference,
            source_type=command.source_type,
        )
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)

    @handle(CloseAccount)
    def handle_close_account(self, command: CloseAccount):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.close(reason=command.reason)
        repo.add(account)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"
if __name__ == "__main__":
    with domain.domain_context():
        # Open an account and make deposits with the new v2 schema
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # This deposit is written with v2 schema (includes source_type)
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
                source_type="payroll",
            )
        )
        print("Deposit of $500.00 made (source_type=payroll)")

        # In a real system, older deposits already in the store would have
        # been written as v1 (without source_type). When those events are
        # replayed, the UpcastDepositV1ToV2 upcaster automatically adds
        # source_type="unknown" so the v2 DepositMade class can deserialize
        # them without error.

        # Make another deposit with a different source
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=250.00,
                reference="wire-transfer",
                source_type="bank_transfer",
            )
        )
        print("Deposit of $250.00 made (source_type=bank_transfer)")

        # Reload the account — replays all events from the store
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nAccount holder: {account.holder_name}")
        print(f"Balance: ${account.balance:.2f}")
        print(f"Version: {account._version}")

        # Verify the upcaster is registered
        assert UpcastDepositV1ToV2.meta_.event_type == DepositMade
        assert UpcastDepositV1ToV2.meta_.from_version == "v1"
        assert UpcastDepositV1ToV2.meta_.to_version == "v2"

        assert account.balance == 1750.00  # 1000 + 500 + 250
        print("\nAll checks passed!")

Next

Chapter 12: Snapshots for High-Volume Accounts →