Skip to content

Domain

The central registry object. Create one per bounded context to register all domain elements, manage configuration, and coordinate infrastructure adapters.

See Compose a Domain for a practical guide.

The domain object is a one-stop gateway to:

  • Registering Domain Objects/Concepts
  • Querying/Retrieving Domain Artifacts like Entities, Services, etc.
  • Retrieve injected infrastructure adapters

Usually you create a Domain instance in your main module or in the __init__.py file of your package like this::

from protean import Domain
domain = Domain()

The Domain will automatically detect the root path of the calling module. You can also specify the root path explicitly::

domain = Domain(root_path="/path/to/domain")

The root path resolution follows this priority:

  1. Explicit root_path parameter if provided
  2. DOMAIN_ROOT_PATH environment variable if set
  3. Auto-detection of caller's file location
  4. Current working directory as last resort

:param root_path: the path to the folder containing the domain file (optional, will auto-detect if not provided) :param name: the name of the domain (optional, will use the module name if not provided) :param config: optional configuration dictionary :param identity_function: optional function to generate identities for domain objects

Source code in src/protean/domain/__init__.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def __init__(
    self,
    root_path: str = None,
    name: str = "",
    config: Optional[Dict] = None,
    identity_function: Optional[Callable] = None,
):
    # Determine root_path based on resolution priority
    if root_path is None:
        # Try to get from environment variable
        env_root_path = os.environ.get("DOMAIN_ROOT_PATH")
        if env_root_path:
            self.root_path = env_root_path
        else:
            # Auto-detect
            self.root_path = self._guess_caller_path()
    else:
        self.root_path = root_path

    # Initialize the domain with the name of the module if not provided
    # Get the stack frame of the caller of the __init__ method
    caller_frame = inspect.stack()[1]
    # Get the module name from the globals of the frame where the object was instantiated
    self.name = name if name else caller_frame.frame.f_globals["__name__"]

    # FIXME Additional domain attributes: (Think if this is needed)
    #   - Type of Domain: Core, Supporting, Third-party(?)
    #   - Type of Implementation: CRUD, CQRS, ES

    # Registry for all domain Objects
    self._domain_registry = _DomainRegistry()

    #: The configuration dictionary as ``Config``.  This behaves
    #: exactly like a regular dictionary but supports additional methods
    #: to load a config from files.
    self.config = self.load_config(config)

    # The function to invoke to generate identity
    self._identity_function = identity_function

    self.providers = Providers(self)
    self.event_store = EventStore(self)
    self.brokers = Brokers(self)
    self.caches = Caches(self)
    self.email_providers = EmailProviders(self)

    # Cache for holding Model to Entity/Aggregate associations
    self._database_models: Dict[str, BaseDatabaseModel] = {}
    self._constructed_models: Dict[str, BaseDatabaseModel] = {}

    # Cache for holding events and commands by their types
    self._events_and_commands: Dict[str, Union[BaseCommand, BaseEvent]] = {}

    # Upcaster infrastructure (lightweight, not full domain elements)
    from protean.utils.upcasting import UpcasterChain

    self._upcasters: list[type] = []
    self._upcaster_chain: UpcasterChain = UpcasterChain()

    # Message enricher hooks — callables that add custom metadata to events/commands.
    # Event enrichers receive (event, aggregate) and return dict[str, Any].
    # Command enrichers receive (command,) and return dict[str, Any].
    # Results are merged into metadata.extensions.
    self._event_enrichers: List[Callable] = []
    self._command_enrichers: List[Callable] = []

    #: A list of functions that are called when the domain context
    #: is destroyed.  This is the place to store code that cleans up and
    #: disconnects from databases, for example.
    self.teardown_domain_context_functions: List[Callable] = []

    # Placeholder array for resolving classes referenced by domain elements
    # FIXME Should all protean elements be subclassed from a base element?
    self._pending_class_resolutions: dict[str, Any] = defaultdict(list)

    # Store outbox DAOs per provider
    self._outbox_repos = {}

    # Lazy-initialized idempotency store
    self._idempotency_store = None

has_outbox property

has_outbox: bool

Whether the outbox pattern is active.

Derived from server.default_subscription_type: outbox is enabled when subscription type is "stream". For backward compatibility, an explicit enable_outbox = true also activates the outbox.

camel_case_name cached property

camel_case_name: str

Return the CamelCase name of the domain.

