Skip to content

Domain

The central registry object. Create one per bounded context to register all domain elements, manage configuration, and coordinate infrastructure adapters.

See Compose a Domain for a practical guide.

The domain object is a one-stop gateway to:

  • Registering Domain Objects/Concepts
  • Querying/Retrieving Domain Artifacts like Entities, Services, etc.
  • Retrieve injected infrastructure adapters

Usually you create a Domain instance in your main module or in the __init__.py file of your package like this::

from protean import Domain
domain = Domain()

The Domain will automatically detect the root path of the calling module. You can also specify the root path explicitly::

domain = Domain(root_path="/path/to/domain")

The root path resolution follows this priority:

  1. Explicit root_path parameter if provided
  2. DOMAIN_ROOT_PATH environment variable if set
  3. Auto-detection of caller's file location
  4. Current working directory as last resort

:param root_path: the path to the folder containing the domain file (optional, will auto-detect if not provided) :param name: the name of the domain (optional, will use the module name if not provided) :param config: optional configuration dictionary :param identity_function: optional function to generate identities for domain objects

Source code in src/protean/domain/__init__.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def __init__(
    self,
    root_path: str = None,
    name: str = "",
    config: Optional[Dict] = None,
    identity_function: Optional[Callable] = None,
):
    # Determine root_path based on resolution priority
    if root_path is None:
        # Try to get from environment variable
        env_root_path = os.environ.get("DOMAIN_ROOT_PATH")
        if env_root_path:
            self.root_path = env_root_path
        else:
            # Auto-detect
            self.root_path = self._guess_caller_path()
    else:
        self.root_path = root_path

    # Initialize the domain with the name of the module if not provided
    # Get the stack frame of the caller of the __init__ method
    caller_frame = inspect.stack()[1]
    # Get the module name from the globals of the frame where the object was instantiated
    self.name = name if name else caller_frame.frame.f_globals["__name__"]

    # Registry for all domain Objects
    self._domain_registry = _DomainRegistry()

    #: The configuration dictionary as ``Config``.  This behaves
    #: exactly like a regular dictionary but supports additional methods
    #: to load a config from files.
    self.config = self.load_config(config)

    # The function to invoke to generate identity
    self._identity_function = identity_function

    self.providers = Providers(self)
    self.event_store = EventStore(self)
    self.brokers = Brokers(self)
    self.caches = Caches(self)
    self.email_providers = EmailProviders(self)

    # Cache for holding Model to Entity/Aggregate associations
    # Structure mirrors Providers._repositories:
    # {
    #    'app.User': {
    #        'postgresql': UserPostgresModel,
    #        'sqlite': UserSQLiteModel,
    #        None: UserGenericModel,       # database=None → fallback for any provider
    #    }
    # }
    self._database_models: dict[str, dict[str | None, type[BaseDatabaseModel]]] = (
        defaultdict(dict)
    )
    self._constructed_models: dict[str, BaseDatabaseModel] = {}

    # Message enricher hooks — callables that add custom metadata to events/commands.
    # Event enrichers receive (event, aggregate) and return dict[str, Any].
    # Command enrichers receive (command,) and return dict[str, Any].
    # Results are merged into metadata.extensions.
    self._event_enrichers: List[Callable] = []
    self._command_enrichers: List[Callable] = []

    # Composed helpers — see handler_setup.py, validation.py, etc.
    self._command_processor = CommandProcessor(self)
    self._handler_configurator = HandlerConfigurator(self)
    self._infrastructure = InfrastructureManager(self)
    self._query_processor = QueryProcessor(self)
    self._resolver = ElementResolver(self)
    self._type_manager = TypeManager(self)
    self._validator = DomainValidator(self)

    #: A list of functions that are called when the domain context
    #: is destroyed.  This is the place to store code that cleans up and
    #: disconnects from databases, for example.
    self.teardown_domain_context_functions: List[Callable] = []

    # Placeholder array for resolving classes referenced by domain elements
    self._pending_class_resolutions: dict[str, Any] = defaultdict(list)

    # Lazy-initialized idempotency store
    self._idempotency_store = None

    # Lazy-initialized trace emitter for command processing observability
    self._trace_emitter = None

    # Lazy-initialized OpenTelemetry providers (set by init_telemetry)
    self._otel_tracer_provider = None
    self._otel_meter_provider = None
    self._otel_init_attempted = False

