
Read Models

Projections are read-optimized views maintained by projectors that react to domain events. Together they form the read side of CQRS.

Guides: Projections


BaseProjection

BaseProjection(*args: Any, **kwargs: Any)

Bases: BaseModel, OptionsMixin

Base class for projections -- read-optimized, denormalized views maintained by projectors in response to domain events.

Projections are mutable with identity-based equality. They support basic field types (String, Integer, Float, Identifier, DateTime, etc.) and ValueObject fields. Reference, HasOne, and HasMany fields are not allowed. Every projection must have at least one field marked with identifier=True.

ValueObject fields are stored as flattened shadow fields for persistence (e.g. billing_address_street, billing_address_city) and are queryable by individual attribute.
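The flattening described above can be pictured with a small standalone sketch. It does not use Protean; the `Address` dataclass and the `flatten_value_object` helper are hypothetical names introduced only for illustration, and the `<field>_<attribute>` naming is taken from the example names in the text.

```python
from dataclasses import dataclass, fields
from typing import Any


@dataclass(frozen=True)
class Address:
    """Hypothetical value object, used only for this illustration."""

    street: str
    city: str


def flatten_value_object(field_name: str, value: Any) -> dict[str, Any]:
    """Expand one value object into `<field>_<attribute>` shadow keys."""
    return {f"{field_name}_{f.name}": getattr(value, f.name) for f in fields(value)}


# A flat record, as a persistence layer might store it (assumed shape).
row: dict[str, Any] = {"order_id": "ord-1"}
row.update(flatten_value_object("billing_address", Address("1 Main St", "Springfield")))
# row now holds billing_address_street and billing_address_city as flat keys
```

Because each attribute becomes its own flat key, a query can filter on a single attribute such as `billing_address_city` without unpacking the whole value object.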

Projections can be backed by a database provider or a cache (e.g. Redis). When cache is specified, it overrides the database provider.

Fields are declared using standard Python type annotations with optional Field constraints.

Meta Options

Option       Type   Description
provider     str    The persistence provider name (default: "default").
cache        str    Cache adapter name. When set, overrides provider.
schema_name  str    The storage table/collection name.
order_by     tuple  Default ordering for queries.
limit        int    Default query result limit (default: 100).
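The precedence between provider and cache can be stated as a short rule. The sketch below is not Protean code; `resolve_backend` is a hypothetical helper that only mirrors the documented behavior (cache wins when set, otherwise the provider, which defaults to "default").

```python
from typing import Optional


def resolve_backend(provider: str = "default", cache: Optional[str] = None) -> tuple[str, str]:
    """Return (kind, name) for the store a projection would use."""
    if cache:
        # A configured cache overrides the database provider.
        return ("cache", cache)
    return ("provider", provider)
```

For instance, `resolve_backend(provider="postgres", cache="redis")` selects the cache, while `resolve_backend()` falls back to the default provider. The names "postgres" and "redis" are placeholder adapter names.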

Example::

@domain.projection
class OrderSummary(BaseProjection):
    order_id = Identifier(identifier=True)
    customer_name = String(max_length=100)
    total_amount = Float()
    status = String(max_length=20)
    shipping_address = ValueObject(Address)
Source code in src/protean/core/projection.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    # Check abstract before Pydantic validation to give a clear error
    # (abstract classes may lack fields, causing misleading Pydantic errors)
    if self.meta_.abstract is True:
        raise NotSupportedError(
            f"{self.__class__.__name__} class has been marked abstract"
            f" and cannot be instantiated"
        )

    # Pop VO descriptor kwargs and shadow field kwargs before Pydantic init.
    # Shadow fields (e.g. address_street, address_city) are dynamically
    # generated and must not reach Pydantic's __init__ (extra="forbid").
    descriptor_kwargs: dict[str, Any] = {}
    shadow_kwargs: dict[str, Any] = {}

    # Build the set of known shadow field names from VO descriptors
    _shadow_field_names: set[str] = set()
    for _, fobj in getattr(type(self), _FIELDS, {}).items():
        if isinstance(fobj, ValueObject):
            for sf in fobj.embedded_fields.values():
                _shadow_field_names.add(sf.attribute_name)

    for name in list(kwargs):
        if name in _shadow_field_names:
            shadow_kwargs[name] = kwargs.pop(name)
        elif self._is_vo_descriptor(type(self), name):
            descriptor_kwargs[name] = kwargs.pop(name)

    # Support template dict pattern: Projection({"key": "val"}, key2="val2")
    # Keyword args take precedence over template dict values.
    if args:
        merged: dict[str, Any] = {}
        for template in args:
            if not isinstance(template, dict):
                raise AssertionError(
                    f"Positional argument {template} passed must be a dict. "
                    f"This argument serves as a template for loading common "
                    f"values.",
                )
            # Also separate descriptor and shadow kwargs from template dicts
            for tname in list(template):
                if tname in _shadow_field_names:
                    shadow_kwargs[tname] = template.pop(tname)
                elif self._is_vo_descriptor(type(self), tname):
                    descriptor_kwargs[tname] = template.pop(tname)
            merged.update(template)
        merged.update(kwargs)
        kwargs = merged

    # Push context onto thread-local stack for model_post_init to retrieve.
    # Pydantic clears __dict__ during super().__init__(), so we cannot
    # stash data on the instance.
    stack: list[dict[str, Any]] = getattr(_projection_init_context, "stack", [])
    stack.append(
        {
            "descriptor_kwargs": descriptor_kwargs,
            "shadow_kwargs": shadow_kwargs,
        }
    )
    _projection_init_context.stack = stack

    try:
        super().__init__(**kwargs)
    except PydanticValidationError as e:
        raise ValidationError(convert_pydantic_errors(e))
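
The template dict handling in the constructor above reduces to a simple merge order. The following standalone sketch reproduces only that part, leaving out the shadow-field and descriptor separation; `merge_init_args` is a hypothetical name, not a Protean function.

```python
from typing import Any


def merge_init_args(*templates: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    """Merge template dicts left to right, then let keyword arguments win."""
    merged: dict[str, Any] = {}
    for template in templates:
        if not isinstance(template, dict):
            raise AssertionError(
                f"Positional argument {template} passed must be a dict."
            )
        merged.update(template)  # later templates override earlier ones
    merged.update(kwargs)  # keyword arguments take precedence over templates
    return merged


values = merge_init_args({"status": "draft", "total_amount": 10.0}, status="placed")
# values["status"] comes from the keyword argument, total_amount from the template
```

Note that the real constructor pops shadow and descriptor keys out of the template dicts it receives, so callers who reuse a template across instances may want to pass a copy.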

state_ property writable

state_: _EntityState

Access projection lifecycle state.

__validate_for_basic_field_types classmethod

__validate_for_basic_field_types() -> None

Reject Reference and Association descriptors.

ValueObject descriptors are allowed — they are handled via shadow fields for flat persistence storage.

Source code in src/protean/core/projection.py
@classmethod
def __validate_for_basic_field_types(cls) -> None:
    """Reject Reference and Association descriptors.

    ValueObject descriptors are allowed — they are handled via shadow
    fields for flat persistence storage.
    """
    for field_name, field_obj in vars(cls).items():
        if isinstance(field_obj, (Reference, Association)):
            raise IncorrectUsageError(
                f"Projections can only contain basic field types and ValueObjects. "
                f"Remove {field_name} ({field_obj.__class__.__name__}) from class {cls.__name__}"
            )
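
The same class-scanning check can be tried in isolation. In this sketch every class is a hypothetical stand-in defined locally (none is imported from Protean); it shows how iterating over `vars(cls)` finds offending attributes declared directly on the class.

```python
class Reference:
    """Stand-in for a reference descriptor (illustration only)."""


class Association:
    """Stand-in for a HasOne/HasMany style descriptor (illustration only)."""


class String:
    """Stand-in for a basic field type (illustration only)."""


class IncorrectUsageError(Exception):
    """Local stand-in for the error raised by the source above."""


def validate_basic_fields(cls: type) -> None:
    """Raise if the class body declares a Reference or Association."""
    for field_name, field_obj in vars(cls).items():
        if isinstance(field_obj, (Reference, Association)):
            raise IncorrectUsageError(
                f"Remove {field_name} ({field_obj.__class__.__name__}) "
                f"from class {cls.__name__}"
            )


class GoodView:
    name = String()


class BadView:
    customer = Reference()
```

`vars(cls)` returns only the attributes defined on that class itself, so a check written this way does not see attributes inherited from a parent class.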

__track_id_field classmethod

__track_id_field() -> None

Find the field marked identifier=True and record its name.

Source code in src/protean/core/projection.py
@classmethod
def __track_id_field(cls) -> None:
    """Find the field marked ``identifier=True`` and record its name."""
    try:
        id_fld = next(
            field
            for _, field in getattr(cls, _FIELDS, {}).items()
            if getattr(field, "identifier", False)
        )
        setattr(cls, _ID_FIELD_NAME, id_fld.field_name)
    except StopIteration:
        pass
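
The lookup above relies on `next()` raising `StopIteration` when no field qualifies. A minimal standalone sketch of that pattern follows; `FieldStub` and `find_id_field_name` are hypothetical names, and returning `None` stands in for the source's silent `pass`.

```python
from typing import Optional


class FieldStub:
    """Hypothetical field object carrying a name and an identifier flag."""

    def __init__(self, field_name: str, identifier: bool = False) -> None:
        self.field_name = field_name
        self.identifier = identifier


def find_id_field_name(fields: dict[str, FieldStub]) -> Optional[str]:
    """Return the name of the first field flagged as identifier, if any."""
    try:
        id_field = next(
            field for field in fields.values() if getattr(field, "identifier", False)
        )
    except StopIteration:
        return None  # no identifier declared; nothing is recorded
    return id_field.field_name
```

Only the first match is used, so when several fields are flagged, the one declared first in the mapping is recorded.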

defaults

defaults() -> None

Placeholder for defaults.

Override in subclass when an attribute's default depends on other attribute values.

Source code in src/protean/core/projection.py
def defaults(self) -> None:
    """Placeholder for defaults.

    Override in subclass when an attribute's default depends on other
    attribute values.
    """
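
An override of this hook might look like the sketch below. It runs without Protean: `ProjectionStub` is a hypothetical base class that assigns keyword values and then calls `defaults()`, which is an assumption about when the hook fires, not something the source above confirms.

```python
from typing import Any


class ProjectionStub:
    """Hypothetical stand-in: assigns values, then calls defaults()."""

    def __init__(self, **values: Any) -> None:
        for name, value in values.items():
            setattr(self, name, value)
        self.defaults()  # assumed call site for the hook

    def defaults(self) -> None:
        """Placeholder, mirroring the no-op base implementation."""


class OrderSummaryStub(ProjectionStub):
    def defaults(self) -> None:
        # Derive display_name from order_id only when it was not supplied.
        if getattr(self, "display_name", None) is None:
            self.display_name = f"Order {self.order_id}"


derived = OrderSummaryStub(order_id="42")
explicit = OrderSummaryStub(order_id="42", display_name="Rush order")
```

Here `derived.display_name` is computed from `order_id`, while `explicit.display_name` keeps the value passed in.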

to_dict

to_dict() -> dict[str, Any]

Return projection data as a dictionary.

ValueObject fields are serialized via the descriptor's as_dict() method.

Source code in src/protean/core/projection.py
def to_dict(self) -> dict[str, Any]:
    """Return projection data as a dictionary.

    ValueObject fields are serialized via the descriptor's
    ``as_dict()`` method.
    """
    result: dict[str, Any] = {}
    for fname, shim in getattr(self, _FIELDS, {}).items():
        value = getattr(self, fname, None)
        result[fname] = shim.as_dict(value)
    return result
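
The per-field delegation in `to_dict` can be illustrated without Protean. In this sketch the field stubs, the `FIELDS` mapping, and `to_dict_sketch` are all hypothetical; each stub exposes an `as_dict()` method, which is the only contract the loop above depends on.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Address:
    """Hypothetical value object, used only for this illustration."""

    street: str
    city: str


class PlainFieldStub:
    def as_dict(self, value: Any) -> Any:
        return value  # basic values pass through unchanged


class ValueObjectFieldStub:
    def as_dict(self, value: Any) -> Any:
        return None if value is None else asdict(value)  # nested dict


@dataclass
class OrderSummaryStub:
    order_id: str
    shipping_address: Optional[Address] = None


# Assumed shape of the field registry: name -> object with as_dict().
FIELDS = {"order_id": PlainFieldStub(), "shipping_address": ValueObjectFieldStub()}


def to_dict_sketch(instance: Any, field_map: dict[str, Any]) -> dict[str, Any]:
    """Serialize each registered field through its own as_dict()."""
    return {
        name: field.as_dict(getattr(instance, name, None))
        for name, field in field_map.items()
    }
```

In this sketch a value object comes back as a nested dictionary under its own field name, rather than as flattened shadow keys.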

BaseProjector

Bases: Element, HandlerMixin, OptionsMixin

Base class for projectors that maintain read-optimized projections by listening to domain events.

Projectors are always associated with a projection via projector_for and use the @handle (or its alias @on) decorator to map specific event types to handler methods. Unlike generic event handlers, projectors explicitly target a projection and can listen to events from multiple aggregates.

Handler methods receive the event as their only argument and are responsible for creating, updating, or deleting the projection record.

Meta Options

Option             Type  Description
projector_for      type  The projection class this projector maintains. Required.
aggregates         list  Aggregate classes whose events this projector listens to.
stream_categories  list  Explicit stream categories to subscribe to. Overrides aggregates.

Example::

@domain.projector(projector_for=OrderSummary, aggregates=[Order])
class OrderSummaryProjector(BaseProjector):

    @on(OrderPlaced)
    def on_order_placed(self, event):
        repo = current_domain.repository_for(OrderSummary)
        repo.add(OrderSummary(
            order_id=event.order_id,
            status="placed",
        ))

    @on(OrderShipped)
    def on_order_shipped(self, event):
        repo = current_domain.repository_for(OrderSummary)
        summary = repo.get(event.order_id)
        summary.status = "shipped"
        repo.add(summary)
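
The mapping from event types to handler methods can be sketched in plain Python. Nothing below comes from Protean: `on_event`, `dispatch`, `ProjectorStub`, and the event dataclasses are hypothetical, and an in-memory dict stands in for the projection repository.

```python
from dataclasses import dataclass
from typing import Any, Callable


def on_event(event_cls: type) -> Callable[[Callable[..., None]], Callable[..., None]]:
    """Hypothetical decorator: tag a method with the event type it handles."""

    def decorator(func: Callable[..., None]) -> Callable[..., None]:
        func._handles = event_cls  # type: ignore[attr-defined]
        return func

    return decorator


@dataclass
class OrderPlaced:
    order_id: str


@dataclass
class OrderShipped:
    order_id: str


class ProjectorStub:
    def __init__(self) -> None:
        self.store: dict[str, dict[str, Any]] = {}  # stand-in for a repository

    @on_event(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced) -> None:
        self.store[event.order_id] = {"status": "placed"}

    @on_event(OrderShipped)
    def on_order_shipped(self, event: OrderShipped) -> None:
        self.store[event.order_id]["status"] = "shipped"


def dispatch(projector: Any, event: Any) -> None:
    """Call every method tagged for exactly this event's type."""
    for attr_name in dir(type(projector)):
        handler = getattr(type(projector), attr_name)
        if getattr(handler, "_handles", None) is type(event):
            handler(projector, event)


projector = ProjectorStub()
dispatch(projector, OrderPlaced("o1"))
dispatch(projector, OrderShipped("o1"))
# projector.store["o1"]["status"] is now "shipped"
```

Each handler receives the event as its only argument and decides whether to create or update the record, matching the handler contract described for projectors.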