Projectors
CQRS ES
Projectors are specialized event handlers responsible for maintaining projections by listening to domain events and updating projection data accordingly. They bridge your domain events and read models, ensuring projections stay synchronized with changes in your domain.
Defining a Projector
Projectors are defined using the Domain.projector decorator and must be
associated with a specific projection via projector_for:
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.headers.time,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.headers.time

        # Save the updated projection back through the repository.
        repository.add(inventory)
You must also specify which events to listen to, via either aggregates (a list of aggregate classes — Protean derives the stream categories automatically) or stream_categories (a list of stream category names for more fine-grained control). Use one or the other, not both.
Event Handling with @on
Projectors use the @on decorator (an alias for @handle) to specify which
events they respond to:
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.headers.time,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.headers.time

        # Save the updated projection back through the repository.
        repository.add(inventory)
A single projector can handle multiple events, and multiple projectors can handle the same event:
@domain.projector(projector_for=OrderSummary, aggregates=[Order])
class OrderSummaryProjector:
    """One projector reacting to several Order events for OrderSummary."""

    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        """Create the order summary (implementation omitted)."""

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        """Update the shipping status (implementation omitted)."""

    @on(OrderCancelled)
    def on_order_cancelled(self, event: OrderCancelled):
        """Mark the summary as cancelled (implementation omitted)."""
@domain.projector(projector_for=ShippingReport, aggregates=[Order])
class ShippingReportProjector:
    """A second projector for OrderShipped, this one feeding ShippingReport."""

    # Same event as another projector handles, but a different projection.
    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        """Update shipping metrics (implementation omitted)."""
Cross-Aggregate Projections
Projectors can listen to events from multiple aggregates to create comprehensive views:
@domain.projector(
    projector_for=CustomerOrderSummary,
    aggregates=[Customer, Order, Payment],
)
class CustomerOrderSummaryProjector:
    """Builds CustomerOrderSummary from Customer, Order, and Payment events."""

    @on(CustomerRegistered)
    def on_customer_registered(self, event: CustomerRegistered):
        """Initialize the customer summary (implementation omitted)."""

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        """Update the order count and total (implementation omitted)."""

    @on(PaymentProcessed)
    def on_payment_processed(self, event: PaymentProcessed):
        """Update the payment status (implementation omitted)."""
For more granular control, use stream categories instead of aggregates:
@domain.projector(
    projector_for=SystemMetrics,
    stream_categories=["user", "order", "payment", "inventory"],
)
class SystemMetricsProjector:
    """Maintains SystemMetrics, subscribing by stream category name."""

    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        """Update user metrics (implementation omitted)."""

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        """Update order metrics (implementation omitted)."""
Idempotency
Projectors should be idempotent to handle duplicate events gracefully:
@domain.projector(projector_for=UserProfile, aggregates=[User])
class UserProfileProjector:
    """Projector that creates UserProfile records at most once per user."""

    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        """Create the user's profile unless one already exists.

        A second delivery of the same event finds the existing profile and
        returns without writing anything.
        """
        repository = domain.repository_for(UserProfile)

        # Look the profile up only to learn whether it exists; the fetched
        # record itself is not needed.
        try:
            repository.get(event.user_id)
        except NotFoundError:
            pass  # Expected case - create new profile
        else:
            # Profile already exists, skip creation
            return

        profile = UserProfile(
            user_id=event.user_id,
            email=event.email,
            name=event.name,
        )
        repository.add(profile)
Event Ordering
Be aware that events may not always arrive in the expected order. Design projectors to handle out-of-order events:
@domain.projector(projector_for=OrderStatus, aggregates=[Order])
class OrderStatusProjector:
    """Projector that uses event timestamps to ignore stale status updates."""

    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        """Record the initial CREATED status, stamped with the event time."""
        repository = domain.repository_for(OrderStatus)

        # Use event timestamp to handle ordering
        status = OrderStatus(
            order_id=event.order_id,
            status="CREATED",
            last_updated=event._metadata.headers.time,
        )
        repository.add(status)

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        """Mark the order SHIPPED unless a newer event was already applied."""
        repository = domain.repository_for(OrderStatus)
        # NOTE(review): if OrderShipped can arrive before OrderCreated, this
        # lookup presumably fails; decide whether to create the record here
        # or let the event be retried.
        status = repository.get(event.order_id)

        event_time = event._metadata.headers.time

        # Only update if this event is newer
        if event_time > status.last_updated:
            status.status = "SHIPPED"
            status.last_updated = event_time
            repository.add(status)
