Skip to content

Read Models

Projections are read-optimized views maintained by projectors that react to domain events. Together they form the read side of CQRS.

Guides: Projections


BaseProjection

BaseProjection(*args: Any, **kwargs: Any)

Bases: BaseModel, OptionsMixin

Base class for projections -- read-optimized, denormalized views maintained by projectors in response to domain events.

Projections are mutable with identity-based equality. They only support basic field types (String, Integer, Float, Identifier, DateTime, etc.) — no Reference, HasOne, HasMany, or ValueObject fields. Every projection must have at least one field marked with identifier=True.

Projections can be backed by a database provider or a cache (e.g. Redis). When cache is specified, it overrides the database provider.

Fields are declared using standard Python type annotations with optional Field constraints.

Meta Options

Option Type Description
provider str The persistence provider name (default: "default").
cache str Cache adapter name. When set, overrides provider.
schema_name str The storage table/collection name.
order_by tuple Default ordering for queries.
limit int Default query result limit (default: 100).

Example::

@domain.projection
class OrderSummary(BaseProjection):
    order_id = Identifier(identifier=True)
    customer_name = String(max_length=100)
    total_amount = Float()
    status = String(max_length=20)
Source code in src/protean/core/projection.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def __init__(self, *args: Any, **kwargs: Any) -> None:
    # Check abstract before Pydantic validation to give a clear error
    # (abstract classes may lack fields, causing misleading Pydantic errors)
    if self.meta_.abstract is True:
        raise NotSupportedError(
            f"{self.__class__.__name__} class has been marked abstract"
            f" and cannot be instantiated"
        )

    # Support template dict pattern: Projection({"key": "val"}, key2="val2")
    # Keyword args take precedence over template dict values.
    if args:
        merged: dict[str, Any] = {}
        for template in args:
            if not isinstance(template, dict):
                raise AssertionError(
                    f"Positional argument {template} passed must be a dict. "
                    f"This argument serves as a template for loading common "
                    f"values.",
                )
            merged.update(template)
        merged.update(kwargs)
        kwargs = merged

    try:
        super().__init__(**kwargs)
    except PydanticValidationError as e:
        raise ValidationError(convert_pydantic_errors(e))

state_ property writable

state_: _EntityState

Access projection lifecycle state.

__validate_for_basic_field_types classmethod

__validate_for_basic_field_types() -> None

Reject non-basic field descriptors (ValueObject, Reference, Association).

Source code in src/protean/core/projection.py
131
132
133
134
135
136
137
138
139
@classmethod
def __validate_for_basic_field_types(cls) -> None:
    """Reject non-basic field descriptors (ValueObject, Reference, Association)."""
    for field_name, field_obj in vars(cls).items():
        if isinstance(field_obj, (Reference, Association, ValueObject)):
            raise IncorrectUsageError(
                f"Projections can only contain basic field types. "
                f"Remove {field_name} ({field_obj.__class__.__name__}) from class {cls.__name__}"
            )

__track_id_field classmethod

__track_id_field() -> None

Find the field marked identifier=True and record its name.

Source code in src/protean/core/projection.py
156
157
158
159
160
161
162
163
164
165
166
167
@classmethod
def __track_id_field(cls) -> None:
    """Find the field marked ``identifier=True`` and record its name."""
    try:
        id_fld = next(
            field
            for _, field in getattr(cls, _FIELDS, {}).items()
            if getattr(field, "identifier", False)
        )
        setattr(cls, _ID_FIELD_NAME, id_fld.field_name)
    except StopIteration:
        pass

defaults

defaults() -> None

Placeholder for defaults.

Override in subclass when an attribute's default depends on other attribute values.

Source code in src/protean/core/projection.py
202
203
204
205
206
207
def defaults(self) -> None:
    """Placeholder for defaults.

    Override in subclass when an attribute's default depends on other
    attribute values.
    """

to_dict

to_dict() -> dict[str, Any]

Return projection data as a dictionary.

Internal fields (prefixed with _) are excluded for consistency with the entity to_dict() behaviour.

Source code in src/protean/core/projection.py
255
256
257
258
259
260
261
262
263
264
265
266
def to_dict(self) -> dict[str, Any]:
    """Return projection data as a dictionary.

    Internal fields (prefixed with ``_``) are excluded for consistency
    with the entity ``to_dict()`` behaviour.
    """
    result: dict[str, Any] = {}
    for fname, shim in getattr(self, _FIELDS, {}).items():
        if fname.startswith("_"):
            continue
        result[fname] = shim.as_dict(getattr(self, fname, None))
    return result

BaseProjector

Bases: Element, HandlerMixin, OptionsMixin

Base class for projectors that maintain read-optimized projections by listening to domain events.

Projectors are always associated with a projection via projector_for and use the @handle (or its alias @on) decorator to map specific event types to handler methods. Unlike generic event handlers, projectors explicitly target a projection and can listen to events from multiple aggregates.

Handler methods receive the event as their only argument and are responsible for creating, updating, or deleting the projection record.

Meta Options

Option Type Description
projector_for type The projection class this projector maintains. Required.
aggregates list Aggregate classes whose events this projector listens to.
stream_categories list Explicit stream categories to subscribe to. Overrides aggregates.

Example::

@domain.projector(projector_for=OrderSummary, aggregates=[Order])
class OrderSummaryProjector(BaseProjector):

    @on(OrderPlaced)
    def on_order_placed(self, event):
        repo = current_domain.repository_for(OrderSummary)
        repo.add(OrderSummary(
            order_id=event.order_id,
            status="placed",
        ))

    @on(OrderShipped)
    def on_order_shipped(self, event):
        repo = current_domain.repository_for(OrderSummary)
        summary = repo.get(event.order_id)
        summary.status = "shipped"
        repo.add(summary)