Chapter 11: When Requirements Change — Event Upcasting
Six months after launch, new anti-money-laundering regulations require
that every deposit record its source type — whether the funds came
from cash, wire transfer, ACH, or check. The DepositMade event needs a
new source_type field.
But the event store contains hundreds of thousands of historical
DepositMade events that lack this field. Rewriting history is not an
option — that defeats the purpose of Event Sourcing.
The answer is upcasting: transparently transforming old event payloads into the current schema during deserialization. The old events are never rewritten — they are upgraded on the fly whenever they are read.
Versioning the Event
First, bump the event to v2 and add the new field:
@domain.event(part_of="Account")
class DepositMade:
    """v2 adds a source_type field to track how the deposit originated."""

    __version__ = "v2"

    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()
    source_type: String(default="unknown")
The __version__ = "v2" attribute tells Protean this is the current
schema. Historical events stored as "v1" will need transformation.
Writing the Upcaster
from protean.core.upcaster import BaseUpcaster


@domain.upcaster(event_type=DepositMade, from_version="v1", to_version="v2")
class UpcastDepositV1ToV2(BaseUpcaster):
    """Transform v1 DepositMade events to v2 by adding source_type.

    Old v1 events stored in the event store lack the source_type field.
    This upcaster ensures they are transparently upgraded when replayed,
    defaulting source_type to "unknown" since the original source is lost.
    """

    def upcast(self, data: dict) -> dict:
        # v1 payloads have no source_type, so fill in the fallback value.
        data["source_type"] = "unknown"
        return data
The upcaster is a simple class:
- event_type=DepositMade names the event this upcaster handles.
- from_version="v1" and to_version="v2" give the version transition.
- upcast(self, data: dict) -> dict transforms the old payload. Here, we add source_type = "unknown" since we cannot determine the source for historical deposits.
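Because the transformation is only a dict-to-dict mapping, its logic can be checked without an event store. The sketch below is a framework-free stand-in for the upcast() body above; the function name upcast_deposit_v1_to_v2 and the sample payload are illustrative and not part of Protean. It returns a copy rather than mutating its argument, which is a choice made for this sketch, not a documented Protean requirement.

```python
def upcast_deposit_v1_to_v2(data: dict) -> dict:
    """Return a v2-shaped copy of a v1 DepositMade payload (hypothetical helper)."""
    # Copy first so the caller's dict (standing in for the stored payload) is untouched.
    upgraded = dict(data)
    # The original source cannot be recovered, so fall back to "unknown".
    upgraded["source_type"] = "unknown"
    return upgraded


v1_payload = {"account_id": "acc-1", "amount": 500.0, "reference": "paycheck"}
v2_payload = upcast_deposit_v1_to_v2(v1_payload)
print(v2_payload["source_type"])
```

A test like this pins down the default value, which matters here because "unknown" is what auditors will see for every pre-regulation deposit.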
How It Works
When the event store reads a message with type Fidelis.DepositMade.v1:
- Protean looks up the upcaster chain for DepositMade
- Finds the v1 → v2 upcaster
- Calls upcast() to transform the data
- Instantiates the current DepositMade (v2) class with the transformed data
The historical event in the store is untouched. The transformation happens at read time, lazily.
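The read path described above can be approximated in plain Python. This is a simplified model under stated assumptions, not Protean's internals: the UPCASTERS registry, the read_event function, the CURRENT_VERSION constant, and the dataclass stand-in for DepositMade are all hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Optional

CURRENT_VERSION = "v2"  # assumed current schema version for this sketch


@dataclass(frozen=True)
class DepositMade:
    """Stand-in for the current (v2) event class."""

    account_id: str
    amount: float
    reference: Optional[str] = None
    source_type: str = "unknown"


def add_source_type(data: dict) -> dict:
    # Build a new dict so the stored payload is never modified.
    return {**data, "source_type": "unknown"}


# Hypothetical registry: (event name, from_version) -> (to_version, transform)
UPCASTERS: dict[tuple[str, str], tuple[str, Callable[[dict], dict]]] = {
    ("DepositMade", "v1"): ("v2", add_source_type),
}


def read_event(type_string: str, data: dict) -> DepositMade:
    # "Fidelis.DepositMade.v1" -> domain name, event name, stored version
    _domain, name, version = type_string.split(".")
    # Apply upcasters until the payload matches the current schema.
    while version != CURRENT_VERSION:
        version, transform = UPCASTERS[(name, version)]
        data = transform(data)
    return DepositMade(**data)


stored = {"account_id": "acc-1", "amount": 500.0, "reference": "paycheck"}
event = read_event("Fidelis.DepositMade.v1", stored)
print(event.source_type)
```

The stored dict is left exactly as it was, mirroring the point that only the in-memory copy is upgraded.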
Chaining Upcasters
Later, regulations add a currency field. Bump to v3:
@domain.event(part_of="Account")
class DepositMade:
    __version__ = "v3"

    account_id: Identifier(required=True)
    amount: Float(required=True)
    source_type: String(default="unknown")
    currency: String(default="USD")  # NEW in v3
    reference: String()


@domain.upcaster(event_type=DepositMade, from_version="v2", to_version="v3")
class UpcastDepositV2ToV3(BaseUpcaster):
    def upcast(self, data: dict) -> dict:
        # Deposits recorded before v3 are assumed to be in USD.
        data["currency"] = "USD"
        return data
Protean automatically chains: v1 → v2 → v3. A historical v1 event passes through both upcasters before reaching the handler.
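One way to picture the chaining is as a list of transforms resolved from the stored version to the current one and then applied in order. The sketch below assumes a single event type and uses hypothetical names (STEPS, build_chain, apply_chain); Protean's actual chain resolution may differ.

```python
from typing import Callable

Transform = Callable[[dict], dict]

# Hypothetical per-event registry: from_version -> (to_version, transform)
STEPS: dict[str, tuple[str, Transform]] = {
    "v1": ("v2", lambda d: {**d, "source_type": "unknown"}),
    "v2": ("v3", lambda d: {**d, "currency": "USD"}),
}


def build_chain(stored_version: str, current_version: str) -> list[Transform]:
    """Collect the transforms needed to go from stored_version to current_version."""
    chain: list[Transform] = []
    version = stored_version
    while version != current_version:
        version, transform = STEPS[version]
        chain.append(transform)
    return chain


def apply_chain(chain: list[Transform], data: dict) -> dict:
    """Run each transform in order, feeding the output of one into the next."""
    for transform in chain:
        data = transform(data)
    return data


v1_payload = {"account_id": "acc-1", "amount": 500.0}
chain = build_chain("v1", "v3")
upgraded = apply_chain(chain, v1_payload)
print(len(chain), upgraded)
```

The length of the chain is the number of version hops, and an event already at the current version resolves to an empty chain.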
Startup Validation
During domain.init(), Protean validates upcaster chains:
- No duplicate registrations: two upcasters for the same from_version → to_version transition
- No cycles: v1 → v2 → v1 would loop forever
- Convergent chains: all paths must reach the same terminal version (the current __version__)
- No gaps: v1 → v3 without a v2 intermediate is invalid if v2 exists
If any validation fails, domain.init() raises an error immediately.
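To make these rules concrete, here is a rough sketch of how three of them (duplicates, cycles, and convergence) could be checked over a list of registered transitions. It is an illustration only: validate_chain and its ValueError messages are hypothetical, the gap rule is not modeled, and Protean's own checks and error types may look quite different.

```python
def validate_chain(registrations: list[tuple[str, str]], current_version: str) -> None:
    """Raise ValueError if the (from_version, to_version) pairs are inconsistent."""
    seen: set[tuple[str, str]] = set()
    edges: dict[str, list[str]] = {}
    for from_v, to_v in registrations:
        # Rule: no duplicate registrations for the same transition.
        if (from_v, to_v) in seen:
            raise ValueError(f"duplicate upcaster for {from_v} -> {to_v}")
        seen.add((from_v, to_v))
        edges.setdefault(from_v, []).append(to_v)

    def walk(version: str, path: tuple[str, ...]) -> None:
        # Rule: no cycles. Revisiting a version on the same path would loop forever.
        if version in path:
            raise ValueError("cycle detected: " + " -> ".join(path + (version,)))
        targets = edges.get(version)
        if not targets:
            # Rule: every path must end at the current version.
            if version != current_version:
                raise ValueError(f"chain ends at {version}, expected {current_version}")
            return
        for target in targets:
            walk(target, path + (version,))

    for start in edges:
        walk(start, ())


# A well-formed chain passes silently.
validate_chain([("v1", "v2"), ("v2", "v3")], current_version="v3")

# A cyclic chain is rejected before any event is read.
try:
    validate_chain([("v1", "v2"), ("v2", "v1")], current_version="v2")
    cycle_error = None
except ValueError as exc:
    cycle_error = str(exc)
print(cycle_error)
```

Running checks like these once at startup is what lets the read path assume every stored version has a valid route to the current schema.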
Performance
- Current events (stored as v2, read as v2): No upcasting overhead. The type is looked up directly and no upcaster runs.
- Old events (stored as v1, read as v2): O(N) per event, where N is the number of version hops. For v1 → v3 with two upcasters, N = 2, which here means two small dict updates per read.
- Never rewrite events: The event store is append-only. Upcasting respects this fundamental invariant.
What We Built
- Event versioning with __version__ = "v2" on the event class.
- An upcaster with @domain.upcaster() that transforms old payloads.
- Upcaster chains that automatically compose (v1 → v2 → v3).
- Startup validation that catches broken chains at init time.
- Lazy migration — events are never rewritten, only transformed at read time.
This is the event-sourcing answer to database migrations. No downtime, no data rewriting, no migration scripts.
Full Source
"""Chapter 11: Event Upcasting

Demonstrates how to evolve event schemas over time using upcasters.
When a stored event's version no longer matches the current event class,
an upcaster transforms the old payload into the new shape during replay.
"""

from protean import Domain, apply, handle, invariant
from protean.core.upcaster import BaseUpcaster
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    """v2 adds a source_type field to track how the deposit originated."""

    __version__ = "v2"

    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()
    source_type: String(default="unknown")


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class AccountClosed:
    account_id: Identifier(required=True)
    reason: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @invariant.post
    def closed_account_must_have_zero_balance(self):
        if self.status == "CLOSED" and self.balance != 0:
            raise ValidationError(
                {"status": ["Cannot close account with non-zero balance"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(
        self, amount: float, reference: str = None, source_type: str = "manual"
    ) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
                source_type=source_type,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def close(self, reason: str = None) -> None:
        self.raise_(
            AccountClosed(
                account_id=str(self.id),
                reason=reason,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

    @apply
    def on_account_closed(self, event: AccountClosed):
        self.status = "CLOSED"


@domain.upcaster(event_type=DepositMade, from_version="v1", to_version="v2")
class UpcastDepositV1ToV2(BaseUpcaster):
    """Transform v1 DepositMade events to v2 by adding source_type.

    Old v1 events stored in the event store lack the source_type field.
    This upcaster ensures they are transparently upgraded when replayed,
    defaulting source_type to "unknown" since the original source is lost.
    """

    def upcast(self, data: dict) -> dict:
        data["source_type"] = "unknown"
        return data


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()
    source_type: String(default="manual")


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class CloseAccount:
    account_id: Identifier(required=True)
    reason: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(
            command.amount,
            reference=command.reference,
            source_type=command.source_type,
        )
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)

    @handle(CloseAccount)
    def handle_close_account(self, command: CloseAccount):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.close(reason=command.reason)
        repo.add(account)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account and make deposits with the new v2 schema
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # This deposit is written with v2 schema (includes source_type)
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
                source_type="payroll",
            )
        )
        print("Deposit of $500.00 made (source_type=payroll)")

        # In a real system, older deposits already in the store would have
        # been written as v1 (without source_type). When those events are
        # replayed, the UpcastDepositV1ToV2 upcaster automatically adds
        # source_type="unknown" so the v2 DepositMade class can deserialize
        # them without error.

        # Make another deposit with a different source
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=250.00,
                reference="wire-transfer",
                source_type="bank_transfer",
            )
        )
        print("Deposit of $250.00 made (source_type=bank_transfer)")

        # Reload the account — replays all events from the store
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nAccount holder: {account.holder_name}")
        print(f"Balance: ${account.balance:.2f}")
        print(f"Version: {account._version}")

        # Verify the upcaster is registered
        assert UpcastDepositV1ToV2.meta_.event_type == DepositMade
        assert UpcastDepositV1ToV2.meta_.from_version == "v1"
        assert UpcastDepositV1ToV2.meta_.to_version == "v2"
        assert account.balance == 1750.00  # 1000 + 500 + 250
        print("\nAll checks passed!")