has_outbox property

has_outbox: bool

Whether the outbox pattern is active.

Derived from server.default_subscription_type: outbox is enabled when subscription type is "stream". For backward compatibility, an explicit enable_outbox = true also activates the outbox.

camel_case_name cached property

camel_case_name: str

Return the CamelCase name of the domain.

The CamelCase name is the name of the domain with the first letter capitalized. Examples: - my_domain -> MyDomain - my_domain_1 -> MyDomain1 - my_domain_1_0 -> MyDomain10

normalized_name cached property

normalized_name: str

Return the normalized name of the domain.

The normalized name is the underscored version of the domain name. Examples: - MyDomain -> my_domain - My Domain -> my_domain - My-Domain -> my_domain - My Domain 1 -> my_domain_1 - My Domain 1.0 -> my_domain_1_0

init

init(traverse=True)

Parse the domain folder, and attach elements dynamically to the domain.

Protean parses all files in the domain file's folder, as well as under it, to load elements. So, all domain files are to be nested under the file contain the domain definition.

One can use the traverse flag to control this functionality, True by default.

When enabled, Protean is responsible for loading domain elements and ensuring all functionality is activated.

The developer is responsible for activating functionality manually when autoloading is disabled. Element activation can be done by importing them in central areas of domain execution, like Application Services.

For example, asynchronous aspects of a domain like its Subscribers and Event Handlers should be imported in their relevant Application Services and Aggregates.

This method bubbles up circular import issues, if present, in the domain code.

Source code in src/protean/domain/__init__.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
def init(self, traverse=True):  # noqa: C901
    """Parse the domain folder, and attach elements dynamically to the domain.

    Protean parses all files in the domain file's folder, as well as under it,
    to load elements. So, all domain files are to be nested under the file contain
    the domain definition.

    One can use the `traverse` flag to control this functionality, `True` by default.

    When enabled, Protean is responsible for loading domain elements and ensuring
    all functionality is activated.

    The developer is responsible for activating functionality manually when
    autoloading is disabled. Element activation can be done by importing them
    in central areas of domain execution, like Application Services.

    For example, asynchronous aspects of a domain like its Subscribers and
    Event Handlers should be imported in their relevant Application Services
    and Aggregates.

    This method bubbles up circular import issues, if present, in the domain code.
    """
    self._prepare(traverse=traverse)

    # Initialize adapters after loading domain
    self._initialize()

    # Initialize outbox DAOs for all providers
    if self.has_outbox:
        self._initialize_outbox()

domain_context

domain_context(**kwargs)

Create a DomainContext. Use as a with block to push the context, which will make current_domain point at this domain.

::

with domain.domain_context():
    init_db()
Source code in src/protean/domain/__init__.py
808
809
810
811
812
813
814
815
816
817
818
def domain_context(self, **kwargs):
    """Create a ``DomainContext``. Use as a ``with``
    block to push the context, which will make ``current_domain``
    point at this domain.

    ::

        with domain.domain_context():
            init_db()
    """
    return DomainContext(self, **kwargs)

aggregate

