Skip to content

Managing Projections

Projections, a.k.a Read models, are representations of data optimized for querying and reading purposes. Projections are designed to provide data in a format that is easy and efficient to read, often tailored to the specific needs of a particular view or user interface.

Projections are typically populated in response to Domain Events raised in the domain model.

Projections

Defining a Projection

Projections are defined with the Domain.projection decorator.

@domain.projection
class ProductInventory:
    """Projection for product inventory data optimized for querying."""

    product_id = Identifier(identifier=True, required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    stock_quantity = Integer(default=0)

Projection Configuration Options

Projections in Protean can be configured with several options passed directly to the projection decorator:

@domain.projection(
    provider="postgres",      # Database provider to use
    schema_name="product_inventory",  # Custom schema/table name
    limit=50                  # Default limit for queries
)
class ProductInventory:
    # Projection fields and methods
    pass

Storage Options

Projections can be stored in either a database or a cache, but not both simultaneously:

  • Database Storage: Use the provider parameter to specify which database provider to use.

    @domain.projection(provider="postgres")  # Connect to a PostgreSQL database
    class ProductInventory:
        # Projection fields and methods
        pass
    

  • Cache Storage: Use the cache parameter to specify which cache provider to use.

    @domain.projection(cache="redis")  # Store projection data in Redis cache
    class ProductInventory:
        # Projection fields and methods
        pass
    

When both cache and provider parameters are specified, the cache parameter takes precedence and the provider parameter is ignored.

Additional Options

All options are passed directly to the projection decorator:

@domain.projection(
    abstract=False,          # If True, indicates this projection is an abstract base class
    database_model="custom_model",    # Custom model name for storage
    order_by=("name",),      # Default ordering for query results
    schema_name="inventory", # Custom schema/table name
    limit=100                # Default query result limit (set to None for no limit)
)
class ProductInventory:
    # Projection fields and methods
    pass

Querying Projections

Projections are optimized for querying. You can use the repository pattern to query projections:

# Get a single projection record by ID
inventory = repository.get(ProductInventory, id=1)

# Query projection with filters
low_stock_items = repository._dao.filter(
    ProductInventory, 
    quantity__lt=10,
    limit=20
)

Projectors

Projectors are specialized event handlers responsible for maintaining projections by listening to domain events and updating projection data accordingly. They provide the bridge between your domain events and read models, ensuring projections stay synchronized with changes in your domain.

Defining a Projector

Projectors are defined using the Domain.projector decorator and must be associated with a specific projection:

@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.timestamp,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.timestamp

Projector Configuration Options

Projectors can be configured with several options:

@domain.projector(
    projector_for=ProductInventory,     # Required: The projection to maintain
    aggregates=[Product, Order],        # Aggregates to listen to
    stream_categories=["product", "order"]  # Alternative to aggregates
)
class ProductInventoryProjector:
    # Event handler methods
    pass

Required Configuration

  • projector_for: The projection class that this projector maintains. This parameter is mandatory and establishes the relationship between the projector and its target projection.

Event Source Configuration

You must specify either aggregates or stream_categories (but not both):

  • aggregates: A list of aggregate classes whose events this projector should handle. Protean automatically derives the stream categories from the specified aggregates.

  • stream_categories: A list of stream category names to listen to. This provides more fine-grained control over which event streams the projector monitors.

Event Handling with @on

Projectors use the @on decorator (an alias for @handle) to specify which events they respond to:

@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.timestamp,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.timestamp

Multiple Event Handlers

A single projector can handle multiple events, and multiple projectors can handle the same event:

@domain.projector(projector_for=OrderSummary, aggregates=[Order])
class OrderSummaryProjector:
    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        # Create order summary
        pass

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        # Update shipping status
        pass

    @on(OrderCancelled)
    def on_order_cancelled(self, event: OrderCancelled):
        # Mark as cancelled
        pass

@domain.projector(projector_for=ShippingReport, aggregates=[Order])
class ShippingReportProjector:
    @on(OrderShipped)  # Same event, different projector
    def on_order_shipped(self, event: OrderShipped):
        # Update shipping metrics
        pass

Projector Registration

Projectors can be registered with the domain with the Domain.projector decorator:

Decorator Registration

@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    # Event handler methods
    pass

Error Handling in Projectors

Projectors should handle errors gracefully to ensure system resilience:

@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        try:
            repository = domain.repository_for(ProductInventory)

            # Check if inventory already exists
            try:
                existing = repository.get(event.product_id)
                # Handle duplicate case
                return
            except NotFoundError:
                pass  # Expected case - create new inventory

            inventory = ProductInventory(
                product_id=event.product_id,
                name=event.name,
                price=event.price,
                stock_quantity=event.stock_quantity,
            )

            repository.add(inventory)

        except Exception as e:
            # Log error and potentially raise for retry mechanisms
            logger.error(f"Failed to process ProductAdded event: {e}")
            raise

Projector Workflow

The projector workflow follows this sequence:

sequenceDiagram
  autonumber
  Aggregate->>Event Store: Publish Event
  Event Store->>Projector: Deliver Event
  Projector->>Projector: Process Event
  Projector->>Repository: Load Projection
  Repository-->>Projector: Return Projection
  Projector->>Projector: Update Projection
  Projector->>Repository: Save Projection
  Repository-->>Projector: Confirm Save
  1. Aggregate Publishes Event: Domain events are published when aggregates change state
  2. Event Store Delivers Event: The event store routes events to registered projectors
  3. Projector Processes Event: The projector receives and begins processing the event
  4. Load Projection: If updating existing data, the projector loads the current projection
  5. Update Projection: The projector applies changes based on the event data
  6. Save Projection: The updated projection is persisted to storage

Projection Update Strategies

There are different strategies for keeping projections up-to-date with your domain model:

  1. Event-driven: Respond to domain events to update projections (recommended)
  2. Periodic Refresh: Schedule periodic rebuilding of projections from source data
  3. On-demand Calculation: Generate projections when they are requested

The event-driven approach is usually preferred as it ensures projections are updated in near real-time.

Workflow

ManageInventory Command Handler handles AdjustStock command, loads the product and updates it, and then persists the product, generating domain events.

sequenceDiagram
  autonumber
  App->>Manage Inventory: AdjustStock object
  Manage Inventory->>Manage Inventory: Extract data and load product
  Manage Inventory->>product: adjust stock
  product->>product: Mutate
  product-->>Manage Inventory: 
  Manage Inventory->>Repository: Persist product
  Repository->>Broker: Publish events

The events are then consumed by a projector that loads the inventory projection record and updates it.

sequenceDiagram
  autonumber
  Broker-->>Sync Inventory: Pull events
  Sync Inventory->>Sync Inventory: Extract data and load inventory record
  Sync Inventory->>inventory: update
  inventory->>inventory: Mutate
  inventory-->>Sync Inventory: 
  Sync Inventory->>Repository: Persist inventory record

Supported Field Types

Projections can only contain basic field types. References, Associations, and ValueObjects are not supported in projections. This is because projections are designed to be flattened, denormalized representations of data.

Best Practices

When working with projectors, consider these best practices:

Idempotency

Projectors should be idempotent to handle duplicate events gracefully:

@domain.projector(projector_for=UserProfile, aggregates=[User])
class UserProfileProjector:
    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        repository = domain.repository_for(UserProfile)

        # Check if profile already exists
        try:
            existing_profile = repository.get(event.user_id)
            # Profile already exists, skip creation
            return
        except NotFoundError:
            pass  # Expected case - create new profile

        profile = UserProfile(
            user_id=event.user_id,
            email=event.email,
            name=event.name
        )
        repository.add(profile)

Event Ordering

Be aware that events may not always arrive in the expected order. Design projectors to handle out-of-order events:

@domain.projector(projector_for=OrderStatus, aggregates=[Order])
class OrderStatusProjector:
    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        repository = domain.repository_for(OrderStatus)

        # Use event timestamp to handle ordering
        status = OrderStatus(
            order_id=event.order_id,
            status="CREATED",
            last_updated=event._metadata.timestamp
        )
        repository.add(status)

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        repository = domain.repository_for(OrderStatus)
        status = repository.get(event.order_id)

        # Only update if this event is newer
        if event._metadata.timestamp > status.last_updated:
            status.status = "SHIPPED"
            status.last_updated = event._metadata.timestamp
            repository.add(status)

Advanced Usage

Cross-Aggregate Projections

Projectors can listen to events from multiple aggregates to create comprehensive views:

@domain.projector(
    projector_for=CustomerOrderSummary, 
    aggregates=[Customer, Order, Payment]
)
class CustomerOrderSummaryProjector:
    @on(CustomerRegistered)
    def on_customer_registered(self, event: CustomerRegistered):
        # Initialize customer summary
        pass

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        # Update order count and total
        pass

    @on(PaymentProcessed)
    def on_payment_processed(self, event: PaymentProcessed):
        # Update payment status
        pass

Stream Categories

For more granular control, use stream categories instead of aggregates:

@domain.projector(
    projector_for=SystemMetrics,
    stream_categories=["user", "order", "payment", "inventory"]
)
class SystemMetricsProjector:
    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        # Update user metrics
        pass

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        # Update order metrics
        pass

Complete Example

Below is a comprehensive example showing projections and projectors working together to maintain multiple read models from a single aggregate:

from uuid import uuid4

from protean import Domain
from protean.core.projector import on
from protean.fields import DateTime, Float, Identifier, Integer, String, Text

domain = Domain()

# Process events and commands synchronously for demonstration
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.aggregate
class Product:
    name = String(max_length=100, required=True)
    description = Text()
    price = Float(required=True)
    stock_quantity = Integer(default=0)

    def adjust_stock(self, quantity):
        self.stock_quantity += quantity
        self.raise_(
            StockAdjusted(
                product_id=self.id,
                quantity=quantity,
                new_stock_quantity=self.stock_quantity,
            )
        )

    @classmethod
    def create(cls, name, description, price, stock_quantity=0):
        product = cls(
            name=name,
            description=description,
            price=price,
            stock_quantity=stock_quantity,
        )
        product.raise_(
            ProductAdded(
                product_id=product.id,
                name=product.name,
                description=product.description,
                price=product.price,
                stock_quantity=product.stock_quantity,
            )
        )
        return product


@domain.event(part_of=Product)
class ProductAdded:
    product_id = Identifier(required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    stock_quantity = Integer(default=0)


@domain.event(part_of=Product)
class StockAdjusted:
    product_id = Identifier(required=True)
    quantity = Integer(required=True)
    new_stock_quantity = Integer(required=True)


@domain.projection
class ProductInventory:
    """Projection for product inventory data optimized for querying."""

    product_id = Identifier(identifier=True, required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    stock_quantity = Integer(default=0)
    last_updated = DateTime()


@domain.projection
class ProductCatalog:
    """Projection for product catalog data optimized for browsing."""

    product_id = Identifier(identifier=True, required=True)
    name = String(max_length=100, required=True)
    description = Text(required=True)
    price = Float(required=True)
    in_stock = String(choices=["YES", "NO"], default="YES")


@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.timestamp,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.timestamp

        repository.add(inventory)


@domain.projector(projector_for=ProductCatalog, aggregates=[Product])
class ProductCatalogProjector:
    """Projector that maintains the ProductCatalog projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create catalog entry when a new product is added."""
        repository = domain.repository_for(ProductCatalog)

        catalog_entry = ProductCatalog(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            in_stock="YES" if event.stock_quantity > 0 else "NO",
        )

        repository.add(catalog_entry)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update catalog availability when stock changes."""
        repository = domain.repository_for(ProductCatalog)
        catalog_entry = repository.get(event.product_id)

        catalog_entry.in_stock = "YES" if event.new_stock_quantity > 0 else "NO"

        repository.add(catalog_entry)


# Initialize the domain
domain.init(traverse=False)

# Demonstrate the projector workflow
if __name__ == "__main__":
    with domain.domain_context():
        # Create a new product
        product = Product.create(
            name="Laptop",
            description="High-performance laptop",
            price=999.99,
            stock_quantity=50,
        )

        # Persist the product (this will trigger ProductAdded event)
        product_repo = domain.repository_for(Product)
        product_repo.add(product)

        # Verify projections were updated
        inventory_repo = domain.repository_for(ProductInventory)
        catalog_repo = domain.repository_for(ProductCatalog)

        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("=== After Product Creation ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Adjust stock (this will trigger StockAdjusted event)
        product.adjust_stock(-30)  # Sell 30 units
        product_repo.add(product)

        # Verify projections were updated again
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("\n=== After Stock Adjustment ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Sell all remaining stock
        product.adjust_stock(-20)  # Sell remaining 20 units
        product_repo.add(product)

        # Verify out-of-stock status
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("\n=== After Selling All Stock ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Assertions for testing
        assert inventory.stock_quantity == 0
        assert catalog.in_stock == "NO"
        print("\n✅ All projections updated correctly!")

This example demonstrates:

  • Multiple Projections: ProductInventory for detailed inventory tracking and ProductCatalog for simplified browsing
  • Multiple Projectors: Each projection has its own dedicated projector
  • Event Handling: Both projectors respond to the same events but update different projections
  • Real-time Updates: Projections are automatically updated when domain events occur
  • Different Data Formats: Each projection optimizes data for its specific use case

Testing Projectors

Testing projectors is straightforward since they respond to domain events. Here's how to test them effectively:

Unit Testing Projector Methods

Test individual projector methods by creating events and calling the methods directly:

import pytest
from protean import Domain

def test_product_inventory_projector_on_product_added():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Create test event
        event = ProductAdded(
            product_id="test-123",
            name="Test Product",
            description="A test product",
            price=99.99,
            stock_quantity=10
        )

        # Create projector instance
        projector = ProductInventoryProjector()

        # Call the handler method
        projector.on_product_added(event)

        # Verify projection was created
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get("test-123")

        assert inventory.name == "Test Product"
        assert inventory.stock_quantity == 10

Integration Testing with Events

Test the complete flow by raising events from aggregates:

def test_projector_integration():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Create and persist aggregate
        product = Product.create(
            name="Integration Test Product",
            description="Testing projector integration",
            price=149.99,
            stock_quantity=25
        )

        product_repo = domain.repository_for(Product)
        product_repo.add(product)  # This triggers events

        # Verify projections were updated
        inventory_repo = domain.repository_for(ProductInventory)
        catalog_repo = domain.repository_for(ProductCatalog)

        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        assert inventory.name == "Integration Test Product"
        assert catalog.in_stock == "YES"

Testing Error Scenarios

Test how projectors handle error conditions:

def test_projector_handles_missing_projection():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        event = StockAdjusted(
            product_id="non-existent",
            quantity=-5,
            new_stock_quantity=0
        )

        projector = ProductInventoryProjector()

        # Should handle missing projection gracefully
        with pytest.raises(NotFoundError):
            projector.on_stock_adjusted(event)