Error Handling
Projectors should handle errors gracefully to ensure system resilience:
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that logs and re-raises failures while handling events."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create the inventory record, skipping duplicates.

        Any unexpected failure is logged with its traceback and re-raised so
        that a retry mechanism, if one is configured, can pick it up.
        """
        try:
            repository = domain.repository_for(ProductInventory)

            # Check if inventory already exists
            try:
                repository.get(event.product_id)
            except NotFoundError:
                pass  # Expected case - create new inventory
            else:
                # Handle duplicate case
                return

            inventory = ProductInventory(
                product_id=event.product_id,
                name=event.name,
                description=event.description,
                price=event.price,
                stock_quantity=event.stock_quantity,
                last_updated=event._metadata.headers.time,
            )
            repository.add(inventory)

        except Exception as e:
            # Log error and potentially raise for retry mechanisms
            logger.exception("Failed to process ProductAdded event: %s", e)
            raise
Complete Example
Below is a comprehensive example showing projections and projectors working together to maintain multiple read models from a single aggregate:
from protean import Domain
from protean.core.projector import on
from protean.fields import DateTime, Float, Identifier, Integer, String, Text
# Domain object that the elements below register with via its decorators
domain = Domain()
# Process events and commands synchronously for demonstration
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"
@domain.aggregate
class Product:
    """Product aggregate that raises events on creation and stock changes."""

    name: String(max_length=100, required=True)
    description: Text()
    price: Float(required=True)
    stock_quantity: Integer(default=0)

    def adjust_stock(self, quantity):
        """Apply a signed stock delta and raise StockAdjusted with the result."""
        self.stock_quantity += quantity

        adjustment = StockAdjusted(
            product_id=self.id,
            quantity=quantity,
            new_stock_quantity=self.stock_quantity,
        )
        self.raise_(adjustment)

    @classmethod
    def create(cls, name, description, price, stock_quantity=0):
        """Build a product, raise ProductAdded on it, and return it."""
        new_product = cls(
            name=name,
            description=description,
            price=price,
            stock_quantity=stock_quantity,
        )

        added = ProductAdded(
            product_id=new_product.id,
            name=new_product.name,
            description=new_product.description,
            price=new_product.price,
            stock_quantity=new_product.stock_quantity,
        )
        new_product.raise_(added)

        return new_product
@domain.event(part_of=Product)
class ProductAdded:
    """Event raised by Product.create carrying the new product's details."""

    product_id: Identifier(required=True)
    name: String(max_length=100, required=True)
    description: Text(required=True)
    price: Float(required=True)
    stock_quantity: Integer(default=0)
@domain.event(part_of=Product)
class StockAdjusted:
    """Event raised by Product.adjust_stock with the delta and new level."""

    product_id: Identifier(required=True)
    quantity: Integer(required=True)
    new_stock_quantity: Integer(required=True)
@domain.projection
class ProductInventory:
    """Read model holding per-product inventory data, keyed by product_id."""

    product_id: Identifier(identifier=True, required=True)
    name: String(max_length=100, required=True)
    description: Text(required=True)
    price: Float(required=True)
    stock_quantity: Integer(default=0)
    last_updated: DateTime()
@domain.projection
class ProductCatalog:
    """Read model holding catalog data for browsing, keyed by product_id."""

    product_id: Identifier(identifier=True, required=True)
    name: String(max_length=100, required=True)
    description: Text(required=True)
    price: Float(required=True)
    in_stock: String(choices=["YES", "NO"], default="YES")
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Keeps ProductInventory records in step with Product events."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Add an inventory record for the newly added product."""
        record = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.headers.time,
        )
        domain.repository_for(ProductInventory).add(record)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Copy the new stock level and event time onto the inventory record."""
        repo = domain.repository_for(ProductInventory)

        record = repo.get(event.product_id)
        record.stock_quantity = event.new_stock_quantity
        record.last_updated = event._metadata.headers.time

        repo.add(record)
@domain.projector(projector_for=ProductCatalog, aggregates=[Product])
class ProductCatalogProjector:
    """Keeps ProductCatalog entries in step with Product events."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Add a catalog entry for the newly added product."""
        if event.stock_quantity > 0:
            availability = "YES"
        else:
            availability = "NO"

        entry = ProductCatalog(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            in_stock=availability,
        )
        domain.repository_for(ProductCatalog).add(entry)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Set the entry's in_stock flag from the new stock level."""
        repo = domain.repository_for(ProductCatalog)

        entry = repo.get(event.product_id)
        if event.new_stock_quantity > 0:
            entry.in_stock = "YES"
        else:
            entry.in_stock = "NO"

        repo.add(entry)
