Run the Server
This guide covers how to start, configure, and operate Protean's async processing server — the background process that runs event handlers, command handlers, and projectors.
For the conceptual architecture behind the server, see Async Processing.
CLI Command
Start the server using the protean server command:
protean server [OPTIONS]
Options
| Option | Description | Default |
|---|---|---|
| --domain | Path to domain module | . (current directory) |
| --test-mode | Run in test mode | False |
| --debug | Enable debug logging | False |
| --workers | Number of worker processes | 1 |
| --help | Show help message | |
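When the server is launched from a script or a process supervisor, the options in the table can be assembled into an argument list. The following is a minimal sketch: build_server_command is a hypothetical helper, not part of Protean, and it uses only the flags listed above.

```python
from typing import List


def build_server_command(
    domain: str = ".",
    workers: int = 1,
    debug: bool = False,
    test_mode: bool = False,
) -> List[str]:
    """Assemble an argv list for `protean server` (hypothetical helper)."""
    if workers < 1:
        raise ValueError("workers must be at least 1")

    # --domain is always passed explicitly; "." mirrors the documented default.
    cmd = ["protean", "server", f"--domain={domain}"]
    # Only add flags that differ from the documented defaults.
    if workers != 1:
        cmd += ["--workers", str(workers)]
    if debug:
        cmd.append("--debug")
    if test_mode:
        cmd.append("--test-mode")
    return cmd
```

For example, build_server_command("my_domain", workers=4, debug=True) yields the arguments for protean server --domain=my_domain --workers 4 --debug, which can then be passed to subprocess.run.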
Database Setup
Before starting the server, ensure your database tables are created:
# Create all tables (aggregates, entities, projections, outbox)
protean db setup --domain=my_domain
# Create only outbox tables (useful when migrating to stream subscriptions)
protean db setup-outbox --domain=my_domain
# Drop all tables (requires confirmation)
protean db drop --domain=my_domain
protean db drop --domain=my_domain --yes # Skip confirmation
# Delete all data, preserving schema (requires confirmation)
protean db truncate --domain=my_domain
protean db truncate --domain=my_domain --yes # Skip confirmation
See Database Commands for the full reference.
Basic Usage
Starting the Server
# Start with domain in current directory
protean server
# Start with specific domain path
protean server --domain=src/my_domain
# Start with module path
protean server --domain=my_package.my_domain
# Start with specific instance
protean server --domain=my_domain:custom_domain
Multiple Workers
Run multiple Engine processes from a single command using --workers:
# Start 4 worker processes
protean server --domain=my_domain --workers 4
Multi-worker mode requires stream subscriptions so that Redis consumer groups
can distribute messages across workers. Set this in your domain.toml:
[server]
default_subscription_type = "stream"
See Configuration Reference for the full list of server options.
Workers coordinate through Redis consumer groups (for stream message distribution) and database-level locking (for outbox processing). No IPC or shared memory is needed between workers.
For the full multi-worker guide including architecture, coordination details, and deployment patterns, see Multi-Worker Mode.
Domain Discovery
The server discovers your domain in this order:
- Environment variable: PROTEAN_DOMAIN if set
- --domain parameter: Path or module specified
- Current directory: Looks for domain.py or subdomain.py
Within a module, it looks for:
- Variable named domain or subdomain
- Any variable that is a Domain instance
- Raises error if multiple instances found
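The lookup order above can be sketched in plain Python. This is an illustration of the documented precedence only, not Protean's implementation. The function names, the exception type, and the isinstance check on the named variables are all assumptions.

```python
from typing import Iterable, Mapping, Optional


def resolve_domain_target(
    env: Mapping[str, str],
    cli_domain: Optional[str],
    files_in_cwd: Iterable[str],
) -> str:
    """Pick a domain target following the documented order (illustrative)."""
    # 1. The environment variable is consulted first, if set.
    if env.get("PROTEAN_DOMAIN"):
        return env["PROTEAN_DOMAIN"]
    # 2. Otherwise use the value given via --domain.
    if cli_domain:
        return cli_domain
    # 3. Otherwise look for a conventional file in the current directory.
    present = set(files_in_cwd)
    for candidate in ("domain.py", "subdomain.py"):
        if candidate in present:
            return candidate
    raise LookupError("no domain found")


def find_domain_instance(namespace: Mapping[str, object], domain_type: type) -> object:
    """Find the domain object inside a module namespace (illustrative)."""
    # Prefer the conventional variable names.
    for name in ("domain", "subdomain"):
        value = namespace.get(name)
        if isinstance(value, domain_type):
            return value
    # Fall back to any instance, de-duplicated by identity.
    unique = {id(v): v for v in namespace.values() if isinstance(v, domain_type)}
    if len(unique) == 1:
        return next(iter(unique.values()))
    if not unique:
        raise LookupError("no Domain instance found")
    raise LookupError("multiple Domain instances found")
```

In practice, resolve_domain_target(os.environ, args.domain, os.listdir(".")) would mirror the first list, and find_domain_instance(vars(module), Domain) the second.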
Debug Mode
Enable verbose logging for troubleshooting:
protean server --domain=my_domain --debug
Debug mode logs:
- Subscription registration details
- Message processing events
- Position updates
- Configuration resolution
Test Mode
Test mode processes the available messages and then exits, which makes it useful for integration tests:
protean server --domain=my_domain --test-mode
In test mode, the server:
- Starts all subscriptions and processors
- Runs multiple processing cycles
- Allows message chain propagation
- Shuts down after processing completes
Using Test Mode in Tests
Test mode enables deterministic testing of async message flows:
import pytest
from protean.server import Engine

def test_order_creates_inventory_reservation():
    """Test that creating an order reserves inventory."""
    # Arrange: Create order (raises events)
    with domain.domain_context():
        order = Order.create(
            customer_id="123",
            items=[OrderItem(product_id="ABC", quantity=5)]
        )
        domain.repository_for(Order).add(order)

    # Act: Process events in test mode
    engine = Engine(domain, test_mode=True)
    engine.run()

    # Assert: Verify inventory was reserved
    with domain.domain_context():
        reservation = domain.repository_for(Reservation).get_by_order(order.id)
        assert reservation is not None
        assert reservation.quantity == 5
Testing Multi-Step Flows
Test mode handles cascading events automatically:
def test_order_fulfillment_flow():
    """Test complete order fulfillment flow."""
    # Order created -> Inventory reserved -> Payment processed -> Order shipped
    with domain.domain_context():
        order = Order.create(...)
        domain.repository_for(Order).add(order)

    # Process all cascading events
    engine = Engine(domain, test_mode=True)
    engine.run()

    with domain.domain_context():
        order = domain.repository_for(Order).get(order.id)
        assert order.status == "shipped"
Programmatic Usage
You can also start the engine programmatically:
from protean.server import Engine
from my_domain import domain
# Create and run the engine
engine = Engine(domain)
engine.run() # Blocking call
With Custom Options
engine = Engine(
    domain,
    test_mode=False,
    debug=True,
)
engine.run()
Accessing Engine State
engine = Engine(domain)
# Check subscriptions
print(f"Handler subscriptions: {len(engine._subscriptions)}")
print(f"Broker subscriptions: {len(engine._broker_subscriptions)}")
print(f"Outbox processors: {len(engine._outbox_processors)}")
# Access subscription factory
factory = engine.subscription_factory
Signal Handling
The server handles shutdown signals gracefully:
| Signal | Behavior |
|---|---|
| SIGINT (Ctrl+C) | Graceful shutdown |
| SIGTERM | Graceful shutdown |
| SIGHUP | Graceful shutdown |
During graceful shutdown, the server:
- Stops accepting new messages
- Completes processing of the current batch
- Persists subscription positions
- Cleans up resources
- Exits with the appropriate code
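The shutdown sequence above follows a common pattern: a signal handler only sets a flag, and the processing loop checks that flag between batches. The sketch below illustrates the pattern with the standard library. GracefulWorker is a hypothetical toy class and does not reflect how Protean's Engine is implemented.

```python
import signal
import threading
from typing import Callable, Iterable, Sequence


class GracefulWorker:
    """Toy worker that finishes its current batch before stopping."""

    def __init__(self) -> None:
        self._stop = threading.Event()
        self.positions_saved = False

    def install_signal_handlers(self) -> None:
        # Must be called from the main thread.
        # SIGHUP does not exist on Windows, so look signals up defensively.
        for name in ("SIGINT", "SIGTERM", "SIGHUP"):
            sig = getattr(signal, name, None)
            if sig is not None:
                signal.signal(sig, self._handle_signal)

    def _handle_signal(self, signum, frame) -> None:
        # Keep the handler tiny: just record the shutdown request.
        self.request_shutdown()

    def request_shutdown(self) -> None:
        self._stop.set()

    def run(
        self,
        batches: Iterable[Sequence[str]],
        handle: Callable[[str], None],
    ) -> int:
        for batch in batches:
            if self._stop.is_set():
                break  # stop accepting new messages
            for message in batch:
                handle(message)  # the current batch always completes
        self.positions_saved = True  # stand-in for persisting positions
        return 0  # exit code for a normal shutdown
```

Checking the flag only between batches is the design choice that matters here: a message that has started processing is never abandoned halfway through.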
Exit Codes
| Code | Meaning |
|---|---|
| 0 | Normal shutdown (signal or test mode completion) |
| 1 | Error during processing |
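A supervising script can use these exit codes to decide whether to restart or alert. The following is a minimal sketch using subprocess from the standard library. run_and_describe and EXIT_MEANINGS are hypothetical names, and any code other than 0 or 1 is reported as unexpected because the table above does not define it.

```python
import subprocess
from typing import Sequence

# Meanings taken from the exit code table above.
EXIT_MEANINGS = {
    0: "normal shutdown (signal or test mode completion)",
    1: "error during processing",
}


def run_and_describe(cmd: Sequence[str]) -> str:
    """Run a command to completion and describe its exit code."""
    result = subprocess.run(cmd)
    return EXIT_MEANINGS.get(
        result.returncode,
        f"unexpected exit code {result.returncode}",
    )
```

For example, run_and_describe(["protean", "server", "--domain=my_domain", "--test-mode"]) would block until the server exits and then return the matching description.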
Next Steps
- Error Handling — Retry logic, dead letter queues, and recovery mechanisms
- Production Deployment — Process management, Docker, Kubernetes, scaling, and health checks
- Logging — Structured logging with structlog, environment-aware defaults, and context variables
- Monitoring — Observatory dashboard, Prometheus metrics, and subscription lag tracking
- Multi-Worker Mode — Run multiple Engine processes for higher throughput
- Engine Architecture — Understand engine internals
- Configuration — Full configuration reference
- Subscription Types — Choose the right subscription
- Using Priority Lanes — Route background workloads through the backfill lane
- External Event Dispatch — Deliver published events to external brokers