aggregate(_cls: type[_T]) -> type[_T]
aggregate(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
aggregate(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1256
1257
1258
1259
1260
1261
1262
1263
1264
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def aggregate(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.AGGREGATE,
        _cls=_cls,
        **kwargs,
    )

entity

entity(_cls: type[_T]) -> type[_T]
entity(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
entity(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1364
1365
1366
1367
1368
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def entity(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.ENTITY, _cls=_cls, **kwargs)

value_object

value_object(_cls: type[_T]) -> type[_T]
value_object(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
value_object(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1428
1429
1430
1431
1432
1433
1434
1435
1436
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def value_object(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.VALUE_OBJECT,
        _cls=_cls,
        **kwargs,
    )

command

command(_cls: type[_T]) -> type[_T]
command(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
command(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1288
1289
1290
1291
1292
1293
1294
1295
1296
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def command(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.COMMAND,
        _cls=_cls,
        **kwargs,
    )

event

event(_cls: type[_T]) -> type[_T]
event(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
event(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1316
1317
1318
1319
1320
1321
1322
1323
1324
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def event(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.EVENT,
        _cls=_cls,
        **kwargs,
    )

command_handler

command_handler(_cls: type[_T]) -> type[_T]
command_handler(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
command_handler(
    _cls: type[_T" backlink-type="used-by" backlink-anchor="protean.domain.Domain.command_handler" optional hover>_T] | None = None, **kwargs: Any
) -> type[_T" backlink-type="returned-by" backlink-anchor="protean.domain.Domain.command_handler" optional hover>_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1304
1305
1306
1307
1308
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def command_handler(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.COMMAND_HANDLER, _cls=_cls, **kwargs)

event_handler

event_handler(_cls: type[_T]) -> type[_T]
event_handler(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
event_handler(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1332
1333
1334
1335
1336
1337
1338
1339
1340
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def event_handler(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.EVENT_HANDLER,
        _cls=_cls,
        **kwargs,
    )

application_service

application_service(_cls: type[_T]) -> type[_T]
application_service(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
application_service(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1272
1273
1274
1275
1276
1277
1278
1279
1280
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def application_service(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.APPLICATION_SERVICE,
        _cls=_cls,
        **kwargs,
    )

domain_service

domain_service(_cls: type[_T]) -> type[_T]
domain_service(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
domain_service(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1348
1349
1350
1351
1352
1353
1354
1355
1356
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def domain_service(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.DOMAIN_SERVICE,
        _cls=_cls,
        **kwargs,
    )

repository

repository(_cls: type[_T]) -> type[_T]
repository(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
repository(
    _cls: type[_T" backlink-type="used-by" backlink-anchor="protean.domain.Domain.repository" optional hover>_T] | None = None, **kwargs: Any
) -> type[_T" backlink-type="returned-by" backlink-anchor="protean.domain.Domain.repository" optional hover>_T] | Callable[[type[_T" backlink-type="returned-by" backlink-anchor="protean.domain.Domain.repository" optional hover>_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1400
1401
1402
1403
1404
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def repository(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.REPOSITORY, _cls=_cls, **kwargs)

projection

projection(_cls: type[_T]) -> type[_T]
projection(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
projection(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1444
1445
1446
1447
1448
1449
1450
1451
1452
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def projection(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROJECTION,
        _cls=_cls,
        **kwargs,
    )

projector

projector(_cls: type[_T]) -> type[_T]
projector(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
projector(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1460
1461
1462
1463
1464
1465
1466
1467
1468
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def projector(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROJECTOR,
        _cls=_cls,
        **kwargs,
    )

subscriber

subscriber(_cls: type[_T]) -> type[_T]
subscriber(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
subscriber(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1412
1413
1414
1415
1416
1417
1418
1419
1420
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def subscriber(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.SUBSCRIBER,
        _cls=_cls,
        **kwargs,
    )

process_manager

process_manager(_cls: type[_T]) -> type[_T]
process_manager(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
process_manager(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1508
1509
1510
1511
1512
1513
1514
1515
1516
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def process_manager(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROCESS_MANAGER,
        _cls=_cls,
        **kwargs,
    )

upcaster

upcaster(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]

Register an event upcaster with the domain.

Upcasters transform raw event payloads from an old schema version to a newer one. They are applied lazily during deserialization so that @apply handlers and event handlers always see the current schema.

PARAMETER DESCRIPTION
event_type

The event class this upcaster targets (current version).

TYPE: type

from_version

Source version number (e.g. 1).

TYPE: int

to_version

Target version number (e.g. 2).

TYPE: int

Example::

@domain.upcaster(event_type=OrderPlaced, from_version=1, to_version=2)
class UpcastOrderPlacedV1ToV2(BaseUpcaster):
    def upcast(self, data: dict) -> dict:
        data["currency"] = "USD"
        return data
Source code in src/protean/domain/__init__.py
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
def upcaster(
    self,
    _cls: type[_T] | None = None,
    **kwargs: Any,
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    """Register an event upcaster with the domain.

    Upcasters transform raw event payloads from an old schema version to
    a newer one.  They are applied lazily during deserialization so that
    ``@apply`` handlers and event handlers always see the current schema.

    Keyword Args:
        event_type (type): The event class this upcaster targets (current version).
        from_version (int): Source version number (e.g. ``1``).
        to_version (int): Target version number (e.g. ``2``).

    Example::

        @domain.upcaster(event_type=OrderPlaced, from_version=1, to_version=2)
        class UpcastOrderPlacedV1ToV2(BaseUpcaster):
            def upcast(self, data: dict) -> dict:
                data["currency"] = "USD"
                return data
    """
    from protean.core.upcaster import upcaster_factory

    def wrap(cls: type) -> type:
        new_cls = upcaster_factory(cls, self, **kwargs)
        self._upcasters.append(new_cls)
        return new_cls

    if _cls is None:
        return wrap
    return wrap(_cls)

database_model

database_model(_cls: type[_T]) -> type[_T]
database_model(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T" backlink-type="returned-by" backlink-anchor="protean.domain.Domain.database_model" optional hover>_T]], type[_T]]
database_model(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1388
1389
1390
1391
1392
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def database_model(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.DATABASE_MODEL, _cls=_cls, **kwargs)

process

process(
    command: Any,
    asynchronous: Optional[bool] = None,
    idempotency_key: Optional[str] = None,
    raise_on_duplicate: bool = False,
    priority: Optional[int] = None,
    correlation_id: Optional[str] = None,
) -> Optional[Any]

Process command and return results based on specified preference.

By default, Protean does not return values after processing commands. This behavior can be overridden either by setting command_processing in config to "sync" or by specifying asynchronous=False when calling the domain's handle method.

PARAMETER DESCRIPTION
command

Command to process (instance of a @domain.command-decorated class)

TYPE: Any

asynchronous

Specifies if the command should be processed asynchronously. Defaults to True.

TYPE: Boolean DEFAULT: None

idempotency_key

Caller-provided key for command deduplication. When provided, enables submission-level dedup via the idempotency store.

TYPE: str DEFAULT: None

raise_on_duplicate

If True, raises DuplicateCommandError when a duplicate idempotency key is detected. If False (default), silently returns the cached result.

TYPE: bool DEFAULT: False

priority

Processing priority for events produced by this command.

TYPE: int DEFAULT: None

correlation_id

Correlation ID for distributed tracing.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Optional[Any]

Optional[Any]: Returns either the command handler's return value or nothing, based on preference.

Source code in src/protean/domain/__init__.py
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
def process(
    self,
    command: Any,
    asynchronous: Optional[bool] = None,
    idempotency_key: Optional[str] = None,
    raise_on_duplicate: bool = False,
    priority: Optional[int] = None,
    correlation_id: Optional[str] = None,
) -> Optional[Any]:
    """Process command and return results based on specified preference.

    By default, Protean does not return values after processing commands. This behavior
    can be overridden either by setting command_processing in config to "sync" or by specifying
    ``asynchronous=False`` when calling the domain's ``handle`` method.

    Args:
        command: Command to process (instance of a ``@domain.command``-decorated class)
        asynchronous (Boolean, optional): Specifies if the command should be processed asynchronously.
            Defaults to True.
        idempotency_key (str, optional): Caller-provided key for command deduplication.
            When provided, enables submission-level dedup via the idempotency store.
        raise_on_duplicate (bool): If ``True``, raises ``DuplicateCommandError``
            when a duplicate idempotency key is detected. If ``False`` (default),
            silently returns the cached result.
        priority (int, optional): Processing priority for events produced by this command.
        correlation_id (str, optional): Correlation ID for distributed tracing.

    Returns:
        Optional[Any]: Returns either the command handler's return value or nothing, based on preference.
    """
    return self._command_processor.process(
        command,
        asynchronous=asynchronous,
        idempotency_key=idempotency_key,
        raise_on_duplicate=raise_on_duplicate,
        priority=priority,
        correlation_id=correlation_id,
    )