Chapter 20: Rebuilding the World
The finance team changes their regulatory reporting requirements. The
AccountReport projection needs new fields and different aggregation
logic. Rather than writing a complex data migration, we simply
rebuild the projection from scratch: truncate the existing data,
replay all events through the updated projector, and the new projection
materializes automatically.
How Projection Rebuilding Works
- Discover the projectors for the target projection
- Truncate the existing projection data
- Read all events from the relevant stream categories in
  global_position order
- Dispatch each event through the projector handlers
- Report the result: events processed, events skipped, errors
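The steps above can be sketched in plain Python. This is only an illustration under stated assumptions, not Protean's implementation: `StoredEvent`, `RebuildOutcome`, `rebuild`, and the handler functions are hypothetical names, the projection is modeled as a dict, and "discovering the projectors" is reduced to a lookup table of handlers keyed by event type.

```python
from dataclasses import dataclass, field


@dataclass
class StoredEvent:
    """Hypothetical stand-in for a message read from the event store."""
    global_position: int
    type: str
    data: dict


@dataclass
class RebuildOutcome:
    """Hypothetical result record; not Protean's RebuildResult."""
    events_dispatched: int = 0
    events_skipped: int = 0
    errors: list = field(default_factory=list)


def rebuild(table: dict, events: list, handlers: dict) -> RebuildOutcome:
    outcome = RebuildOutcome()
    table.clear()  # truncate the existing projection data
    # Replay in global_position order, regardless of how events arrive
    for event in sorted(events, key=lambda e: e.global_position):
        handler = handlers.get(event.type)
        if handler is None:
            outcome.events_skipped += 1  # no handler known for this type
            continue
        try:
            handler(table, event.data)
            outcome.events_dispatched += 1
        except Exception as exc:  # record the failure and keep replaying
            outcome.errors.append(f"{event.type}@{event.global_position}: {exc}")
    return outcome


def on_opened(table, data):
    table[data["account_id"]] = {"balance": data["opening_deposit"]}


def on_deposit(table, data):
    table[data["account_id"]]["balance"] += data["amount"]


handlers = {"AccountOpened": on_opened, "DepositMade": on_deposit}
events = [
    StoredEvent(2, "DepositMade", {"account_id": "a1", "amount": 500.0}),
    StoredEvent(1, "AccountOpened", {"account_id": "a1", "opening_deposit": 1000.0}),
    StoredEvent(3, "LegacyEvent", {}),
]

table = {"stale": {"balance": -1.0}}  # leftover row that truncation removes
outcome = rebuild(table, events, handlers)
print(table, outcome)
```

Sorting by `global_position` matters here: the deposit is listed first but must be applied after the account is opened.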
Rebuilding Programmatically
with domain.domain_context():
    result = domain.rebuild_projection("AccountReport")
    print(
        f"Rebuilt '{result.projection_name}': "
        f"{result.events_dispatched} events processed, "
        f"{result.events_skipped} skipped"
    )
The RebuildResult contains:
- projection_name: which projection was rebuilt
- events_dispatched: number of events processed
- events_skipped: events whose type could not be resolved
- errors: any exceptions during processing
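One way to use these fields in a deployment script is to turn them into a short report and an exit code. The sketch below is an assumption-laden illustration: `summarize` is a hypothetical helper, and a `SimpleNamespace` stands in for the real result object so the snippet runs without a domain. It relies only on the four attributes listed above.

```python
from types import SimpleNamespace


def summarize(result):
    """Return (exit_code, report) for any object exposing the four fields."""
    lines = [
        f"{result.projection_name}: "
        f"{result.events_dispatched} dispatched, "
        f"{result.events_skipped} skipped"
    ]
    for error in result.errors:
        lines.append(f"  error: {error}")
    exit_code = 1 if result.errors else 0  # treat any error as a failed rebuild
    return exit_code, "\n".join(lines)


# Stand-in for a real result, using the figures from this chapter's CLI example
fake_result = SimpleNamespace(
    projection_name="AccountReport",
    events_dispatched=245000,
    events_skipped=12,
    errors=[],
)
code, report = summarize(fake_result)
print(report)
```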
Rebuilding via CLI
# Rebuild a specific projection
$ protean projection rebuild --projection=AccountReport --domain=fidelis
Rebuilt projection 'AccountReport': 245,000 events processed, 12 skipped.
# Rebuild all projections
$ protean projection rebuild --domain=fidelis
AccountSummary: 245,000 events processed
AccountReport: 245,000 events processed
Rebuilt 2 projection(s), 490,000 total events processed.
Use --batch-size to control memory usage for large replays:
$ protean projection rebuild --projection=AccountReport --batch-size=1000 --domain=fidelis
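To see why a batch size bounds memory, consider a reader that fetches one page at a time and advances a position cursor. This is a hedged sketch of the general technique, not of what `--batch-size` does internally: `read_in_batches`, `read_page`, and the in-memory `STORE` are hypothetical.

```python
from typing import Callable, Iterator

# Hypothetical event store: 2,500 events with consecutive positions
STORE = [{"global_position": i, "type": "DepositMade"} for i in range(1, 2501)]
page_sizes = []  # records how many events each fetch held in memory


def read_page(start: int, limit: int) -> list:
    """Return at most `limit` events with global_position >= start."""
    page = [e for e in STORE if e["global_position"] >= start][:limit]
    page_sizes.append(len(page))
    return page


def read_in_batches(fetch: Callable[[int, int], list], batch_size: int) -> Iterator[dict]:
    position = 0
    while True:
        page = fetch(position, batch_size)
        if not page:
            return  # an empty page means the stream is exhausted
        yield from page
        position = page[-1]["global_position"] + 1  # resume after the last event


total = sum(1 for _ in read_in_batches(read_page, batch_size=1000))
print(total, page_sizes)
```

At no point does the reader hold more than `batch_size` events, at the cost of one extra fetch to detect the end of the stream.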
Upcasters Apply Automatically
Events stored as v1 are automatically upcasted during replay. The
upcaster chain runs inside to_domain_object(), so the projector
always receives the current event version. Your projectors only need
to handle the latest schema.
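The idea of an upcaster chain can be illustrated without the framework. In the sketch below everything is hypothetical: the version numbers, the `value` to `amount` rename, the default `currency`, and the `upcast` function are invented for illustration and do not describe Protean's upcaster API or the Fidelis event history.

```python
def rename_value_to_amount(data: dict) -> dict:
    """Hypothetical v1 -> v2 change: the field `value` became `amount`."""
    data = dict(data)  # copy so the stored payload is not mutated
    data["amount"] = data.pop("value")
    return data


def add_default_currency(data: dict) -> dict:
    """Hypothetical v2 -> v3 change: a `currency` field was added."""
    return {**data, "currency": "USD"}


# (event type, stored version) -> (next version, transform)
UPCASTERS = {
    ("DepositMade", 1): (2, rename_value_to_amount),
    ("DepositMade", 2): (3, add_default_currency),
}


def upcast(event_type: str, version: int, data: dict):
    """Apply transforms step by step until no further upcaster is registered."""
    while (event_type, version) in UPCASTERS:
        version, transform = UPCASTERS[(event_type, version)]
        data = transform(data)
    return version, data


version, payload = upcast("DepositMade", 1, {"account_id": "a1", "value": 500.0})
print(version, payload)
```

A handler written against the latest shape never sees `value`; it only sees `amount` and `currency`, whichever version was stored.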
Skipped Events
Events whose type cannot be resolved (deprecated events with no upcaster chain) are skipped with a warning instead of aborting the rebuild. This ensures that a single deprecated event type does not block the entire rebuild.
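A minimal sketch of the skip-with-warning behavior, assuming a simple set of known type names in place of the framework's type resolution. `KNOWN_TYPES`, `partition_resolvable`, and the `BonusCredited` event are hypothetical.

```python
import logging
from collections import Counter

logger = logging.getLogger("rebuild_sketch")

# Hypothetical registry of event types the current code can still resolve
KNOWN_TYPES = {"AccountOpened", "DepositMade", "WithdrawalMade"}


def partition_resolvable(messages: list):
    """Split messages into resolvable ones and a per-type tally of skips."""
    resolvable, skipped = [], Counter()
    for message in messages:
        if message["type"] in KNOWN_TYPES:
            resolvable.append(message)
        else:
            # Warn and continue; one unknown type must not stop the replay
            logger.warning(
                "Skipping event at position %d: unknown type %r",
                message["global_position"],
                message["type"],
            )
            skipped[message["type"]] += 1
    return resolvable, skipped


messages = [
    {"global_position": 1, "type": "AccountOpened"},
    {"global_position": 2, "type": "BonusCredited"},  # hypothetical deprecated type
    {"global_position": 3, "type": "DepositMade"},
    {"global_position": 4, "type": "BonusCredited"},
]
resolvable, skipped = partition_resolvable(messages)
print(len(resolvable), dict(skipped))
```

Tallying skips per type makes it easier to tell an expected deprecation from an accidentally unregistered event.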
When to Rebuild
| Scenario | Action |
|---|---|
| Projector logic changed | Rebuild affected projection |
| New projection added to existing system | Rebuild to backfill data |
| Projection data corrupted | Rebuild from events |
| Schema migration | Update projector + rebuild |
| After deploying upcasters | Rebuild if projectors use upcasted events |
Stop the server first
Stop protean server before rebuilding projections to avoid
conflicts with concurrent event processing. After the rebuild,
restart the server — it will resume from the current stream
position.
Idempotent Rebuilds
Rebuilds are idempotent. The process truncates the projection table first, then replays from scratch. Running it twice produces the same result. This makes rebuilds safe to retry if they fail midway.
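The role of truncation in idempotency can be shown with a toy replay. This is a sketch under simplifying assumptions: the projection is a dict, `replay` is a hypothetical function, and events carry a signed `delta`. It mirrors the balance and transaction count used in this chapter's example.

```python
from copy import deepcopy


def replay(table: dict, events: list, truncate: bool = True) -> dict:
    if truncate:
        table.clear()  # start from an empty projection
    for event in events:
        row = table.setdefault(
            event["account_id"], {"balance": 0.0, "transaction_count": 0}
        )
        row["balance"] += event["delta"]
        row["transaction_count"] += 1
    return table


events = [
    {"account_id": "a1", "delta": 1000.0},
    {"account_id": "a1", "delta": 500.0},
    {"account_id": "a1", "delta": -200.0},
]

table = {}
first = deepcopy(replay(table, events))
second = deepcopy(replay(table, events))  # same table, rebuilt a second time

# Without truncation the same events are applied on top of existing rows
double_counted = replay(deepcopy(first), events, truncate=False)
print(first, second, double_counted)
```

With truncation the two runs agree; without it the second run doubles every total, which is why a retry after a partial failure must also start from an empty table.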
What We Built
- domain.rebuild_projection() for programmatic rebuilding.
- protean projection rebuild CLI for operational use.
- Automatic upcaster integration during replay.
- Graceful handling of deprecated event types.
- The confidence that projections can always be regenerated from the event store.
This is a fundamental advantage of Event Sourcing: the event store is the source of truth, and all read models can be rebuilt from it at any time.
Full Source
"""Chapter 20: Rebuilding Projections

Demonstrates how to rebuild a projection by replaying events from the event
store through its projectors. The ``domain.rebuild_projection()`` method
truncates existing projection data and replays all historical events, applying
upcasters automatically during replay.

This is useful when:

- A projector's logic changes and the read model needs to be re-derived
- A new projection is added and needs to be backfilled from existing events
- Projection data becomes corrupted or out of sync
"""

from protean import Domain, apply, handle, invariant
from protean.core.projector import on
from protean.exceptions import ValidationError
from protean.fields import DateTime, Float, Identifier, Integer, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


@domain.projection
class AccountSummary:
    """A read-optimized view of account data for the dashboard."""

    account_id: Identifier(identifier=True, required=True)
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    transaction_count: Integer(default=0)
    last_transaction_at: DateTime()


@domain.projector(projector_for=AccountSummary, aggregates=[Account])
class AccountSummaryProjector:
    """Maintains the AccountSummary projection from Account events."""

    @on(AccountOpened)
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        summary = AccountSummary(
            account_id=event.account_id,
            account_number=event.account_number,
            holder_name=event.holder_name,
            balance=event.opening_deposit,
            transaction_count=1,
            last_transaction_at=event._metadata.headers.time,
        )
        current_domain.repository_for(AccountSummary).add(summary)

    @on(DepositMade)
    def on_deposit_made(self, event: DepositMade):
        repo = current_domain.repository_for(AccountSummary)
        summary = repo.get(event.account_id)
        summary.balance += event.amount
        summary.transaction_count += 1
        summary.last_transaction_at = event._metadata.headers.time
        repo.add(summary)

    @on(WithdrawalMade)
    def on_withdrawal_made(self, event: WithdrawalMade):
        repo = current_domain.repository_for(AccountSummary)
        summary = repo.get(event.account_id)
        summary.balance -= event.amount
        summary.transaction_count += 1
        summary.last_transaction_at = event._metadata.headers.time
        repo.add(summary)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Build up some history
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=200.00,
                reference="groceries",
            )
        )
        print(f"Account {account_id} created with 3 transactions")

        # Verify projection is up to date
        summary_repo = current_domain.repository_for(AccountSummary)
        summary = summary_repo.get(account_id)
        print(
            f"Before rebuild - Balance: ${summary.balance:.2f}, "
            f"Transactions: {summary.transaction_count}"
        )

        # Rebuild the projection from scratch by replaying all events
        result = domain.rebuild_projection(AccountSummary)
        print("\n=== Rebuild Result ===")
        print(f"Projection: {result.projection_name}")
        print(f"Projectors processed: {result.projectors_processed}")
        print(f"Categories processed: {result.categories_processed}")
        print(f"Events dispatched: {result.events_dispatched}")
        print(f"Events skipped: {result.events_skipped}")
        print(f"Success: {result.success}")

        # Verify the rebuilt projection matches the original
        rebuilt_summary = summary_repo.get(account_id)
        print(
            f"\nAfter rebuild - Balance: ${rebuilt_summary.balance:.2f}, "
            f"Transactions: {rebuilt_summary.transaction_count}"
        )
        assert rebuilt_summary.balance == 1300.00  # 1000 + 500 - 200
        assert rebuilt_summary.transaction_count == 3
        assert result.success
        print("\nAll checks passed!")