Managing Projections
Projections, also known as read models, are representations of data optimized for querying and reading. They provide data in a format that is easy and efficient to read, often tailored to the needs of a particular view or user interface.
Projections are typically populated in response to Domain Events raised in the domain model.
Projections
Defining a Projection
Projections are defined with the Domain.projection decorator.
@domain.projection
class ProductInventory:
    """Projection for product inventory data optimized for querying."""

    product_id = Identifier(identifier=True, required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    stock_quantity = Integer(default=0)
Projection Configuration Options
Projections in Protean can be configured with several options passed directly to the projection decorator:
@domain.projection(
    provider="postgres",  # Database provider to use
    schema_name="product_inventory",  # Custom schema/table name
    limit=50,  # Default limit for queries
)
class ProductInventory:
    # Projection fields and methods
    pass
Storage Options
Projections can be stored in either a database or a cache, but not both simultaneously:
- Database Storage: Use the provider parameter to specify which database provider to use.

  @domain.projection(provider="postgres")  # Connect to a PostgreSQL database
  class ProductInventory:
      # Projection fields and methods
      pass

- Cache Storage: Use the cache parameter to specify which cache provider to use.

  @domain.projection(cache="redis")  # Store projection data in Redis cache
  class ProductInventory:
      # Projection fields and methods
      pass

When both cache and provider parameters are specified, the cache parameter takes precedence and the provider parameter is ignored.
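The precedence rule above can be illustrated with a small, framework-independent sketch. The helper `resolve_storage` and the fallback name `"default"` are hypothetical and exist only for illustration; this is not how Protean implements the rule internally.

```python
from typing import Optional, Tuple


def resolve_storage(
    provider: Optional[str] = None, cache: Optional[str] = None
) -> Tuple[str, str]:
    """Hypothetical helper mirroring the documented precedence rule."""
    if cache is not None:
        # A cache setting wins; any provider value is ignored.
        return ("cache", cache)
    # No cache given: use the database provider ("default" is an assumed fallback).
    return ("database", provider if provider is not None else "default")


print(resolve_storage(provider="postgres"))
print(resolve_storage(cache="redis"))
print(resolve_storage(provider="postgres", cache="redis"))
```

In the third call both parameters are passed, and the result names only the cache, matching the behaviour described above.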
Additional Options
All options are passed directly to the projection decorator:
@domain.projection(
    abstract=False,  # If True, indicates this projection is an abstract base class
    database_model="custom_model",  # Custom model name for storage
    order_by=("name",),  # Default ordering for query results
    schema_name="inventory",  # Custom schema/table name
    limit=100,  # Default query result limit (set to None for no limit)
)
class ProductInventory:
    # Projection fields and methods
    pass
Querying Projections
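Projections are optimized for querying. You can query them through their repository:
# Look up the repository for the projection
repository = domain.repository_for(ProductInventory)

# Get a single projection record by its identifier
# ("product-1" is an illustrative identifier value)
inventory = repository.get("product-1")

# Query the projection with filters; lookups use the projection's field names
low_stock_items = (
    repository._dao.query.filter(stock_quantity__lt=10).limit(20).all()
)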
Projectors
Projectors are specialized event handlers responsible for maintaining projections by listening to domain events and updating projection data accordingly. They provide the bridge between your domain events and read models, ensuring projections stay synchronized with changes in your domain.
Defining a Projector
Projectors are defined using the Domain.projector decorator and must be associated with a specific projection:
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)
        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.timestamp,
        )
        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)
        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.timestamp
        # Persist the updated projection record
        repository.add(inventory)
Projector Configuration Options
Projectors can be configured with several options:
@domain.projector(
    projector_for=ProductInventory,  # Required: The projection to maintain
    aggregates=[Product, Order],  # Aggregates to listen to
    # stream_categories=["product", "order"],  # Alternative to aggregates; do not pass both
)
class ProductInventoryProjector:
    # Event handler methods
    pass
Required Configuration
projector_for: The projection class that this projector maintains. This parameter is mandatory and establishes the relationship between the projector and its target projection.
Event Source Configuration
You must specify either aggregates or stream_categories (but not both):
- aggregates: A list of aggregate classes whose events this projector should handle. Protean automatically derives the stream categories from the specified aggregates.
- stream_categories: A list of stream category names to listen to. This provides more fine-grained control over which event streams the projector monitors.
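The either-or rule above can be sketched as a small validation function. Everything here is hypothetical: `resolve_stream_categories` is not a Protean function, and the `category_of` default (lowercased class name) is only an assumed naming rule, not the way Protean actually derives stream categories.

```python
from typing import Callable, List, Optional, Sequence


def resolve_stream_categories(
    aggregates: Optional[Sequence[type]] = None,
    stream_categories: Optional[Sequence[str]] = None,
    # Assumed naming rule for illustration only
    category_of: Callable[[type], str] = lambda cls: cls.__name__.lower(),
) -> List[str]:
    """Return the categories to subscribe to, enforcing 'one or the other'."""
    if aggregates and stream_categories:
        raise ValueError("Specify either aggregates or stream_categories, not both")
    if not aggregates and not stream_categories:
        raise ValueError("Specify one of aggregates or stream_categories")
    if stream_categories:
        # Explicit categories are used as given
        return list(stream_categories)
    # Otherwise derive one category per aggregate class
    return [category_of(cls) for cls in aggregates]


class Product: ...
class Order: ...


print(resolve_stream_categories(aggregates=[Product, Order]))
print(resolve_stream_categories(stream_categories=["product", "order"]))
```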
Event Handling with @on
Projectors use the @on decorator (an alias for @handle) to specify which events they respond to:
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)
        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.timestamp,
        )
        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)
        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.timestamp
        # Persist the updated projection record
        repository.add(inventory)
Multiple Event Handlers
A single projector can handle multiple events, and multiple projectors can handle the same event:
@domain.projector(projector_for=OrderSummary, aggregates=[Order])
class OrderSummaryProjector:
    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        # Create order summary
        pass

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        # Update shipping status
        pass

    @on(OrderCancelled)
    def on_order_cancelled(self, event: OrderCancelled):
        # Mark as cancelled
        pass


@domain.projector(projector_for=ShippingReport, aggregates=[Order])
class ShippingReportProjector:
    @on(OrderShipped)  # Same event, different projector
    def on_order_shipped(self, event: OrderShipped):
        # Update shipping metrics
        pass
Projector Registration
Projectors are registered with the domain using the Domain.projector decorator:
Decorator Registration
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    # Event handler methods
    pass
Error Handling in Projectors
Projectors should handle errors gracefully to ensure system resilience:
@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        try:
            repository = domain.repository_for(ProductInventory)

            # Check if inventory already exists
            try:
                existing = repository.get(event.product_id)
                # Handle duplicate case
                return
            except NotFoundError:
                pass  # Expected case - create new inventory

            inventory = ProductInventory(
                product_id=event.product_id,
                name=event.name,
                description=event.description,  # required field on ProductInventory
                price=event.price,
                stock_quantity=event.stock_quantity,
            )
            repository.add(inventory)
        except Exception as e:
            # Log error and potentially raise for retry mechanisms
            logger.error(f"Failed to process ProductAdded event: {e}")
            raise
Projector Workflow
The projector workflow follows this sequence:
sequenceDiagram
autonumber
Aggregate->>Event Store: Publish Event
Event Store->>Projector: Deliver Event
Projector->>Projector: Process Event
Projector->>Repository: Load Projection
Repository-->>Projector: Return Projection
Projector->>Projector: Update Projection
Projector->>Repository: Save Projection
Repository-->>Projector: Confirm Save
- Aggregate Publishes Event: Domain events are published when aggregates change state
- Event Store Delivers Event: The event store routes events to registered projectors
- Projector Processes Event: The projector receives and begins processing the event
- Load Projection: If updating existing data, the projector loads the current projection
- Update Projection: The projector applies changes based on the event data
- Save Projection: The updated projection is persisted to storage
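The deliver, load, update, and save steps listed above can be simulated without any framework. The sketch below is plain Python: `InMemoryRepository` and `InventoryProjector` are hypothetical stand-ins, not Protean classes, and the handler lookup table plays the role that @on plays in a real projector.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class StockAdjusted:
    """Minimal stand-in for a domain event."""
    product_id: str
    new_stock_quantity: int


class InMemoryRepository:
    """Stand-in for a projection repository backed by a dict."""

    def __init__(self) -> None:
        self._rows: Dict[str, dict] = {}

    def get(self, key: str) -> dict:
        return self._rows[key]  # raises KeyError when the record is missing

    def add(self, key: str, row: dict) -> None:
        self._rows[key] = row


class InventoryProjector:
    def __init__(self, repository: InMemoryRepository) -> None:
        self.repository = repository
        # Maps event types to handler methods
        self.handlers = {StockAdjusted: self.on_stock_adjusted}

    def handle(self, event: object) -> None:
        # Deliver the event to its handler, if one is registered
        handler = self.handlers.get(type(event))
        if handler is not None:
            handler(event)

    def on_stock_adjusted(self, event: StockAdjusted) -> None:
        row = self.repository.get(event.product_id)       # load projection
        row["stock_quantity"] = event.new_stock_quantity  # update projection
        self.repository.add(event.product_id, row)        # save projection


repo = InMemoryRepository()
repo.add("p-1", {"name": "Laptop", "stock_quantity": 50})
projector = InventoryProjector(repo)
projector.handle(StockAdjusted(product_id="p-1", new_stock_quantity=20))
print(repo.get("p-1"))
```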
Projection Update Strategies
There are different strategies for keeping projections up-to-date with your domain model:
- Event-driven: Respond to domain events to update projections (recommended)
- Periodic Refresh: Schedule periodic rebuilding of projections from source data
- On-demand Calculation: Generate projections when they are requested
The event-driven approach is usually preferred as it ensures projections are updated in near real-time.
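To compare the first two strategies, the following plain-Python sketch applies the same update function both incrementally (event-driven) and in a full rebuild (periodic refresh). It assumes the source data is an ordered list of events; the tuple layout and the names `apply_event` and `rebuild` are illustrative only.

```python
from typing import Dict, Iterable, Tuple

# Illustrative event shape: (product_id, new_stock_quantity)
Event = Tuple[str, int]


def apply_event(projection: Dict[str, int], event: Event) -> None:
    """Apply a single event to the projection (event-driven update)."""
    product_id, new_stock_quantity = event
    projection[product_id] = new_stock_quantity


def rebuild(events: Iterable[Event]) -> Dict[str, int]:
    """Rebuild the projection from scratch (periodic refresh)."""
    projection: Dict[str, int] = {}
    for event in events:
        apply_event(projection, event)
    return projection


log = [("p-1", 50), ("p-2", 5), ("p-1", 20)]

# Event-driven: the projection is updated as each event arrives
live: Dict[str, int] = {}
for event in log:
    apply_event(live, event)

# Periodic refresh: the projection is recomputed from the full log
rebuilt = rebuild(log)

print(live == rebuilt)
```

Both paths end in the same state; the difference is when the work happens, which is why the event-driven approach keeps projections closer to real time.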
Workflow
The ManageInventory command handler handles the AdjustStock command, loads the product, updates it, and then persists it, generating domain events.
sequenceDiagram
autonumber
App->>Manage Inventory: AdjustStock object
Manage Inventory->>Manage Inventory: Extract data and load product
Manage Inventory->>product: adjust stock
product->>product: Mutate
product-->>Manage Inventory:
Manage Inventory->>Repository: Persist product
Repository->>Broker: Publish events
The events are then consumed by a projector that loads the inventory projection record and updates it.
sequenceDiagram
autonumber
Broker-->>Sync Inventory: Pull events
Sync Inventory->>Sync Inventory: Extract data and load inventory record
Sync Inventory->>inventory: update
inventory->>inventory: Mutate
inventory-->>Sync Inventory:
Sync Inventory->>Repository: Persist inventory record
Supported Field Types
Projections can only contain basic field types. References, Associations, and ValueObjects are not supported in projections. This is because projections are designed to be flattened, denormalized representations of data.
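As an illustration of what flattened and denormalized means here, the sketch below turns a nested object into a flat row of basic values. It uses plain dataclasses rather than Protean elements, and the names `Customer`, `Address`, and `to_projection_row` are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Address:
    """Plays the role of a value object nested in the write model."""
    city: str
    country: str


@dataclass
class Customer:
    customer_id: str
    name: str
    address: Address


def to_projection_row(customer: Customer) -> Dict[str, str]:
    """Flatten nested data into basic fields suitable for a projection."""
    return {
        "customer_id": customer.customer_id,
        "name": customer.name,
        # The nested value object becomes individual scalar columns
        "address_city": customer.address.city,
        "address_country": customer.address.country,
    }


row = to_projection_row(Customer("c-1", "Ada", Address("London", "UK")))
print(row)
```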
Best Practices
When working with projectors, consider these best practices:
Idempotency
Projectors should be idempotent to handle duplicate events gracefully:
@domain.projector(projector_for=UserProfile, aggregates=[User])
class UserProfileProjector:
    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        repository = domain.repository_for(UserProfile)

        # Check if profile already exists
        try:
            existing_profile = repository.get(event.user_id)
            # Profile already exists, skip creation
            return
        except NotFoundError:
            pass  # Expected case - create new profile

        profile = UserProfile(
            user_id=event.user_id,
            email=event.email,
            name=event.name,
        )
        repository.add(profile)
Event Ordering
Be aware that events may not always arrive in the expected order. Design projectors to handle out-of-order events:
@domain.projector(projector_for=OrderStatus, aggregates=[Order])
class OrderStatusProjector:
    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        repository = domain.repository_for(OrderStatus)

        # Use event timestamp to handle ordering
        status = OrderStatus(
            order_id=event.order_id,
            status="CREATED",
            last_updated=event._metadata.timestamp,
        )
        repository.add(status)

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        repository = domain.repository_for(OrderStatus)
        status = repository.get(event.order_id)

        # Only update if this event is newer
        if event._metadata.timestamp > status.last_updated:
            status.status = "SHIPPED"
            status.last_updated = event._metadata.timestamp
            repository.add(status)
Advanced Usage
Cross-Aggregate Projections
Projectors can listen to events from multiple aggregates to create comprehensive views:
@domain.projector(
    projector_for=CustomerOrderSummary,
    aggregates=[Customer, Order, Payment],
)
class CustomerOrderSummaryProjector:
    @on(CustomerRegistered)
    def on_customer_registered(self, event: CustomerRegistered):
        # Initialize customer summary
        pass

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        # Update order count and total
        pass

    @on(PaymentProcessed)
    def on_payment_processed(self, event: PaymentProcessed):
        # Update payment status
        pass
Stream Categories
For more granular control, use stream categories instead of aggregates:
@domain.projector(
    projector_for=SystemMetrics,
    stream_categories=["user", "order", "payment", "inventory"],
)
class SystemMetricsProjector:
    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        # Update user metrics
        pass

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        # Update order metrics
        pass
Complete Example
Below is a comprehensive example showing projections and projectors working together to maintain multiple read models from a single aggregate:
from uuid import uuid4

from protean import Domain
from protean.core.projector import on
from protean.fields import DateTime, Float, Identifier, Integer, String, Text

domain = Domain()

# Process events and commands synchronously for demonstration
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.aggregate
class Product:
    name = String(max_length=100, required=True)
    description = Text()
    price = Float(required=True)
    stock_quantity = Integer(default=0)

    def adjust_stock(self, quantity):
        self.stock_quantity += quantity
        self.raise_(
            StockAdjusted(
                product_id=self.id,
                quantity=quantity,
                new_stock_quantity=self.stock_quantity,
            )
        )

    @classmethod
    def create(cls, name, description, price, stock_quantity=0):
        product = cls(
            name=name,
            description=description,
            price=price,
            stock_quantity=stock_quantity,
        )
        product.raise_(
            ProductAdded(
                product_id=product.id,
                name=product.name,
                description=product.description,
                price=product.price,
                stock_quantity=product.stock_quantity,
            )
        )
        return product


@domain.event(part_of=Product)
class ProductAdded:
    product_id = Identifier(required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    stock_quantity = Integer(default=0)


@domain.event(part_of=Product)
class StockAdjusted:
    product_id = Identifier(required=True)
    quantity = Integer(required=True)
    new_stock_quantity = Integer(required=True)
@domain.projection
class ProductInventory:
    """Projection for product inventory data optimized for querying."""

    product_id = Identifier(identifier=True, required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    stock_quantity = Integer(default=0)
    last_updated = DateTime()


@domain.projection
class ProductCatalog:
    """Projection for product catalog data optimized for browsing."""

    product_id = Identifier(identifier=True, required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    in_stock = String(choices=["YES", "NO"], default="YES")


@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)
        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.timestamp,
        )
        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)
        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.timestamp
        repository.add(inventory)


@domain.projector(projector_for=ProductCatalog, aggregates=[Product])
class ProductCatalogProjector:
    """Projector that maintains the ProductCatalog projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create catalog entry when a new product is added."""
        repository = domain.repository_for(ProductCatalog)
        catalog_entry = ProductCatalog(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            in_stock="YES" if event.stock_quantity > 0 else "NO",
        )
        repository.add(catalog_entry)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update catalog availability when stock changes."""
        repository = domain.repository_for(ProductCatalog)
        catalog_entry = repository.get(event.product_id)
        catalog_entry.in_stock = "YES" if event.new_stock_quantity > 0 else "NO"
        repository.add(catalog_entry)
# Initialize the domain
domain.init(traverse=False)

# Demonstrate the projector workflow
if __name__ == "__main__":
    with domain.domain_context():
        # Create a new product
        product = Product.create(
            name="Laptop",
            description="High-performance laptop",
            price=999.99,
            stock_quantity=50,
        )

        # Persist the product (this will trigger ProductAdded event)
        product_repo = domain.repository_for(Product)
        product_repo.add(product)

        # Verify projections were updated
        inventory_repo = domain.repository_for(ProductInventory)
        catalog_repo = domain.repository_for(ProductCatalog)
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("=== After Product Creation ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Adjust stock (this will trigger StockAdjusted event)
        product.adjust_stock(-30)  # Sell 30 units
        product_repo.add(product)

        # Verify projections were updated again
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("\n=== After Stock Adjustment ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Sell all remaining stock
        product.adjust_stock(-20)  # Sell remaining 20 units
        product_repo.add(product)

        # Verify out-of-stock status
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("\n=== After Selling All Stock ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Assertions for testing
        assert inventory.stock_quantity == 0
        assert catalog.in_stock == "NO"

        print("\n✅ All projections updated correctly!")
This example demonstrates:
- Multiple Projections: ProductInventory for detailed inventory tracking and ProductCatalog for simplified browsing
- Multiple Projectors: Each projection has its own dedicated projector
- Event Handling: Both projectors respond to the same events but update different projections
- Real-time Updates: Projections are automatically updated when domain events occur
- Different Data Formats: Each projection optimizes data for its specific use case
Testing Projectors
Testing projectors is straightforward since they respond to domain events. Here's how to test them effectively:
Unit Testing Projector Methods
Test individual projector methods by creating events and calling the methods directly:
import pytest
from protean import Domain


def test_product_inventory_projector_on_product_added():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Create test event
        event = ProductAdded(
            product_id="test-123",
            name="Test Product",
            description="A test product",
            price=99.99,
            stock_quantity=10,
        )

        # Create projector instance
        projector = ProductInventoryProjector()

        # Call the handler method
        projector.on_product_added(event)

        # Verify projection was created
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get("test-123")
        assert inventory.name == "Test Product"
        assert inventory.stock_quantity == 10
Integration Testing with Events
Test the complete flow by raising events from aggregates:
def test_projector_integration():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Create and persist aggregate
        product = Product.create(
            name="Integration Test Product",
            description="Testing projector integration",
            price=149.99,
            stock_quantity=25,
        )
        product_repo = domain.repository_for(Product)
        product_repo.add(product)  # This triggers events

        # Verify projections were updated
        inventory_repo = domain.repository_for(ProductInventory)
        catalog_repo = domain.repository_for(ProductCatalog)
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)
        assert inventory.name == "Integration Test Product"
        assert catalog.in_stock == "YES"
Testing Error Scenarios
Test how projectors handle error conditions:
def test_projector_handles_missing_projection():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        event = StockAdjusted(
            product_id="non-existent",
            quantity=-5,
            new_stock_quantity=0,
        )
        projector = ProductInventoryProjector()

        # A missing projection record should surface as NotFoundError
        with pytest.raises(NotFoundError):
            projector.on_stock_adjusted(event)