
Domain

The central registry object. Create one per bounded context to register all domain elements, manage configuration, and coordinate infrastructure adapters.

See Compose a Domain for a practical guide.

The domain object is a one-stop gateway to:

  • Registering Domain Objects/Concepts
  • Querying/Retrieving Domain Artifacts like Entities, Services, etc.
  • Retrieving injected infrastructure adapters

Usually you create a Domain instance in your main module or in the __init__.py file of your package like this:

from protean import Domain
domain = Domain()

The Domain will automatically detect the root path of the calling module. You can also specify the root path explicitly:

domain = Domain(root_path="/path/to/domain")

The root path resolution follows this priority:

  1. Explicit root_path parameter if provided
  2. DOMAIN_ROOT_PATH environment variable if set
  3. Auto-detection of caller's file location
  4. Current working directory as last resort
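The priority order above can be sketched as a small standalone function. This is a minimal illustration only, not Protean's implementation: `resolve_root_path` is a hypothetical name, and the caller lookup via `inspect.stack()` is an assumption about how auto-detection could work.

```python
import inspect
import os
from pathlib import Path
from typing import Optional


def resolve_root_path(explicit: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the documented priority order."""
    # 1. An explicit argument always wins.
    if explicit is not None:
        return explicit

    # 2. Fall back to the environment variable, if set and non-empty.
    env_path = os.environ.get("DOMAIN_ROOT_PATH")
    if env_path:
        return env_path

    # 3. Try the directory of the file that called this function.
    caller_file = inspect.stack()[1].frame.f_globals.get("__file__")
    if caller_file:
        return str(Path(caller_file).resolve().parent)

    # 4. Last resort: the current working directory.
    return os.getcwd()
```

Calling `resolve_root_path("/path/to/domain")` returns the argument unchanged, regardless of the environment.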

:param root_path: the path to the folder containing the domain file (optional, will auto-detect if not provided)
:param name: the name of the domain (optional, will use the module name if not provided)
:param config: optional configuration dictionary
:param identity_function: optional function to generate identities for domain objects

Source code in src/protean/domain/__init__.py
def __init__(
    self,
    root_path: str = None,
    name: str = "",
    config: Optional[Dict] = None,
    identity_function: Optional[Callable] = None,
):
    # Determine root_path based on resolution priority
    if root_path is None:
        # Try to get from environment variable
        env_root_path = os.environ.get("DOMAIN_ROOT_PATH")
        if env_root_path:
            self.root_path = env_root_path
        else:
            # Auto-detect
            self.root_path = self._guess_caller_path()
    else:
        self.root_path = root_path

    # Initialize the domain with the name of the module if not provided
    # Get the stack frame of the caller of the __init__ method
    caller_frame = inspect.stack()[1]
    # Get the module name from the globals of the frame where the object was instantiated
    self.name = name if name else caller_frame.frame.f_globals["__name__"]

    # Registry for all domain Objects
    self._domain_registry = _DomainRegistry()

    #: The configuration dictionary as ``Config``.  This behaves
    #: exactly like a regular dictionary but supports additional methods
    #: to load a config from files.
    self.config = self.load_config(config)

    # The function to invoke to generate identity
    self._identity_function = identity_function

    self.providers = Providers(self)
    self.event_store = EventStore(self)
    self.brokers = Brokers(self)
    self.caches = Caches(self)
    self.email_providers = EmailProviders(self)

    # Cache for holding Model to Entity/Aggregate associations
    # Structure mirrors Providers._repositories:
    # {
    #    'app.User': {
    #        'postgresql': UserPostgresModel,
    #        'sqlite': UserSQLiteModel,
    #        None: UserGenericModel,       # database=None → fallback for any provider
    #    }
    # }
    self._database_models: dict[str, dict[str | None, type[BaseDatabaseModel]]] = (
        defaultdict(dict)
    )
    self._constructed_models: dict[str, BaseDatabaseModel] = {}

    # Message enricher hooks — callables that add custom metadata to events/commands.
    # Event enrichers receive (event, aggregate) and return dict[str, Any].
    # Command enrichers receive (command,) and return dict[str, Any].
    # Results are merged into metadata.extensions.
    self._event_enrichers: List[Callable] = []
    self._command_enrichers: List[Callable] = []

    # Composed helpers — see handler_setup.py, validation.py, etc.
    self._command_processor = CommandProcessor(self)
    self._handler_configurator = HandlerConfigurator(self)
    self._infrastructure = InfrastructureManager(self)
    self._query_processor = QueryProcessor(self)
    self._resolver = ElementResolver(self)
    self._type_manager = TypeManager(self)
    self._validator = DomainValidator(self)

    #: A list of functions that are called when the domain context
    #: is destroyed.  This is the place to store code that cleans up and
    #: disconnects from databases, for example.
    self.teardown_domain_context_functions: List[Callable] = []

    # Placeholder array for resolving classes referenced by domain elements
    self._pending_class_resolutions: dict[str, Any] = defaultdict(list)

    # Lazy-initialized idempotency store
    self._idempotency_store = None

    # Lazy-initialized trace emitter for command processing observability
    self._trace_emitter = None

    # Lazy-initialized OpenTelemetry providers (set by init_telemetry)
    self._otel_tracer_provider = None
    self._otel_meter_provider = None
    self._otel_init_attempted = False
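The constructor comments say that event enrichers receive `(event, aggregate)`, return a dict, and have their results merged into `metadata.extensions`. A minimal sketch of that merge step follows. `build_extensions` is a hypothetical name, and the "later enricher wins on key clashes" behavior is an assumption, since the source does not state a merge order.

```python
from typing import Any, Callable, Dict, List

# Signature described in the constructor comments: (event, aggregate) -> dict
EventEnricher = Callable[[Any, Any], Dict[str, Any]]


def build_extensions(
    event: Any, aggregate: Any, enrichers: List[EventEnricher]
) -> Dict[str, Any]:
    """Run each enricher in order and merge the returned dicts."""
    extensions: Dict[str, Any] = {}
    for enricher in enrichers:
        # Assumption: later enrichers overwrite earlier ones on key clashes.
        extensions.update(enricher(event, aggregate))
    return extensions
```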

has_outbox property

has_outbox: bool

Whether the outbox pattern is active.

Derived from server.default_subscription_type: outbox is enabled when subscription type is "stream". For backward compatibility, an explicit enable_outbox = true also activates the outbox.
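The rule above can be sketched against a plain dictionary. This is an illustration under stated assumptions: `outbox_enabled` is a hypothetical function, and placing `enable_outbox` at the top level of the config is a guess, since the text does not say where the flag lives.

```python
from typing import Any, Mapping


def outbox_enabled(config: Mapping[str, Any]) -> bool:
    """Hypothetical stand-in for the rule described above."""
    server = config.get("server", {})
    # Primary rule: stream subscriptions imply the outbox.
    if server.get("default_subscription_type") == "stream":
        return True
    # Backward-compatible rule: an explicit flag also turns it on.
    # Assumption: the flag is a top-level config key.
    return config.get("enable_outbox") is True
```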

camel_case_name cached property

camel_case_name: str

Return the CamelCase name of the domain.

The CamelCase name is the domain name with underscores removed and the first letter of each underscore-separated part capitalized. Examples:

  • my_domain -> MyDomain
  • my_domain_1 -> MyDomain1
  • my_domain_1_0 -> MyDomain10
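The conversion in these examples can be reproduced with a few lines of plain Python. This is a sketch that matches the listed examples, not necessarily how the property is implemented; `to_camel_case` is a hypothetical name.

```python
def to_camel_case(name: str) -> str:
    """Join underscore-separated parts, capitalizing the first letter of each."""
    parts = [part for part in name.split("_") if part]
    # Only the first character is upper-cased; the rest of each part is kept as-is.
    return "".join(part[0].upper() + part[1:] for part in parts)
```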

normalized_name cached property

normalized_name: str

Return the normalized name of the domain.

The normalized name is the underscored version of the domain name. Examples:

  • MyDomain -> my_domain
  • My Domain -> my_domain
  • My-Domain -> my_domain
  • My Domain 1 -> my_domain_1
  • My Domain 1.0 -> my_domain_1_0
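A regular-expression sketch that reproduces the listed examples is shown here. It is an assumption about one way to get these results, not the library's code; `normalize_name` is a hypothetical name.

```python
import re


def normalize_name(name: str) -> str:
    """Lower-case the name and separate its words with underscores."""
    # Split CamelCase boundaries: "MyDomain" -> "My_Domain".
    split = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Collapse spaces, hyphens, dots and other punctuation into one underscore.
    underscored = re.sub(r"[^A-Za-z0-9]+", "_", split)
    return underscored.strip("_").lower()
```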

init

init(traverse=True)

Parse the domain folder, and attach elements dynamically to the domain.

Protean parses all files in the domain file's folder, as well as under it, to load elements. So, all domain files must be nested under the folder of the file containing the domain definition.

Use the traverse flag to control this behavior; it is True by default.

When enabled, Protean is responsible for loading domain elements and ensuring all functionality is activated.

The developer is responsible for activating functionality manually when autoloading is disabled. Element activation can be done by importing them in central areas of domain execution, like Application Services.

For example, asynchronous aspects of a domain like its Subscribers and Event Handlers should be imported in their relevant Application Services and Aggregates.

This method bubbles up circular import issues, if present, in the domain code.
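To make the scanning behavior concrete, the sketch below lists the Python files that a recursive scan of a root folder would visit. It illustrates only the discovery step and does not import anything; `discover_module_files` is a hypothetical helper, not part of Protean.

```python
from pathlib import Path
from typing import List


def discover_module_files(root: str, traverse: bool = True) -> List[Path]:
    """Hypothetical helper: list the Python files a recursive scan would visit."""
    # With autoloading disabled nothing is discovered; elements must then be
    # activated by importing them manually.
    if not traverse:
        return []
    # Walk the root folder and everything nested under it.
    return sorted(Path(root).rglob("*.py"))
```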

Source code in src/protean/domain/__init__.py
def init(self, traverse=True):  # noqa: C901
    """Parse the domain folder, and attach elements dynamically to the domain.

    Protean parses all files in the domain file's folder, as well as under it,
    to load elements. So, all domain files are to be nested under the folder of
    the file containing the domain definition.

    One can use the `traverse` flag to control this functionality, `True` by default.

    When enabled, Protean is responsible for loading domain elements and ensuring
    all functionality is activated.

    The developer is responsible for activating functionality manually when
    autoloading is disabled. Element activation can be done by importing them
    in central areas of domain execution, like Application Services.

    For example, asynchronous aspects of a domain like its Subscribers and
    Event Handlers should be imported in their relevant Application Services
    and Aggregates.

    This method bubbles up circular import issues, if present, in the domain code.
    """
    self._auto_configure_logging()

    self._prepare(traverse=traverse)

    # Initialize adapters after loading domain
    self._initialize()

    # Initialize outbox DAOs for all providers
    if self.has_outbox:
        self._initialize_outbox()

domain_context

domain_context(**kwargs)

Create a DomainContext. Use as a with block to push the context, which will make current_domain point at this domain.

Example:

with domain.domain_context():
    init_db()
Source code in src/protean/domain/__init__.py
def domain_context(self, **kwargs):
    """Create a ``DomainContext``. Use as a ``with``
    block to push the context, which will make ``current_domain``
    point at this domain.

    ::

        with domain.domain_context():
            init_db()
    """
    return DomainContext(self, **kwargs)
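The push-and-pop behavior described for `domain_context` can be illustrated with a small context-stack sketch. All names here (`SketchDomain`, `SketchContext`, the `current_domain()` function) are hypothetical stand-ins. In particular, `current_domain` is modeled as a plain function, which is an assumption made to keep the example self-contained.

```python
from typing import List, Optional


class SketchDomain:
    """Hypothetical stand-in for a domain object."""

    def __init__(self, name: str) -> None:
        self.name = name

    def domain_context(self) -> "SketchContext":
        return SketchContext(self)


# Stack of active domains; the top entry plays the role of ``current_domain``.
_context_stack: List[SketchDomain] = []


def current_domain() -> Optional[SketchDomain]:
    return _context_stack[-1] if _context_stack else None


class SketchContext:
    def __init__(self, domain: SketchDomain) -> None:
        self.domain = domain

    def __enter__(self) -> "SketchContext":
        _context_stack.append(self.domain)  # push on entry
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        _context_stack.pop()  # pop on exit, even if the block raised
```

Nested `with` blocks restore the previous domain when the inner block exits, which is the main reason for using a stack rather than a single global.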

aggregate

aggregate(_cls: type[_T]) -> type[_T]
aggregate(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
aggregate(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def aggregate(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.AGGREGATE,
        _cls=_cls,
        **kwargs,
    )
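The three overloads describe a decorator that works both bare (`@domain.aggregate`) and with keyword options (`@domain.aggregate(...)`). The same shape is shared by the other element decorators on this page. A minimal sketch of the pattern follows; `element` and `registry` are hypothetical names, and the `stream_category` option is used purely for illustration.

```python
from typing import Any, Callable, Dict, Optional, Type, TypeVar, Union

_T = TypeVar("_T")

# Hypothetical registry: class name -> options passed to the decorator.
registry: Dict[str, Dict[str, Any]] = {}


def element(
    _cls: Optional[Type[_T]] = None, **kwargs: Any
) -> Union[Type[_T], Callable[[Type[_T]], Type[_T]]]:
    def wrap(cls: Type[_T]) -> Type[_T]:
        registry[cls.__name__] = kwargs
        return cls

    # Called as ``@element(...)``: return the decorator itself.
    if _cls is None:
        return wrap
    # Called as bare ``@element``: decorate immediately.
    return wrap(_cls)


@element
class User:
    pass


@element(stream_category="orders")  # option name is illustrative only
class Order:
    pass
```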

entity

entity(_cls: type[_T]) -> type[_T]
entity(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
entity(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def entity(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.ENTITY, _cls=_cls, **kwargs)

value_object

value_object(_cls: type[_T]) -> type[_T]
value_object(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
value_object(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def value_object(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.VALUE_OBJECT,
        _cls=_cls,
        **kwargs,
    )

command

command(_cls: type[_T]) -> type[_T]
command(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
command(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def command(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.COMMAND,
        _cls=_cls,
        **kwargs,
    )

event

event(_cls: type[_T]) -> type[_T]
event(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
event(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def event(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.EVENT,
        _cls=_cls,
        **kwargs,
    )

command_handler

command_handler(_cls: type[_T]) -> type[_T]
command_handler(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
command_handler(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def command_handler(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.COMMAND_HANDLER, _cls=_cls, **kwargs)

event_handler

event_handler(_cls: type[_T]) -> type[_T]
event_handler(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
event_handler(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def event_handler(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.EVENT_HANDLER,
        _cls=_cls,
        **kwargs,
    )

application_service

application_service(_cls: type[_T]) -> type[_T]
application_service(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
application_service(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def application_service(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.APPLICATION_SERVICE,
        _cls=_cls,
        **kwargs,
    )

domain_service

domain_service(_cls: type[_T]) -> type[_T]
domain_service(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
domain_service(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def domain_service(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.DOMAIN_SERVICE,
        _cls=_cls,
        **kwargs,
    )

repository

repository(_cls: type[_T]) -> type[_T]
repository(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
repository(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def repository(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.REPOSITORY, _cls=_cls, **kwargs)

projection

projection(_cls: type[_T]) -> type[_T]
projection(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
projection(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def projection(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROJECTION,
        _cls=_cls,
        **kwargs,
    )

projector

projector(_cls: type[_T]) -> type[_T]
projector(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
projector(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def projector(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROJECTOR,
        _cls=_cls,
        **kwargs,
    )

subscriber

subscriber(_cls: type[_T]) -> type[_T]
subscriber(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
subscriber(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def subscriber(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.SUBSCRIBER,
        _cls=_cls,
        **kwargs,
    )

process_manager

process_manager(_cls: type[_T]) -> type[_T]
process_manager(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
process_manager(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def process_manager(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROCESS_MANAGER,
        _cls=_cls,
        **kwargs,
    )

upcaster

upcaster(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]

Register an event upcaster with the domain.

Upcasters transform raw event payloads from an old schema version to a newer one. They are applied lazily during deserialization so that @apply handlers and event handlers always see the current schema.

PARAMETER DESCRIPTION
event_type

The event class this upcaster targets (current version).

TYPE: type

from_version

Source version number (e.g. 1).

TYPE: int

to_version

Target version number (e.g. 2).

TYPE: int

Example:

@domain.upcaster(event_type=OrderPlaced, from_version=1, to_version=2)
class UpcastOrderPlacedV1ToV2(BaseUpcaster):
    def upcast(self, data: dict) -> dict:
        data["currency"] = "USD"
        return data
Source code in src/protean/domain/__init__.py
def upcaster(
    self,
    _cls: type[_T] | None = None,
    **kwargs: Any,
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    """Register an event upcaster with the domain.

    Upcasters transform raw event payloads from an old schema version to
    a newer one.  They are applied lazily during deserialization so that
    ``@apply`` handlers and event handlers always see the current schema.

    Keyword Args:
        event_type (type): The event class this upcaster targets (current version).
        from_version (int): Source version number (e.g. ``1``).
        to_version (int): Target version number (e.g. ``2``).

    Example::

        @domain.upcaster(event_type=OrderPlaced, from_version=1, to_version=2)
        class UpcastOrderPlacedV1ToV2(BaseUpcaster):
            def upcast(self, data: dict) -> dict:
                data["currency"] = "USD"
                return data
    """
    from protean.core.upcaster import upcaster_factory

    def wrap(cls: type) -> type:
        new_cls = upcaster_factory(cls, self, **kwargs)
        self._upcasters.append(new_cls)
        return new_cls

    if _cls is None:
        return wrap
    return wrap(_cls)
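The docstring says upcasters lift a raw payload from an older schema version to a newer one during deserialization. One way to picture chaining such steps is sketched below. The registry layout and the names `register` and `upcast_to_current` are hypothetical; how Protean actually stores and applies upcasters is not shown on this page.

```python
from typing import Callable, Dict, Tuple

Upcast = Callable[[dict], dict]

# Hypothetical registry: (event name, from_version) -> (to_version, function).
_upcasters: Dict[Tuple[str, int], Tuple[int, Upcast]] = {}


def register(event_name: str, from_version: int, to_version: int, fn: Upcast) -> None:
    _upcasters[(event_name, from_version)] = (to_version, fn)


def upcast_to_current(event_name: str, version: int, data: dict, current: int) -> dict:
    """Apply registered steps until the payload reaches the current version."""
    payload = dict(data)  # work on a copy so the stored payload is untouched
    while version < current:
        step = _upcasters.get((event_name, version))
        if step is None:
            raise LookupError(f"No upcaster from v{version} for {event_name}")
        version, fn = step
        payload = fn(payload)
    return payload


def add_currency(data: dict) -> dict:
    # Same transformation as the docstring example: default the currency.
    return {**data, "currency": "USD"}


register("OrderPlaced", 1, 2, add_currency)
```

A payload that is already at the current version passes through unchanged, so handlers only ever see the latest schema.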

database_model

database_model(_cls: type[_T]) -> type[_T]
database_model(
    _cls: None = ..., **kwargs: Any
) -> Callable[[type[_T]], type[_T]]
database_model(
    _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
@dataclass_transform(field_specifiers=_FIELD_SPECIFIERS)
def database_model(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.DATABASE_MODEL, _cls=_cls, **kwargs)
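The constructor's comments describe database models as being cached per entity and per provider, with a `None` key acting as the fallback for any provider. A lookup over one entity's mapping could be sketched as follows. `pick_model` is a hypothetical helper, not a Protean API.

```python
from typing import Dict, Optional


def pick_model(models: Dict[Optional[str], type], database: str) -> Optional[type]:
    """Prefer the provider-specific model, else the ``None`` fallback."""
    if database in models:
        return models[database]
    # No provider-specific entry: use the generic model, if one was registered.
    return models.get(None)
```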

process

process(
    command: Any,
    asynchronous: Optional[bool] = None,
    idempotency_key: Optional[str] = None,
    raise_on_duplicate: bool = False,
    priority: Optional[int] = None,
    correlation_id: Optional[str] = None,
) -> Optional[Any]

Process command and return results based on specified preference.

By default, Protean does not return values after processing commands. This behavior can be overridden either by setting command_processing in config to "sync" or by specifying asynchronous=False when calling the domain's process method.

PARAMETER DESCRIPTION
command

Command to process (instance of a @domain.command-decorated class)

TYPE: Any

asynchronous

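Specifies if the command should be processed asynchronously. The argument itself defaults to None; commands are then processed asynchronously unless command_processing is set to "sync" in config.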

TYPE: Boolean DEFAULT: None

idempotency_key

Caller-provided key for command deduplication. When provided, enables submission-level dedup via the idempotency store.

TYPE: str DEFAULT: None

raise_on_duplicate

If True, raises DuplicateCommandError when a duplicate idempotency key is detected. If False (default), silently returns the cached result.

TYPE: bool DEFAULT: False

priority

Processing priority for events produced by this command.

TYPE: int DEFAULT: None

correlation_id

Correlation ID for distributed tracing.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Optional[Any]

Returns either the command handler's return value or nothing, based on preference.

Source code in src/protean/domain/__init__.py
def process(
    self,
    command: Any,
    asynchronous: Optional[bool] = None,
    idempotency_key: Optional[str] = None,
    raise_on_duplicate: bool = False,
    priority: Optional[int] = None,
    correlation_id: Optional[str] = None,
) -> Optional[Any]:
    """Process command and return results based on specified preference.

    By default, Protean does not return values after processing commands. This behavior
    can be overridden either by setting command_processing in config to "sync" or by specifying
    ``asynchronous=False`` when calling the domain's ``process`` method.

    Args:
        command: Command to process (instance of a ``@domain.command``-decorated class)
        asynchronous (Boolean, optional): Specifies if the command should be processed asynchronously.
            Defaults to True.
        idempotency_key (str, optional): Caller-provided key for command deduplication.
            When provided, enables submission-level dedup via the idempotency store.
        raise_on_duplicate (bool): If ``True``, raises ``DuplicateCommandError``
            when a duplicate idempotency key is detected. If ``False`` (default),
            silently returns the cached result.
        priority (int, optional): Processing priority for events produced by this command.
        correlation_id (str, optional): Correlation ID for distributed tracing.

    Returns:
        Optional[Any]: Returns either the command handler's return value or nothing, based on preference.
    """
    return self._command_processor.process(
        command,
        asynchronous=asynchronous,
        idempotency_key=idempotency_key,
        raise_on_duplicate=raise_on_duplicate,
        priority=priority,
        correlation_id=correlation_id,
    )
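The interplay of `idempotency_key` and `raise_on_duplicate` can be illustrated with an in-memory sketch. This is not Protean's idempotency store: `InMemoryIdempotencyStore` and `submit` are hypothetical, and `DuplicateCommandError` is redefined locally as a stand-in for the error named in the docstring.

```python
from typing import Any, Callable, Dict, Optional


class DuplicateCommandError(Exception):
    """Local stand-in for the error named in the docstring."""


class InMemoryIdempotencyStore:
    """Hypothetical store mapping idempotency keys to cached results."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def submit(
        self,
        handler: Callable[[Any], Any],
        command: Any,
        idempotency_key: Optional[str] = None,
        raise_on_duplicate: bool = False,
    ) -> Any:
        # Without a key there is nothing to deduplicate on.
        if idempotency_key is None:
            return handler(command)
        if idempotency_key in self._results:
            if raise_on_duplicate:
                raise DuplicateCommandError(idempotency_key)
            # Default behavior: silently reuse the cached result.
            return self._results[idempotency_key]
        result = handler(command)
        self._results[idempotency_key] = result
        return result
```

Submitting the same key twice runs the handler once; the second call either returns the cached result or raises, depending on `raise_on_duplicate`.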