Chapter 14: Connecting to the Outside World — Subscribers

Bookshelf has partnered with a book distributor, BookSupply, that sends webhook notifications when new books become available or when stock is replenished. These notifications reach us as messages on a Redis stream. Because they originate in an external system, we need to consume them and translate them into our domain language.

Subscribers vs. Event Handlers

                Event Handlers                   Subscribers
Listens to      Internal domain events (typed)   External broker messages (raw dicts)
Input           Typed event objects              Raw dict payloads
Purpose         React to domain state changes    Anti-corruption layer for external data
Registration    part_of=Aggregate                stream="stream_name"

Event handlers trust the data because it comes from our own aggregates. Subscribers do not trust the data — they validate, translate, and map it into domain operations.
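To make "do not trust the data" concrete, here is a minimal sketch of the kind of check a subscriber might run before building a command. It uses only plain Python. The helper name validate_new_book_payload and its rules are hypothetical illustrations, not part of Protean or of the BookSupply contract; the required keys simply mirror the fields the subscriber in this chapter reads.

```python
def validate_new_book_payload(payload: dict) -> dict:
    """Return a cleaned copy of a 'new_book_available' payload.

    Hypothetical helper: the validation rules are assumptions made
    for illustration, not a documented BookSupply schema.
    """
    missing = [key for key in ("title", "author", "price") if key not in payload]
    if missing:
        raise ValueError(f"missing required fields: {', '.join(missing)}")

    price = payload["price"]
    # bool is a subclass of int, so reject it explicitly
    if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
        raise ValueError(f"invalid price: {price!r}")

    return {
        "title": str(payload["title"]).strip(),
        "author": str(payload["author"]).strip(),
        "isbn": payload.get("isbn"),  # optional, as in the subscriber
        "price_amount": float(price),
    }
```

The returned dict uses the field names of the AddBook command, so a subscriber could pass it on as keyword arguments once validation succeeds.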

Defining the Subscriber

@domain.subscriber(stream="book_supply")
class BookSupplyWebhookSubscriber:
    """Consumes messages from the BookSupply distributor and translates
    them into domain operations."""

    def __call__(self, payload: dict) -> None:
        event_type = payload.get("event_type")

        if event_type == "new_book_available":
            current_domain.process(
                AddBook(
                    title=payload["title"],
                    author=payload["author"],
                    isbn=payload.get("isbn"),
                    price_amount=payload["price"],
                )
            )

        elif event_type == "stock_replenished":
            current_domain.process(
                RestockInventory(
                    book_id=payload["book_id"],
                    quantity=payload["quantity"],
                )
            )

The subscriber listens to the "book_supply" stream. When a message arrives, __call__ receives the raw dict payload. We inspect the event_type field and dispatch to the appropriate domain command.
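One consequence of the if/elif chain is that a message with an unrecognized event_type is silently ignored. The sketch below shows an alternative shape: a dispatch table with an explicit fallback. It is plain Python so that it runs on its own. The dataclasses are stand-ins for the Protean commands, and translate, TRANSLATORS, and UnknownEventType are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class AddBook:  # stand-in for the Protean command of the same name
    title: str
    author: str
    isbn: Optional[str]
    price_amount: float


@dataclass
class RestockInventory:  # stand-in for the Protean command of the same name
    book_id: str
    quantity: int


class UnknownEventType(Exception):
    """Raised when a payload carries an event_type we do not handle."""


def _to_add_book(payload: dict) -> AddBook:
    return AddBook(
        title=payload["title"],
        author=payload["author"],
        isbn=payload.get("isbn"),
        price_amount=payload["price"],
    )


def _to_restock(payload: dict) -> RestockInventory:
    return RestockInventory(book_id=payload["book_id"], quantity=payload["quantity"])


# Maps each external event_type to the function that builds a command
TRANSLATORS: Dict[str, Callable[[dict], object]] = {
    "new_book_available": _to_add_book,
    "stock_replenished": _to_restock,
}


def translate(payload: dict) -> object:
    """Turn a raw payload into a command object, or fail loudly."""
    event_type = payload.get("event_type")
    try:
        translator = TRANSLATORS[event_type]
    except KeyError:
        raise UnknownEventType(f"unhandled event_type: {event_type!r}") from None
    return translator(payload)
```

Inside the subscriber, __call__ would then reduce to handing translate(payload) to current_domain.process. Whether an unknown event type should raise, be logged, or be dropped is a design choice that depends on how the stream is operated.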

The RestockInventory Command

We need a new command and handler for restocking:

@domain.command(part_of=Inventory)
class RestockInventory:
    book_id: Identifier(required=True)
    quantity: Integer(required=True)


@domain.command_handler(part_of=Inventory)
class InventoryCommandHandler:
    @handle(RestockInventory)
    def restock(self, command: RestockInventory) -> None:
        repo = current_domain.repository_for(Inventory)
        inventory = repo.get(command.book_id)
        inventory.adjust_stock(command.quantity)
        repo.add(inventory)

How It Works

External System           Protean
(BookSupply)              (Bookshelf)
     │                        │
     │ webhook POST           │
     │───────────────────►    │
     │                  Redis Stream
     │                  "book_supply"
     │                        │
     │                  ┌─────▼──────┐
     │                  │ Subscriber │
     │                  │ (ACL)      │
     │                  └─────┬──────┘
     │                        │ domain.process(AddBook)
     │                  ┌─────▼──────┐
     │                  │  Command   │
     │                  │  Handler   │
     │                  └────────────┘

The subscriber acts as an anti-corruption layer (ACL) — it prevents external data formats from leaking into the domain. If BookSupply changes their payload format, only the subscriber needs to change.
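To illustrate that last point, suppose BookSupply moved the price into a nested object. The nested payload shape below is entirely hypothetical (this chapter only uses the flat price field), as is the extract_price helper. The sketch shows the change being absorbed in one translation function while the value handed to the domain stays the same.

```python
def extract_price(payload: dict) -> float:
    """Read the price from either payload shape.

    Flat shape (used in this chapter):   {"price": 18.99}
    Nested shape (hypothetical change):  {"pricing": {"amount": 18.99}}
    """
    if "price" in payload:
        return float(payload["price"])
    pricing = payload.get("pricing") or {}
    if "amount" in pricing:
        return float(pricing["amount"])
    raise ValueError("payload carries no recognizable price")


old_style = {"event_type": "new_book_available", "price": 18.99}
new_style = {
    "event_type": "new_book_available",
    "pricing": {"amount": 18.99, "currency": "USD"},
}

# Both shapes yield the same value for AddBook.price_amount
assert extract_price(old_style) == extract_price(new_style) == 18.99
```

The AddBook command, its handler, and the Book aggregate never see the difference between the two shapes.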

Testing the Subscriber

# tests/test_subscribers.py (example tests)


def test_new_book_available_webhook():
    """External 'new_book_available' message creates a book."""
    subscriber = BookSupplyWebhookSubscriber()
    subscriber(
        {
            "event_type": "new_book_available",
            "title": "War and Peace",
            "author": "Leo Tolstoy",
            "isbn": "9780199232765",
            "price": 18.99,
        }
    )

    # Verify the book was created
    books = current_domain.repository_for(Book)._dao.query.all()
    assert books.total >= 1
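
The test above covers only the new_book_available branch and needs a configured domain. As a complementary sketch, the branching logic can be checked in isolation by injecting the function that processes commands. Everything here is a plain-Python stand-in: handle_payload mirrors the subscriber's branching but is a hypothetical function, and commands are represented as (name, fields) tuples rather than Protean command objects.

```python
def handle_payload(payload: dict, process) -> None:
    """Mirror of the subscriber's branching, with `process` injected."""
    event_type = payload.get("event_type")
    if event_type == "new_book_available":
        process(("AddBook", {
            "title": payload["title"],
            "author": payload["author"],
            "isbn": payload.get("isbn"),
            "price_amount": payload["price"],
        }))
    elif event_type == "stock_replenished":
        process(("RestockInventory", {
            "book_id": payload["book_id"],
            "quantity": payload["quantity"],
        }))


def test_stock_replenished_dispatches_restock():
    recorded = []
    handle_payload(
        {"event_type": "stock_replenished", "book_id": "book-1", "quantity": 10},
        process=recorded.append,
    )
    assert recorded == [("RestockInventory", {"book_id": "book-1", "quantity": 10})]


def test_unknown_event_type_is_ignored():
    recorded = []
    handle_payload({"event_type": "price_changed"}, process=recorded.append)
    assert recorded == []


# Run directly when not using a test runner
test_stock_replenished_dispatches_restock()
test_unknown_event_type_is_ignored()
```

Tests of this kind do not replace the integration test, which still confirms that a command reaches its handler and the aggregate is persisted.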

What We Built

  • A BookSupplyWebhookSubscriber that consumes messages from an external system.
  • A RestockInventory command and handler for replenishing stock.
  • An anti-corruption layer that translates external data into domain operations.

In the next chapter, we will enable fact events so the marketing team can get complete state snapshots for their analytics dashboard.

Full Source

from protean import Domain, handle
from protean.fields import Float, Identifier, Integer, String
from protean.utils.globals import current_domain

domain = Domain("bookshelf")
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)


@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount


@domain.command(part_of=Book)
class AddBook:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price_amount: Float(required=True)


# NOTE: this listing does not include a command handler for AddBook.
# The test at the bottom assumes one is registered elsewhere (for
# example, carried over from an earlier chapter).


@domain.command(part_of=Inventory)
class RestockInventory:
    book_id: Identifier(required=True)
    quantity: Integer(required=True)


@domain.command_handler(part_of=Inventory)
class InventoryCommandHandler:
    @handle(RestockInventory)
    def restock(self, command: RestockInventory) -> None:
        repo = current_domain.repository_for(Inventory)
        inventory = repo.get(command.book_id)
        inventory.adjust_stock(command.quantity)
        repo.add(inventory)


@domain.subscriber(stream="book_supply")
class BookSupplyWebhookSubscriber:
    """Consumes messages from the BookSupply distributor and translates
    them into domain operations."""

    def __call__(self, payload: dict) -> None:
        event_type = payload.get("event_type")

        if event_type == "new_book_available":
            current_domain.process(
                AddBook(
                    title=payload["title"],
                    author=payload["author"],
                    isbn=payload.get("isbn"),
                    price_amount=payload["price"],
                )
            )

        elif event_type == "stock_replenished":
            current_domain.process(
                RestockInventory(
                    book_id=payload["book_id"],
                    quantity=payload["quantity"],
                )
            )


domain.init(traverse=False)


# tests/test_subscribers.py (example tests)


def test_new_book_available_webhook():
    """External 'new_book_available' message creates a book."""
    subscriber = BookSupplyWebhookSubscriber()
    subscriber(
        {
            "event_type": "new_book_available",
            "title": "War and Peace",
            "author": "Leo Tolstoy",
            "isbn": "9780199232765",
            "price": 18.99,
        }
    )

    # Verify the book was created
    books = current_domain.repository_for(Book)._dao.query.all()
    assert books.total >= 1

Next

Chapter 15: Fact Events and the Reporting Pipeline →