# Initialize the domain
domain.init(traverse=False)


# Demonstrate the projector workflow
if __name__ == "__main__":
    with domain.domain_context():
        # Create a new product
        product = Product.create(
            name="Laptop",
            description="High-performance laptop",
            price=999.99,
            stock_quantity=50,
        )

        # Persist the product (this will trigger ProductAdded event)
        product_repo = domain.repository_for(Product)
        product_repo.add(product)

        inventory_repo = domain.repository_for(ProductInventory)
        catalog_repo = domain.repository_for(ProductCatalog)

        def show_projections(heading):
            """Re-read both projections, print a report, and return them."""
            inventory = inventory_repo.get(product.id)
            catalog = catalog_repo.get(product.id)

            print(heading)
            print(f"Product: {product.name} (Stock: {product.stock_quantity})")
            print(
                f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
            )
            print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

            return inventory, catalog

        # Verify projections were updated
        show_projections("=== After Product Creation ===")

        # Adjust stock (this will trigger StockAdjusted event)
        product.adjust_stock(-30)  # Sell 30 units
        product_repo.add(product)

        # Verify projections were updated again
        show_projections("\n=== After Stock Adjustment ===")

        # Sell all remaining stock
        product.adjust_stock(-20)  # Sell remaining 20 units
        product_repo.add(product)

        # Verify out-of-stock status
        inventory, catalog = show_projections("\n=== After Selling All Stock ===")

        # Assertions for testing
        assert inventory.stock_quantity == 0
        assert catalog.in_stock == "NO"

        print("\n✅ All projections updated correctly!")
This example demonstrates:
- Multiple Projections: ProductInventory for detailed inventory tracking and ProductCatalog for simplified browsing
- Multiple Projectors: Each projection has its own dedicated projector
- Event Handling: Both projectors respond to the same events but update different projections
- Real-time Updates: Projections are automatically updated when domain events occur
Testing Projectors
Unit Testing
Test individual projector methods by creating events and calling the methods directly:
import pytest
from protean import Domain
def test_product_inventory_projector_on_product_added():
    """Invoke the ProductAdded handler directly and check the stored record."""
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Build the event by hand
        added = ProductAdded(
            product_id="test-123",
            name="Test Product",
            description="A test product",
            price=99.99,
            stock_quantity=10,
        )

        # Instantiate the projector and call its handler with the event
        ProductInventoryProjector().on_product_added(added)

        # The projection should now be retrievable by the product id
        stored = domain.repository_for(ProductInventory).get("test-123")
        assert stored.name == "Test Product"
        assert stored.stock_quantity == 10
Integration Testing
Test the complete flow by raising events from aggregates:
def test_projector_integration():
    """Persist an aggregate and check both projections were populated."""
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Create the aggregate; Product.create raises ProductAdded on it
        product = Product.create(
            name="Integration Test Product",
            description="Testing projector integration",
            price=149.99,
            stock_quantity=25,
        )

        # Persisting the aggregate is what triggers the events
        domain.repository_for(Product).add(product)

        # Read both projections back by the product's id
        inventory = domain.repository_for(ProductInventory).get(product.id)
        catalog = domain.repository_for(ProductCatalog).get(product.id)

        assert inventory.name == "Integration Test Product"
        assert catalog.in_stock == "YES"
See also
Concept overviews:
- Projections — Read-optimized views in CQRS.
- Projectors — Specialized handlers that maintain projections.
Patterns:
- Design Events for Consumers — Structuring events so projectors can build reliable read models.
- Idempotent Event Handlers — Ensuring projectors handle replayed events correctly.