Engine Architecture
The Engine class is the core of Protean's async message processing system. It
manages subscriptions, coordinates message handling, and provides graceful
lifecycle management.
Responsibilities
The Engine is responsible for:
- Registering handler subscriptions - Creating subscriptions for event handlers, command handlers, and projectors
- Managing broker subscriptions - Handling external message consumers
- Running outbox processors - Publishing messages from the transactional outbox
- Coordinating lifecycle - Starting, running, and gracefully shutting down all components
- Emitting trace events - Publishing structured
MessageTraceevents at each stage of message processing for real-time observability
Engine Initialization
When you create an Engine instance, it automatically discovers and registers all handlers from your domain:
from protean.server import Engine
# Create an engine for your domain
engine = Engine(domain)
# Or with options
engine = Engine(
domain,
test_mode=False, # Set True for deterministic testing
debug=False, # Set True for verbose logging
)
Subscription Registration
The Engine associates subscriptions with each handler during initialization. The factory determines the subscription type based on the configuration hierarchy.
Handler Subscriptions
For each event handler and projector, the Engine creates one subscription per handler, inferring the stream category and resolving configuration:
- Infers the stream category from handler metadata or associated aggregate
- Resolves subscription configuration using the priority hierarchy
- Initializes the right Subscription Type, either
a
StreamSubscriptionor anEventStoreSubscription, for the handler.
# Example: How the engine registers an event handler
handler_cls = OrderEventHandler
stream_category = engine._infer_stream_category(handler_cls) # e.g., "orders"
subscription = engine.subscription_factory.create_subscription(
handler=handler_cls,
stream_category=stream_category,
)
Command dispatch
When multiple command handlers belong to the same aggregate (and therefore the
same stream category), the Engine consolidates them into a single subscription
using a CommandDispatcher. Instead of creating N separate subscriptions that
would compete for the same messages, the dispatcher reads each command once and
routes it to the correct handler based on the command type:
# Two command handlers for the same aggregate
@domain.command_handler(part_of=User)
class RegisterUserHandler:
@handle(RegisterUser)
def register(self, command): ...
@domain.command_handler(part_of=User)
class DeactivateUserHandler:
@handle(DeactivateUser)
def deactivate(self, command): ...
# The engine creates ONE subscription keyed as "commands:{stream_category}"
# that routes RegisterUser → RegisterUserHandler
# and DeactivateUser → DeactivateUserHandler
The CommandDispatcher caches the deserialized domain object between handler
resolution and execution to avoid double deserialization.
Broker Subscriptions
For external message subscribers, the Engine creates BrokerSubscription
instances that connect to the configured message broker:
@domain.subscriber(broker="default", stream="external_events")
class ExternalEventSubscriber:
@handle("OrderCreated")
def handle_order_created(self, event):
...
Outbox Processors
When the outbox is enabled (via default_subscription_type = "stream"), the
Engine creates an OutboxProcessor for each database provider to publish
messages to the configured broker:
# Configuration in domain.toml
[server]
default_subscription_type = "stream" # Enables outbox
[outbox]
broker = "default"
messages_per_tick = 10
tick_interval = 1
Configuration Hierarchy
When registering subscriptions, the Engine consults a hierarchy of configuration sources to determine the appropriate subscription type and options for each handler. The resolution process typically follows this order of precedence:
- Handler-level configuration: Explicit parameters set on the handler, such as stream name, broker, or processing options.
- Domain element metadata: Values inferred from associated aggregate, event, or command definitions.
- Profile or role defaults: Settings derived from the active configuration profile (e.g., "production", "projection") or handler role (such as projector vs. handler).
- Domain/global configuration: Defaults specified in the domain configuration files (
domain.tomlor equivalent).
This means that explicit intent at the handler level takes priority, but system-wide defaults provide sensible behavior when specifics are not set.
For example, if a handler specifies a custom stream name, it will be used; otherwise, the engine will infer the relevant category from the handler’s associated aggregate or fall back to profile/domain defaults.
Tracing
The Engine initializes a TraceEmitter at startup that publishes structured
MessageTrace events as messages flow through the pipeline. Trace events are
emitted at three points during handle_message:
handler.started-- Before the handler processes the messagehandler.completed-- After successful processing (includesduration_ms)handler.failed-- When the handler raises an exception (includeserror)
Additional trace events are emitted by StreamSubscription (message.acked,
message.nacked, message.dlq) and OutboxProcessor (outbox.published,
outbox.failed).
Dual-channel output
The TraceEmitter writes to two Redis channels:
- Pub/Sub (
protean:trace) -- Real-time fan-out for SSE clients. The emitter checks subscriber count and skips when nobody is listening. - Stream (
protean:traces) -- Time-bounded history for the Observatory dashboard and REST API. Old entries are automatically trimmed based on the configured retention period.
Trace retention
The Engine reads trace_retention_days from the domain's [observatory]
configuration (default: 7 days). When set to 0, Stream persistence is
disabled and only Pub/Sub broadcasting is available. If the configuration value
is missing or invalid, the Engine falls back to the 7-day default.
The emitter adds zero overhead when no monitoring tools are subscribed and persistence is disabled -- see Observability for the full design, the Observatory monitoring server, and trace API endpoints.
Running the Engine
For comprehensive information on how to start, configure, and operate the engine, including:
- CLI commands and options
- Test mode for deterministic testing
- Debug mode for troubleshooting
- Programmatic usage
- Production deployment strategies
- Signal handling and graceful shutdown
- Monitoring, health checks, and observability
See the Running the Server guide.
Next Steps
- Subscriptions: Learn how handlers connect to message sources and react to events.
- Subscription Types: Compare StreamSubscription and EventStoreSubscription, and choose the right one for your workload.
- Configuration: Dive deeper into configuring engine profiles, subscriptions, and runtime options.
- Outbox Pattern: Understand reliable message publishing and transactional outbox processing.
- Observability: Real-time tracing, the Observatory server, SSE streaming, and Prometheus metrics.
- Running the Server: Explore how to deploy, operate, and monitor the server in different environments.