The CamelCase name is the name of the domain with the first letter capitalized. Examples: - my_domain -> MyDomain - my_domain_1 -> MyDomain1 - my_domain_1_0 -> MyDomain10

normalized_name cached property

normalized_name: str

Return the normalized name of the domain.

The normalized name is the underscored version of the domain name. Examples: - MyDomain -> my_domain - My Domain -> my_domain - My-Domain -> my_domain - My Domain 1 -> my_domain_1 - My Domain 1.0 -> my_domain_1_0

init

init(traverse=True)

Parse the domain folder, and attach elements dynamically to the domain.

Protean parses all files in the domain file's folder, as well as under it, to load elements. So, all domain files are to be nested under the file contain the domain definition.

One can use the traverse flag to control this functionality, True by default.

When enabled, Protean is responsible for loading domain elements and ensuring all functionality is activated.

The developer is responsible for activating functionality manually when autoloading is disabled. Element activation can be done by importing them in central areas of domain execution, like Application Services.

For example, asynchronous aspects of a domain like its Subscribers and Event Handlers should be imported in their relevant Application Services and Aggregates.

This method bubbles up circular import issues, if present, in the domain code.

Source code in src/protean/domain/__init__.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
def init(self, traverse=True):  # noqa: C901
    """Parse the domain folder, and attach elements dynamically to the domain.

    Protean parses all files in the domain file's folder, as well as under it,
    to load elements. So, all domain files are to be nested under the file contain
    the domain definition.

    One can use the `traverse` flag to control this functionality, `True` by default.

    When enabled, Protean is responsible for loading domain elements and ensuring
    all functionality is activated.

    The developer is responsible for activating functionality manually when
    autoloading is disabled. Element activation can be done by importing them
    in central areas of domain execution, like Application Services.

    For example, asynchronous aspects of a domain like its Subscribers and
    Event Handlers should be imported in their relevant Application Services
    and Aggregates.

    This method bubbles up circular import issues, if present, in the domain code.
    """
    if traverse is True:
        self._traverse()

    # Resolve all pending references
    self._resolve_references()

    # Assign Aggregate Clusters
    self._assign_aggregate_clusters()

    # Set Aggregate Cluster Options
    self._set_aggregate_cluster_options()

    # Generate Fact Event Classes
    self._generate_fact_event_classes()

    # Generate and set event/command `__type__` value
    self._set_and_record_event_and_command_type()

    # Build upcaster chains for event schema evolution
    self._build_upcaster_chains()

    # Parse and setup handler methods in Command Handlers
    self._setup_command_handlers()

    # Parse and setup handler methods in Event Handlers
    self._setup_event_handlers()

    # Parse and setup handler methods in Projectors
    self._setup_projectors()

    # Parse and setup handler methods in Process Managers
    self._setup_process_managers()

    # Run Validations
    self._validate_domain()

    # Initialize adapters after loading domain
    self._initialize()

    # Validate outbox / subscription-type consistency
    subscription_type = self.config.get("server", {}).get(
        "default_subscription_type", "event_store"
    )
    if (
        self.config.get("enable_outbox", False)
        and subscription_type == "event_store"
    ):
        raise ConfigurationError(
            "Configuration conflict: 'enable_outbox' is True but "
            "'server.default_subscription_type' is 'event_store'. "
            "When outbox is enabled, subscription type must be 'stream' "
            "so that subscriptions read from the broker where the outbox publishes. "
            "Either set server.default_subscription_type = 'stream' or remove enable_outbox."
        )

    # Validate priority lanes configuration
    lanes_config = self.config.get("server", {}).get("priority_lanes", {})
    if lanes_config:
        enabled = lanes_config.get("enabled", False)
        if not isinstance(enabled, bool):
            raise ConfigurationError(
                f"server.priority_lanes.enabled must be a bool, "
                f"got {type(enabled).__name__}: {enabled!r}"
            )

        threshold = lanes_config.get("threshold", 0)
        if not isinstance(threshold, (int, float)) or isinstance(threshold, bool):
            raise ConfigurationError(
                f"server.priority_lanes.threshold must be an integer, "
                f"got {type(threshold).__name__}: {threshold!r}"
            )

        suffix = lanes_config.get("backfill_suffix", "backfill")
        if not isinstance(suffix, str) or not suffix.strip():
            raise ConfigurationError(
                f"server.priority_lanes.backfill_suffix must be a non-empty string, "
                f"got {type(suffix).__name__}: {suffix!r}"
            )

    # Initialize outbox DAOs for all providers
    if self.has_outbox:
        self._initialize_outbox()

