Skip to content

Projectors

CQRS ES

Projectors are specialized event handlers responsible for maintaining projections by listening to domain events and updating projection data accordingly. They bridge your domain events and read models, ensuring projections stay synchronized with changes in your domain.

How Projectors Work

When an aggregate raises an event, the following sequence occurs:

sequenceDiagram
  autonumber
  Aggregate->>Event Store: Persist event
  Event Store-->>Engine: New event available
  Engine->>Engine: Match event to subscribed projectors
  Engine->>Projector: Invoke @on handler (within UoW)
  Projector->>Repository: Load/create projection record
  Projector->>Repository: Persist updated projection
  Note over Projector,Repository: UoW commits atomically
  1. An aggregate raises an event and is persisted. The event is written to the event store.
  2. The Protean Engine's subscription detects the new event.
  3. The engine matches the event to all projectors subscribed to that stream category.
  4. Each matching @on handler is invoked within its own Unit of Work.
  5. The handler loads or creates the projection record via current_domain.repository_for().
  6. Changes to the projection are committed atomically when the UoW completes.

If the handler raises an exception, the UoW rolls back and the event may be retried depending on the subscription configuration.

Defining a Projector

Projectors are defined using the Domain.projector decorator and must be associated with a specific projection via projector_for:

@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.headers.time,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.headers.time

You must also specify which events to listen to, via either aggregates (a list of aggregate classes — Protean derives the stream categories automatically) or stream_categories (a list of stream category names for more fine-grained control). When both are specified, stream_categories takes precedence.

Required: projector_for

Every projector must specify projector_for — the projection class it maintains. Omitting it raises IncorrectUsageError.

Event Handling with @on

Projectors use the @on decorator (an alias for @handle) to specify which events they respond to:

@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.headers.time,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.headers.time

A single projector can handle multiple events, and multiple projectors can handle the same event:

@domain.projector(projector_for=OrderSummary, aggregates=[Order])
class OrderSummaryProjector:
    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        # Create order summary
        pass

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        # Update shipping status
        pass

    @on(OrderCancelled)
    def on_order_cancelled(self, event: OrderCancelled):
        # Mark as cancelled
        pass

@domain.projector(projector_for=ShippingReport, aggregates=[Order])
class ShippingReportProjector:
    @on(OrderShipped)  # Same event, different projector
    def on_order_shipped(self, event: OrderShipped):
        # Update shipping metrics
        pass

Cross-Aggregate Projections

Projectors can listen to events from multiple aggregates to create comprehensive views:

@domain.projector(
    projector_for=CustomerOrderSummary,
    aggregates=[Customer, Order, Payment]
)
class CustomerOrderSummaryProjector:
    @on(CustomerRegistered)
    def on_customer_registered(self, event: CustomerRegistered):
        # Initialize customer summary
        pass

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        # Update order count and total
        pass

    @on(PaymentProcessed)
    def on_payment_processed(self, event: PaymentProcessed):
        # Update payment status
        pass

For more granular control, use stream categories instead of aggregates:

@domain.projector(
    projector_for=SystemMetrics,
    stream_categories=["user", "order", "payment", "inventory"]
)
class SystemMetricsProjector:
    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        # Update user metrics
        pass

    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        # Update order metrics
        pass

Idempotency

Projectors should be idempotent to handle duplicate events gracefully:

from protean import current_domain
from protean.exceptions import ObjectNotFoundError

@domain.projector(projector_for=UserProfile, aggregates=[User])
class UserProfileProjector:
    @on(UserRegistered)
    def on_user_registered(self, event: UserRegistered):
        repository = current_domain.repository_for(UserProfile)

        # Check if profile already exists
        try:
            existing_profile = repository.get(event.user_id)
            # Profile already exists, skip creation
            return
        except ObjectNotFoundError:
            pass  # Expected case - create new profile

        profile = UserProfile(
            user_id=event.user_id,
            email=event.email,
            name=event.name
        )
        repository.add(profile)

Event Ordering

Be aware that events may not always arrive in the expected order. Design projectors to handle out-of-order events:

