Chapter 14: Connecting to the Outside World — Subscribers
Bookshelf has partnered with a book distributor — BookSupply — that sends webhook notifications when new books become available or when stock is replenished. These messages arrive on a Redis stream from an external system. We need to consume them and translate them into our domain language.
Subscribers vs. Event Handlers
| | Event Handlers | Subscribers |
|---|---|---|
| Listens to | Internal domain events (typed) | External broker messages (raw dicts) |
| Input | Typed event objects | Raw dict payloads |
| Purpose | React to domain state changes | Anti-corruption layer for external data |
| Registration | part_of=Aggregate | stream="stream_name" |
Event handlers trust the data because it comes from our own aggregates. Subscribers do not trust the data — they validate, translate, and map it into domain operations.
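As an illustration of the "validate, translate, and map" step, here is a minimal sketch of a payload check that could run before any command is built. The helper name `validate_new_book_payload` and the `REQUIRED_FIELDS` table are hypothetical and not part of Protean; the field names follow the payload used in this chapter.

```python
# Hypothetical helper, not part of Protean: checks a raw
# "new_book_available" payload before a command is built from it.
REQUIRED_FIELDS = {"title": str, "author": str, "price": (int, float)}


def validate_new_book_payload(payload: dict) -> dict:
    """Return keyword arguments for AddBook, or raise ValueError."""
    errors = []
    for field, expected_type in REQUIRED_FIELDS.items():
        value = payload.get(field)
        # bool is a subclass of int, so reject it explicitly.
        if (
            value is None
            or isinstance(value, bool)
            or not isinstance(value, expected_type)
        ):
            errors.append(f"{field}: missing or wrong type")
    if errors:
        raise ValueError("; ".join(errors))
    if payload["price"] < 0:
        raise ValueError("price: must not be negative")

    # Translate external field names into the command's field names.
    return {
        "title": payload["title"].strip(),
        "author": payload["author"].strip(),
        "isbn": payload.get("isbn"),  # optional in this chapter's payload
        "price_amount": float(payload["price"]),
    }
```

A subscriber could call such a helper first and only dispatch a command when it returns without raising. Whether to drop, log, or re-raise on invalid input is a design choice this sketch leaves open.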
Defining the Subscriber
@domain.subscriber(stream="book_supply")
class BookSupplyWebhookSubscriber:
    """Consumes messages from the BookSupply distributor and translates
    them into domain operations."""

    def __call__(self, payload: dict) -> None:
        event_type = payload.get("event_type")

        if event_type == "new_book_available":
            current_domain.process(
                AddBook(
                    title=payload["title"],
                    author=payload["author"],
                    isbn=payload.get("isbn"),
                    price_amount=payload["price"],
                )
            )
        elif event_type == "stock_replenished":
            current_domain.process(
                RestockInventory(
                    book_id=payload["book_id"],
                    quantity=payload["quantity"],
                )
            )
The subscriber listens to the "book_supply" stream. When a message
arrives, __call__ receives the raw dict payload. We inspect the
event_type field and dispatch to the appropriate domain command.
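The same inspect-and-dispatch step can also be written as a lookup table, which may scale better as more event types are added. The sketch below is one possible arrangement, not the chapter's implementation: the dataclasses are plain stand-ins for the AddBook and RestockInventory commands so that it runs without Protean, and `TRANSLATORS` and `translate` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


# Stand-ins for the chapter's commands so the sketch runs without Protean.
@dataclass
class AddBook:
    title: str
    author: str
    isbn: Optional[str]
    price_amount: float


@dataclass
class RestockInventory:
    book_id: str
    quantity: int


def to_add_book(payload: dict) -> AddBook:
    return AddBook(
        title=payload["title"],
        author=payload["author"],
        isbn=payload.get("isbn"),
        price_amount=payload["price"],
    )


def to_restock(payload: dict) -> RestockInventory:
    return RestockInventory(
        book_id=payload["book_id"],
        quantity=payload["quantity"],
    )


# Maps each external event_type to the function that builds its command.
TRANSLATORS = {
    "new_book_available": to_add_book,
    "stock_replenished": to_restock,
}


def translate(payload: dict):
    """Return a command for a known event_type, or None otherwise."""
    translator = TRANSLATORS.get(payload.get("event_type"))
    if translator is None:
        return None  # unknown event types are ignored, as in the if/elif version
    return translator(payload)
```

Inside the subscriber, `__call__` would then pass the result to `current_domain.process` whenever `translate` returns a command.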
The RestockInventory Command
We need a new command and handler for restocking:
@domain.command(part_of=Inventory)
class RestockInventory:
    book_id: Identifier(required=True)
    quantity: Integer(required=True)


@domain.command_handler(part_of=Inventory)
class InventoryCommandHandler:
    @handle(RestockInventory)
    def restock(self, command: RestockInventory) -> None:
        repo = current_domain.repository_for(Inventory)
        inventory = repo.get(command.book_id)
        inventory.adjust_stock(command.quantity)
        repo.add(inventory)
How It Works
External System                   Protean
  (BookSupply)                  (Bookshelf)
       │                             │
       │        webhook POST         │
       │────────────────────────────►│
       │                        Redis Stream
       │                        "book_supply"
       │                             │
       │                       ┌─────▼──────┐
       │                       │ Subscriber │
       │                       │   (ACL)    │
       │                       └─────┬──────┘
       │                             │ domain.process(AddBook)
       │                       ┌─────▼──────┐
       │                       │  Command   │
       │                       │  Handler   │
       │                       └────────────┘
The subscriber acts as an anti-corruption layer (ACL) — it prevents external data formats from leaking into the domain. If BookSupply changes their payload format, only the subscriber needs to change.
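To make the isolation claim concrete, the sketch below shows how a format change could be absorbed at the boundary. It assumes, purely for illustration, that BookSupply one day moves the price into a nested `pricing` object; that payload shape and the helper names `extract_price` and `to_add_book_kwargs` are hypothetical.

```python
def extract_price(payload: dict) -> float:
    """Read the price from either payload shape."""
    if "pricing" in payload:
        # Hypothetical newer shape: {"pricing": {"amount": 18.99, ...}}
        return float(payload["pricing"]["amount"])
    # Shape used in this chapter: {"price": 18.99}
    return float(payload["price"])


def to_add_book_kwargs(payload: dict) -> dict:
    """Build the keyword arguments for AddBook from a raw payload."""
    return {
        "title": payload["title"],
        "author": payload["author"],
        "isbn": payload.get("isbn"),
        "price_amount": extract_price(payload),
    }
```

Both payload shapes produce the same keyword arguments, so the AddBook command, its handler, and the aggregate stay untouched.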
Testing the Subscriber
# tests/test_subscribers.py (example tests)
def test_new_book_available_webhook():
    """External 'new_book_available' message creates a book."""
    subscriber = BookSupplyWebhookSubscriber()
    subscriber(
        {
            "event_type": "new_book_available",
            "title": "War and Peace",
            "author": "Leo Tolstoy",
            "isbn": "9780199232765",
            "price": 18.99,
        }
    )

    # Verify the book was created
    books = current_domain.repository_for(Book)._dao.query.all()
    assert books.total >= 1
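The test above covers only the "new_book_available" branch. The sketch below shows how the remaining paths might be checked in isolation. It does not use Protean: `handle_message` is a hypothetical function that mirrors the subscriber's branching, `process` stands in for `current_domain.process`, and plain tuples stand in for the commands.

```python
def handle_message(payload: dict, process) -> None:
    """Mirror of the subscriber's branching, with the dispatcher passed in."""
    event_type = payload.get("event_type")
    if event_type == "new_book_available":
        process(
            (
                "AddBook",
                payload["title"],
                payload["author"],
                payload.get("isbn"),
                payload["price"],
            )
        )
    elif event_type == "stock_replenished":
        process(("RestockInventory", payload["book_id"], payload["quantity"]))


def test_stock_replenished_dispatches_restock():
    processed = []
    handle_message(
        {"event_type": "stock_replenished", "book_id": "b-1", "quantity": 5},
        processed.append,
    )
    assert processed == [("RestockInventory", "b-1", 5)]


def test_unknown_event_type_is_ignored():
    processed = []
    handle_message({"event_type": "price_changed"}, processed.append)
    assert processed == []


def test_missing_field_raises_key_error():
    processed = []
    try:
        handle_message(
            {"event_type": "stock_replenished", "book_id": "b-1"},
            processed.append,
        )
    except KeyError:
        assert processed == []  # nothing was dispatched
    else:
        raise AssertionError("expected KeyError")
```

The last test documents a property of the chapter's subscriber as written: a payload missing a required key raises KeyError before any command is dispatched.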
What We Built
- A BookSupplyWebhookSubscriber that consumes messages from an external system.
- A RestockInventory command and handler for replenishing stock.
- An anti-corruption layer that translates external data into domain operations.
In the next chapter, we will enable fact events so the marketing team can get complete state snapshots for their analytics dashboard.
Full Source
from protean import Domain, handle
from protean.fields import Float, Identifier, Integer, String
from protean.utils.globals import current_domain

domain = Domain("bookshelf")
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)


@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount


@domain.command(part_of=Book)
class AddBook:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price_amount: Float(required=True)


@domain.command(part_of=Inventory)
class RestockInventory:
    book_id: Identifier(required=True)
    quantity: Integer(required=True)


@domain.command_handler(part_of=Inventory)
class InventoryCommandHandler:
    @handle(RestockInventory)
    def restock(self, command: RestockInventory) -> None:
        repo = current_domain.repository_for(Inventory)
        inventory = repo.get(command.book_id)
        inventory.adjust_stock(command.quantity)
        repo.add(inventory)


@domain.subscriber(stream="book_supply")
class BookSupplyWebhookSubscriber:
    """Consumes messages from the BookSupply distributor and translates
    them into domain operations."""

    def __call__(self, payload: dict) -> None:
        event_type = payload.get("event_type")

        if event_type == "new_book_available":
            current_domain.process(
                AddBook(
                    title=payload["title"],
                    author=payload["author"],
                    isbn=payload.get("isbn"),
                    price_amount=payload["price"],
                )
            )
        elif event_type == "stock_replenished":
            current_domain.process(
                RestockInventory(
                    book_id=payload["book_id"],
                    quantity=payload["quantity"],
                )
            )


domain.init(traverse=False)


# tests/test_subscribers.py (example tests)
def test_new_book_available_webhook():
    """External 'new_book_available' message creates a book."""
    subscriber = BookSupplyWebhookSubscriber()
    subscriber(
        {
            "event_type": "new_book_available",
            "title": "War and Peace",
            "author": "Leo Tolstoy",
            "isbn": "9780199232765",
            "price": 18.99,
        }
    )

    # Verify the book was created
    books = current_domain.repository_for(Book)._dao.query.all()
    assert books.total >= 1