domain_context

domain_context(**kwargs)

Create a DomainContext. Use as a with block to push the context, which will make current_domain point at this domain.

::

with domain.domain_context():
    init_db()
Source code in src/protean/domain/__init__.py
568
569
570
571
572
573
574
575
576
577
578
def domain_context(self, **kwargs):
    """Create a ``DomainContext``. Use as a ``with``
    block to push the context, which will make ``current_domain``
    point at this domain.

    ::

        with domain.domain_context():
            init_db()
    """
    return DomainContext(self, **kwargs)

aggregate

aggregate(_cls: type[_T]) -> type[_T]
aggregate(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
aggregate(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1354
1355
1356
1357
1358
1359
1360
1361
1362
@dataclass_transform()
def aggregate(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.AGGREGATE,
        _cls=_cls,
        **kwargs,
    )

entity

entity(_cls: type[_T]) -> type[_T]
entity(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
entity(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1462
1463
1464
1465
1466
@dataclass_transform()
def entity(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.ENTITY, _cls=_cls, **kwargs)

value_object

value_object(_cls: type[_T]) -> type[_T]
value_object(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
value_object(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1526
1527
1528
1529
1530
1531
1532
1533
1534
@dataclass_transform()
def value_object(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.VALUE_OBJECT,
        _cls=_cls,
        **kwargs,
    )

command

command(_cls: type[_T]) -> type[_T]
command(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
command(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1386
1387
1388
1389
1390
1391
1392
1393
1394
@dataclass_transform()
def command(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.COMMAND,
        _cls=_cls,
        **kwargs,
    )

event

event(_cls: type[_T]) -> type[_T]
event(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
event(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1414
1415
1416
1417
1418
1419
1420
1421
1422
@dataclass_transform()
def event(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.EVENT,
        _cls=_cls,
        **kwargs,
    )

command_handler

command_handler(_cls: type[_T]) -> type[_T]
command_handler(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
command_handler(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1402
1403
1404
1405
1406
@dataclass_transform()
def command_handler(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.COMMAND_HANDLER, _cls=_cls, **kwargs)

event_handler

event_handler(_cls: type[_T]) -> type[_T]
event_handler(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
event_handler(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1430
1431
1432
1433
1434
1435
1436
1437
1438
@dataclass_transform()
def event_handler(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.EVENT_HANDLER,
        _cls=_cls,
        **kwargs,
    )

application_service

application_service(_cls: type[_T]) -> type[_T]
application_service(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
application_service(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1370
1371
1372
1373
1374
1375
1376
1377
1378
@dataclass_transform()
def application_service(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.APPLICATION_SERVICE,
        _cls=_cls,
        **kwargs,
    )

domain_service

domain_service(_cls: type[_T]) -> type[_T]
domain_service(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
domain_service(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1446
1447
1448
1449
1450
1451
1452
1453
1454
@dataclass_transform()
def domain_service(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.DOMAIN_SERVICE,
        _cls=_cls,
        **kwargs,
    )

repository

repository(_cls: type[_T]) -> type[_T]
repository(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
repository(_cls: type[_T" backlink-type="used-by" backlink-anchor="protean.domain.Domain.repository" optional hover>_T] | None = None, **kwargs: Any) -> type[_T" backlink-type="returned-by" backlink-anchor="protean.domain.Domain.repository" optional hover>_T] | Callable[[type[_T" backlink-type="returned-by" backlink-anchor="protean.domain.Domain.repository" optional hover>_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1498
1499
1500
1501
1502
@dataclass_transform()
def repository(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.REPOSITORY, _cls=_cls, **kwargs)

projection

projection(_cls: type[_T]) -> type[_T]
projection(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
projection(_cls: type[_T" backlink-type="used-by" backlink-anchor="protean.domain.Domain.projection" optional hover>_T] | None = None, **kwargs: Any) -> type[_T" backlink-type="returned-by" backlink-anchor="protean.domain.Domain.projection" optional hover>_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1542
1543
1544
1545
1546
1547
1548
1549
1550
@dataclass_transform()
def projection(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROJECTION,
        _cls=_cls,
        **kwargs,
    )

projector

projector(_cls: type[_T]) -> type[_T]
projector(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
projector(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1558
1559
1560
1561
1562
1563
1564
1565
1566
@dataclass_transform()
def projector(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROJECTOR,
        _cls=_cls,
        **kwargs,
    )

subscriber

subscriber(_cls: type[_T]) -> type[_T]
subscriber(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
subscriber(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1510
1511
1512
1513
1514
1515
1516
1517
1518
@dataclass_transform()
def subscriber(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.SUBSCRIBER,
        _cls=_cls,
        **kwargs,
    )

process_manager

process_manager(_cls: type[_T]) -> type[_T]
process_manager(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
process_manager(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1574
1575
1576
1577
1578
1579
1580
1581
1582
@dataclass_transform()
def process_manager(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(
        DomainObjects.PROCESS_MANAGER,
        _cls=_cls,
        **kwargs,
    )

upcaster

upcaster(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]

Register an event upcaster with the domain.

Upcasters transform raw event payloads from an old schema version to a newer one. They are applied lazily during deserialization so that @apply handlers and event handlers always see the current schema.

PARAMETER DESCRIPTION
event_type

The event class this upcaster targets (current version).

TYPE: type

from_version

Source version string (e.g. "v1").

TYPE: str

to_version

Target version string (e.g. "v2").

TYPE: str

Example::

@domain.upcaster(event_type=OrderPlaced, from_version="v1", to_version="v2")
class UpcastOrderPlacedV1ToV2(BaseUpcaster):
    def upcast(self, data: dict) -> dict:
        data["currency"] = "USD"
        return data
Source code in src/protean/domain/__init__.py
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
def upcaster(
    self,
    _cls: type[_T] | None = None,
    **kwargs: Any,
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    """Register an event upcaster with the domain.

    Upcasters transform raw event payloads from an old schema version to
    a newer one.  They are applied lazily during deserialization so that
    ``@apply`` handlers and event handlers always see the current schema.

    Keyword Args:
        event_type (type): The event class this upcaster targets (current version).
        from_version (str): Source version string (e.g. ``"v1"``).
        to_version (str): Target version string (e.g. ``"v2"``).

    Example::

        @domain.upcaster(event_type=OrderPlaced, from_version="v1", to_version="v2")
        class UpcastOrderPlacedV1ToV2(BaseUpcaster):
            def upcast(self, data: dict) -> dict:
                data["currency"] = "USD"
                return data
    """
    from protean.core.upcaster import upcaster_factory

    def wrap(cls: type) -> type:
        new_cls = upcaster_factory(cls, self, **kwargs)
        self._upcasters.append(new_cls)
        return new_cls

    if _cls is None:
        return wrap
    return wrap(_cls)

database_model

database_model(_cls: type[_T]) -> type[_T]
database_model(_cls: None = ..., **kwargs: Any) -> Callable[[type[_T]], type[_T]]
database_model(_cls: type[_T] | None = None, **kwargs: Any) -> type[_T] | Callable[[type[_T]], type[_T]]
Source code in src/protean/domain/__init__.py
1486
1487
1488
1489
1490
@dataclass_transform()
def database_model(
    self, _cls: type[_T] | None = None, **kwargs: Any
) -> type[_T] | Callable[[type[_T]], type[_T]]:
    return self._domain_element(DomainObjects.DATABASE_MODEL, _cls=_cls, **kwargs)

process

process(command: Any, asynchronous: Optional[bool] = None, idempotency_key: Optional[str] = None, raise_on_duplicate: bool = False, priority: Optional[int] = None, correlation_id: Optional[str] = None) -> Optional[Any]

Process command and return results based on specified preference.

By default, Protean does not return values after processing commands. This behavior can be overridden either by setting command_processing in config to "sync" or by specifying asynchronous=False when calling the domain's handle method.

PARAMETER DESCRIPTION
command

Command to process (instance of a @domain.command-decorated class)

TYPE: Any

asynchronous

Specifies if the command should be processed asynchronously. Defaults to True.

TYPE: Boolean DEFAULT: None

idempotency_key

Caller-provided key for command deduplication. When provided, enables submission-level dedup via the idempotency store.

TYPE: str DEFAULT: None

raise_on_duplicate

If True, raises DuplicateCommandError when a duplicate idempotency key is detected. If False (default), silently returns the cached result.

TYPE: bool DEFAULT: False

priority

Processing priority for events produced by this command. When priority lanes are enabled, events with priority below the configured threshold are routed to a backfill stream and processed only when the primary stream is empty. Use Priority enum values from protean.utils.processing. If not specified, uses the value from the current processing_priority() context, or Priority.NORMAL (0).

TYPE: int DEFAULT: None

correlation_id

Correlation ID for distributed tracing. When provided (e.g. from a frontend or API gateway), this ID is propagated to all commands and events in the causal chain. If not provided, a new UUID is auto-generated.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Optional[Any]

Optional[Any]: Returns either the command handler's return value or nothing, based on preference.

Source code in src/protean/domain/__init__.py
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
def process(
    self,
    command: Any,
    asynchronous: Optional[bool] = None,
    idempotency_key: Optional[str] = None,
    raise_on_duplicate: bool = False,
    priority: Optional[int] = None,
    correlation_id: Optional[str] = None,
) -> Optional[Any]:
    """Process command and return results based on specified preference.

    By default, Protean does not return values after processing commands. This behavior
    can be overridden either by setting command_processing in config to "sync" or by specifying
    ``asynchronous=False`` when calling the domain's ``handle`` method.

    Args:
        command: Command to process (instance of a ``@domain.command``-decorated class)
        asynchronous (Boolean, optional): Specifies if the command should be processed asynchronously.
            Defaults to True.
        idempotency_key (str, optional): Caller-provided key for command deduplication.
            When provided, enables submission-level dedup via the idempotency store.
        raise_on_duplicate (bool): If ``True``, raises ``DuplicateCommandError``
            when a duplicate idempotency key is detected. If ``False`` (default),
            silently returns the cached result.
        priority (int, optional): Processing priority for events produced by this command.
            When priority lanes are enabled, events with priority below the configured
            threshold are routed to a backfill stream and processed only when the
            primary stream is empty. Use ``Priority`` enum values from
            ``protean.utils.processing``. If not specified, uses the value from
            the current ``processing_priority()`` context, or ``Priority.NORMAL`` (0).
        correlation_id (str, optional): Correlation ID for distributed tracing.
            When provided (e.g. from a frontend or API gateway), this ID is propagated
            to all commands and events in the causal chain. If not provided, a new
            UUID is auto-generated.

    Returns:
        Optional[Any]: Returns either the command handler's return value or nothing, based on preference.
    """
    from protean.utils.eventing import Message
    from protean.utils.processing import current_priority, processing_priority

    # If asynchronous is not specified, use the command_processing setting from config
    if asynchronous is None:
        asynchronous = self.config["command_processing"] == Processing.ASYNC.value

    if (
        fqn(command.__class__)
        not in self.registry._elements[DomainObjects.COMMAND.value]
    ):
        raise IncorrectUsageError(
            f"Element {command.__class__.__name__} is not registered in domain {self.name}"
        )

    # --- Idempotency: check for existing result ---
    store = self.idempotency_store
    if idempotency_key and store.is_active:
        existing = store.check(idempotency_key)
        if existing and existing.get("status") == "success":
            cached_result = existing.get("result")
            if raise_on_duplicate:
                raise DuplicateCommandError(
                    f"Command with idempotency key '{idempotency_key}' "
                    f"has already been processed",
                    original_result=cached_result,
                )
            return cached_result

    # Resolve priority: explicit param > context var > default (0)
    resolved_priority = priority if priority is not None else current_priority()

    command_with_metadata = self._enrich_command(
        command,
        asynchronous,
        idempotency_key=idempotency_key,
        priority=resolved_priority,
        correlation_id=correlation_id,
    )
    position = self.event_store.store.append(command_with_metadata)

    if (
        not asynchronous
        or self.config["command_processing"] == Processing.SYNC.value
    ):
        handler_class = self.command_handler_for(command)
        if handler_class:
            try:
                # Build a Message for context propagation so that events
                # raised during sync handling inherit trace IDs.
                command_message = Message.from_domain_object(command_with_metadata)
                g.message_in_context = command_message

                # Set the processing priority context so that UoW.commit()
                # can read it when creating outbox records
                with processing_priority(resolved_priority):
                    result = handler_class._handle(command_with_metadata)
            except Exception:
                # Record failure with short TTL to allow retry
                if idempotency_key and store.is_active:
                    store.record_error(idempotency_key, "handler_failed")
                raise
            finally:
                g.pop("message_in_context", None)

            # Record success
            if idempotency_key and store.is_active:
                store.record_success(idempotency_key, result)
            return result

    # Async path: cache the position as the result
    if idempotency_key and store.is_active:
        store.record_success(idempotency_key, position)

    return position