@domain.projector(projector_for=OrderStatus, aggregates=[Order])
class OrderStatusProjector:
    @on(OrderCreated)
    def on_order_created(self, event: OrderCreated):
        repository = current_domain.repository_for(OrderStatus)

        # Use event timestamp to handle ordering
        status = OrderStatus(
            order_id=event.order_id,
            status="CREATED",
            last_updated=event._metadata.timestamp
        )
        repository.add(status)

    @on(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        repository = current_domain.repository_for(OrderStatus)
        status = repository.get(event.order_id)

        # Only update if this event is newer
        if event._metadata.timestamp > status.last_updated:
            status.status = "SHIPPED"
            status.last_updated = event._metadata.timestamp
            repository.add(status)

Error Handling

Projectors should handle errors gracefully to ensure system resilience:

@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        try:
            repository = current_domain.repository_for(ProductInventory)

            # Check if inventory already exists
            try:
                existing = repository.get(event.product_id)
                # Handle duplicate case
                return
            except ObjectNotFoundError:
                pass  # Expected case - create new inventory

            inventory = ProductInventory(
                product_id=event.product_id,
                name=event.name,
                price=event.price,
                stock_quantity=event.stock_quantity,
            )

            repository.add(inventory)

        except Exception as e:
            # Log error and potentially raise for retry mechanisms
            logger.error(f"Failed to process ProductAdded event: {e}")
            raise

Handling Deletions

When a projection record should be removed in response to an event, use repository.remove():

@domain.projector(projector_for=ActiveOrder, aggregates=[Order])
class ActiveOrderProjector:
    @on(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        repository = current_domain.repository_for(ActiveOrder)
        repository.add(ActiveOrder(
            order_id=event.order_id,
            customer_name=event.customer_name,
            status="placed",
        ))

    @on(OrderDelivered)
    def on_order_delivered(self, event: OrderDelivered):
        repository = current_domain.repository_for(ActiveOrder)
        order = repository.get(event.order_id)
        repository.remove(order)

Complete Example

Below is a comprehensive example showing projections and projectors working together to maintain multiple read models from a single aggregate:

from protean import Domain
from protean.core.projector import on
from protean.fields import DateTime, Float, Identifier, Integer, String, Text

domain = Domain()

# Process events and commands synchronously for demonstration
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.aggregate
class Product:
    name: String(max_length=100, required=True)
    description: Text()
    price: Float(required=True)
    stock_quantity: Integer(default=0)

    def adjust_stock(self, quantity):
        self.stock_quantity += quantity
        self.raise_(
            StockAdjusted(
                product_id=self.id,
                quantity=quantity,
                new_stock_quantity=self.stock_quantity,
            )
        )

    @classmethod
    def create(cls, name, description, price, stock_quantity=0):
        product = cls(
            name=name,
            description=description,
            price=price,
            stock_quantity=stock_quantity,
        )
        product.raise_(
            ProductAdded(
                product_id=product.id,
                name=product.name,
                description=product.description,
                price=product.price,
                stock_quantity=product.stock_quantity,
            )
        )
        return product


@domain.event(part_of=Product)
class ProductAdded:
    product_id: Identifier(required=True)
    name: String(max_length=100, required=True)
    description: Text(required=True)
    price: Float(required=True)
    stock_quantity: Integer(default=0)


@domain.event(part_of=Product)
class StockAdjusted:
    product_id: Identifier(required=True)
    quantity: Integer(required=True)
    new_stock_quantity: Integer(required=True)


@domain.projection
class ProductInventory:
    """Projection for product inventory data optimized for querying."""

    product_id: Identifier(identifier=True, required=True)
    name: String(max_length=100, required=True)
    description: Text(required=True)
    price: Float(required=True)
    stock_quantity: Integer(default=0)
    last_updated: DateTime()


@domain.projection
class ProductCatalog:
    """Projection for product catalog data optimized for browsing."""

    product_id: Identifier(identifier=True, required=True)
    name: String(max_length=100, required=True)
    description: Text(required=True)
    price: Float(required=True)
    in_stock: String(choices=["YES", "NO"], default="YES")


@domain.projector(projector_for=ProductInventory, aggregates=[Product])
class ProductInventoryProjector:
    """Projector that maintains the ProductInventory projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create inventory record when a new product is added."""
        repository = domain.repository_for(ProductInventory)

        inventory = ProductInventory(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            stock_quantity=event.stock_quantity,
            last_updated=event._metadata.headers.time,
        )

        repository.add(inventory)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update inventory when stock levels change."""
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get(event.product_id)

        inventory.stock_quantity = event.new_stock_quantity
        inventory.last_updated = event._metadata.headers.time

        repository.add(inventory)


@domain.projector(projector_for=ProductCatalog, aggregates=[Product])
class ProductCatalogProjector:
    """Projector that maintains the ProductCatalog projection."""

    @on(ProductAdded)
    def on_product_added(self, event: ProductAdded):
        """Create catalog entry when a new product is added."""
        repository = domain.repository_for(ProductCatalog)

        catalog_entry = ProductCatalog(
            product_id=event.product_id,
            name=event.name,
            description=event.description,
            price=event.price,
            in_stock="YES" if event.stock_quantity > 0 else "NO",
        )

        repository.add(catalog_entry)

    @on(StockAdjusted)
    def on_stock_adjusted(self, event: StockAdjusted):
        """Update catalog availability when stock changes."""
        repository = domain.repository_for(ProductCatalog)
        catalog_entry = repository.get(event.product_id)

        catalog_entry.in_stock = "YES" if event.new_stock_quantity > 0 else "NO"

        repository.add(catalog_entry)


# Initialize the domain
domain.init(traverse=False)

# Demonstrate the projector workflow
if __name__ == "__main__":
    with domain.domain_context():
        # Create a new product
        product = Product.create(
            name="Laptop",
            description="High-performance laptop",
            price=999.99,
            stock_quantity=50,
        )

        # Persist the product (this will trigger ProductAdded event)
        product_repo = domain.repository_for(Product)
        product_repo.add(product)

        # Verify projections were updated
        inventory_repo = domain.repository_for(ProductInventory)
        catalog_repo = domain.repository_for(ProductCatalog)

        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("=== After Product Creation ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Adjust stock (this will trigger StockAdjusted event)
        product.adjust_stock(-30)  # Sell 30 units
        product_repo.add(product)

        # Verify projections were updated again
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("\n=== After Stock Adjustment ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Sell all remaining stock
        product.adjust_stock(-20)  # Sell remaining 20 units
        product_repo.add(product)

        # Verify out-of-stock status
        inventory = inventory_repo.get(product.id)
        catalog = catalog_repo.get(product.id)

        print("\n=== After Selling All Stock ===")
        print(f"Product: {product.name} (Stock: {product.stock_quantity})")
        print(
            f"Inventory Projection: {inventory.name} (Stock: {inventory.stock_quantity})"
        )
        print(f"Catalog Projection: {catalog.name} (In Stock: {catalog.in_stock})")

        # Assertions for testing
        assert inventory.stock_quantity == 0
        assert catalog.in_stock == "NO"
        print("\n✅ All projections updated correctly!")

This example demonstrates:

  • Multiple Projections: ProductInventory for detailed inventory tracking and ProductCatalog for simplified browsing
  • Multiple Projectors: Each projection has its own dedicated projector
  • Event Handling: Both projectors respond to the same events but update different projections
  • Real-time Updates: Projections are automatically updated when domain events occur

Testing Projectors

Unit Testing

Test individual projector methods by creating events and calling the methods directly:

import pytest
from protean import Domain

def test_product_inventory_projector_on_product_added():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Create test event
        event = ProductAdded(
            product_id="test-123",
            name="Test Product",
            description="A test product",
            price=99.99,
            stock_quantity=10
        )

        # Create projector instance
        projector = ProductInventoryProjector()

        # Call the handler method
        projector.on_product_added(event)

        # Verify projection was created
        repository = domain.repository_for(ProductInventory)
        inventory = repository.get("test-123")

        assert inventory.name == "Test Product"
        assert inventory.stock_quantity == 10

Integration Testing

Test the complete flow by raising events from aggregates:

def test_projector_integration():
    domain = Domain()
    # ... register domain elements

    with domain.domain_context():
        # Create and persist aggregate
        product = Product.create(
            name="Integration Test Product",
            description="Testing projector integration",
            price=149.99,
            stock_quantity=25
        )

        product_repo = domain.repository_for(Product)
        product_repo.add(product)  # This triggers events

        # Verify projections were updated via view_for (read-only)
        inventory_view = domain.view_for(ProductInventory)
        catalog_view = domain.view_for(ProductCatalog)

        inventory = inventory_view.get(product.id)
        catalog = catalog_view.get(product.id)

        assert inventory.name == "Integration Test Product"
        assert catalog.in_stock == "YES"

See also

Concept overviews:

  • Projections — Read-optimized views in CQRS.
  • Projectors — Specialized handlers that maintain projections.

Patterns: