BaseProvider

Database provider interface. All persistence adapters (SQLAlchemy, Elasticsearch, etc.) implement this contract.

See Database Adapters for concrete adapter configuration.

Bases: RegisterLookupMixin

Provider implementation for each database.

Acts as a gateway to configure the database, retrieve connections, and perform commits.

Building a Database Adapter

To create a new database adapter, implement these components:

  1. Provider (extends BaseProvider)
     - 12 abstract methods (see below) + capabilities property
     - Manages connections, sessions, and database lifecycle
     - Reference: protean.adapters.repository.memory.MemoryProvider

  2. DAO (extends BaseDAO from protean.port.dao)
     - 8 abstract methods: _filter, _create, _update, _update_all, _delete, _delete_all, _raw, has_table
     - Handles data access operations using sessions from the Provider
     - BaseDAO provides lifecycle wrappers (get, save, create, update, delete); you implement the underscored internals

  3. DatabaseModel (extends BaseDatabaseModel from protean.core.database_model)
     - 2 abstract methods: from_entity, to_entity
     - Use the _entity_to_dict() helper for shared field extraction
     - construct_database_model_class auto-generates models; decorate_database_model_class wraps user-defined @domain.model

  4. Lookups (extends BaseLookup from protean.port.dao)
     - Required lookups: exact, iexact, contains, icontains, startswith, endswith, gt, gte, lt, lte, in (see REQUIRED_LOOKUPS)
     - Register with @YourProvider.register_lookup
     - Each lookup implements as_expression() returning an adapter-native comparison

  5. Registration function
     - A register() function that calls registry.register(name, class_path)
     - Wrap imports in try/except for optional dependencies
     - Add an entry point in pyproject.toml under [project.entry-points."protean.providers"]
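
As a rough illustration of the registration step, the sketch below guards an optional driver import before registering. The StubRegistry class, the register() signature (taking the registry and a module name as arguments), and the names "my_adapter" and "my_package.provider.MyProvider" are all assumptions made for this example; only the registry.register(name, class_path) call shape comes from the description above.

```python
import importlib


class StubRegistry:
    """Hypothetical stand-in for the provider registry (not Protean's class)."""

    def __init__(self):
        self.paths = {}

    def register(self, name, class_path):
        # Store the dotted path only; the class itself is loaded later.
        self.paths[name] = class_path


def register(registry, driver_module="some_optional_driver"):
    """Register the adapter only when its optional driver can be imported."""
    try:
        importlib.import_module(driver_module)
    except ImportError:
        # Optional dependency is missing: skip registration quietly.
        return False
    # Both names below are placeholders for illustration.
    registry.register("my_adapter", "my_package.provider.MyProvider")
    return True
```

Registering a dotted class path rather than the class keeps the adapter module from being imported until it is actually requested.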

Session Protocol

get_session() and get_connection() must return objects that satisfy SessionProtocol (commit, rollback, close, and an is_active flag).

The BaseDAO's _commit_if_standalone() calls these methods when operating outside a Unit of Work. Adapters without real transactions (e.g., Elasticsearch) should provide a session object with no-op implementations and is_active = True.
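
For an adapter without real transactions, a no-op session could look like the sketch below. SessionLike is a stand-in that only mirrors the four members named above; the actual SessionProtocol definition lives in Protean and may differ. NoOpSession is likewise a hypothetical name.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SessionLike(Protocol):
    """Stand-in for SessionProtocol, limited to the members listed above."""

    is_active: bool

    def commit(self) -> None: ...

    def rollback(self) -> None: ...

    def close(self) -> None: ...


class NoOpSession:
    """Session for stores without transactions: every call does nothing."""

    is_active = True

    def commit(self) -> None:
        pass

    def rollback(self) -> None:
        pass

    def close(self) -> None:
        pass
```

Because the protocol is structural, NoOpSession does not need to inherit from anything; having the right attributes is enough for an isinstance check against a runtime-checkable protocol.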

Call Flow

Initialization:

Domain.init()
  → ProviderRegistry.get(name)        # loads your Provider class
  → Provider.__init__(name, domain, conn_info)
  → Provider._create_database_artifacts()   # if setup_database() called

Persist (within UnitOfWork):

Repository.add(aggregate)
  → DAO.save(aggregate)
    → DAO._validate_and_update_version(aggregate)
    → DatabaseModel.from_entity(aggregate)    # your conversion
    → DAO._create(model_obj) or DAO._update(model_obj)
    # UoW holds session — no commit yet

UnitOfWork.__exit__()
  → session.commit()                          # your session
  # On error: session.rollback()

Persist (standalone, no UoW):

Repository.add(aggregate)
  → DAO.save(aggregate)
    → DatabaseModel.from_entity(aggregate)
    → DAO._create(model_obj) or DAO._update(model_obj)
    → DAO._commit_if_standalone(conn)
      → conn.commit() / conn.rollback() / conn.close()

Retrieve:

Repository.get(identifier)
  → DAO.get(identifier)
    → DAO.query.filter(id=identifier).all()
      → DAO._filter(criteria, offset, limit, order_by)
        # Must return ResultSet(items, total)
      → DatabaseModel.to_entity(item)         # your conversion
      → DAO._sync_event_position(entity)
      → DAO._track_in_uow(entity)
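
To make the shape of the _filter step in the retrieve flow concrete, here is a small in-memory sketch. Page is a hypothetical stand-in for ResultSet(items, total), and filter_rows is not a Protean function. It also assumes that total means the number of matches before offset and limit are applied, which the flow above does not state.

```python
from typing import Callable, NamedTuple, Optional


class Page(NamedTuple):
    """Hypothetical stand-in for ResultSet(items, total)."""

    items: list
    total: int


def filter_rows(
    rows: list,
    predicate: Callable[[dict], bool],
    offset: int = 0,
    limit: int = 10,
    order_by: Optional[str] = None,
) -> Page:
    # Apply the criteria first so the total reflects every match.
    matched = [row for row in rows if predicate(row)]
    if order_by is not None:
        matched.sort(key=lambda row: row[order_by])
    # Slice afterwards to return only the requested window.
    return Page(items=matched[offset : offset + limit], total=len(matched))


rows = [
    {"id": 3, "age": 30},
    {"id": 1, "age": 41},
    {"id": 2, "age": 17},
    {"id": 4, "age": 52},
]
page = filter_rows(rows, lambda r: r["age"] >= 18, offset=1, limit=1, order_by="id")
```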

Lifecycle:

Provider._create_database_artifacts()   # create tables/indices
Provider._drop_database_artifacts()     # drop tables/indices
Provider._data_reset()                  # truncate all data (tests)
Provider.is_alive()                     # health check
Provider.close()                        # release connections

Initialize Provider with Connection/Adapter details

Source code in src/protean/port/provider.py
def __init__(self, name, domain, conn_info: dict):
    """Initialize Provider with Connection/Adapter details"""
    self.name = name
    self.domain = domain
    self.conn_info = conn_info
    self.managed = conn_info.get("managed", True)

capabilities abstractmethod property

capabilities: DatabaseCapabilities

Return the capabilities of this database provider.

has_capability

has_capability(capability: DatabaseCapabilities) -> bool

Check if provider has a specific capability.

Source code in src/protean/port/provider.py
def has_capability(self, capability: DatabaseCapabilities) -> bool:
    """Check if provider has a specific capability."""
    return capability in self.capabilities

has_all_capabilities

has_all_capabilities(
    capabilities: DatabaseCapabilities,
) -> bool

Check if provider has all the specified capabilities.

Source code in src/protean/port/provider.py
def has_all_capabilities(self, capabilities: DatabaseCapabilities) -> bool:
    """Check if provider has all the specified capabilities."""
    return (self.capabilities & capabilities) == capabilities

has_any_capability

has_any_capability(
    capabilities: DatabaseCapabilities,
) -> bool

Check if provider has any of the specified capabilities.

Source code in src/protean/port/provider.py
def has_any_capability(self, capabilities: DatabaseCapabilities) -> bool:
    """Check if provider has any of the specified capabilities."""
    return bool(self.capabilities & capabilities)
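
The three capability checks rely on flag semantics: membership for a single capability, a masked equality for "all", and a non-empty intersection for "any". The sketch below reproduces those expressions with a stand-in enum. Caps and its CRUD and TRANSACTIONS members are hypothetical; only RAW_QUERIES appears in this document, and the sketch assumes DatabaseCapabilities behaves like a standard enum.Flag.

```python
from enum import Flag, auto


class Caps(Flag):
    """Hypothetical stand-in for DatabaseCapabilities."""

    CRUD = auto()
    TRANSACTIONS = auto()
    RAW_QUERIES = auto()


# A provider that supports CRUD and raw queries but not transactions.
provider_caps = Caps.CRUD | Caps.RAW_QUERIES
wanted = Caps.CRUD | Caps.TRANSACTIONS

# has_capability: membership test for one capability.
has_raw = Caps.RAW_QUERIES in provider_caps

# has_all_capabilities: every requested bit must survive the mask.
has_all = (provider_caps & wanted) == wanted

# has_any_capability: at least one requested bit survives the mask.
has_any = bool(provider_caps & wanted)
```

With these values, has_raw and has_any are true while has_all is false, because TRANSACTIONS is missing from the provider's flags.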

validate_lookups classmethod

validate_lookups() -> list[str]

Check that all required lookups are registered.

Returns a list of missing lookup names. Empty list means all required lookups are present.

Source code in src/protean/port/provider.py
@classmethod
def validate_lookups(cls) -> list[str]:
    """Check that all required lookups are registered.

    Returns a list of missing lookup names. Empty list means
    all required lookups are present.
    """
    registered = set(cls.get_lookups().keys())
    return sorted(cls.REQUIRED_LOOKUPS - registered)
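
The check is a set difference between the required names and the registered ones. The sketch below shows it end to end on a stand-in class. StubProvider, its shortened REQUIRED_LOOKUPS, and the lookup_name attribute used as the registration key are assumptions for illustration; the real registration machinery comes from RegisterLookupMixin.

```python
class StubProvider:
    """Hypothetical provider with a tiny lookup registry."""

    # Shortened list for the example; the real set has eleven entries.
    REQUIRED_LOOKUPS = frozenset({"exact", "gt", "in"})
    _lookups: dict = {}

    @classmethod
    def register_lookup(cls, lookup_cls):
        # Usable as a decorator: store the class, then hand it back.
        cls._lookups[lookup_cls.lookup_name] = lookup_cls
        return lookup_cls

    @classmethod
    def get_lookups(cls) -> dict:
        return dict(cls._lookups)

    @classmethod
    def validate_lookups(cls) -> list:
        registered = set(cls.get_lookups().keys())
        return sorted(cls.REQUIRED_LOOKUPS - registered)


@StubProvider.register_lookup
class Exact:
    lookup_name = "exact"


missing = StubProvider.validate_lookups()
```

Sorting the result gives a stable order, which makes the list of missing names convenient to assert on in adapter test suites.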

get_session abstractmethod

get_session() -> SessionProtocol

Establish a new session with the database.

Must return an object satisfying SessionProtocol.

Typically, the session factory is created once per application, then held on to and passed to different transactions.

In Protean's case, the session scope and the transaction scope match: a new session is created when a transaction is initiated (at the beginning of request handling, for example) and terminated (after committing or rolling back) at the end of the process. The session is used as a component of the Unit of Work pattern to handle transactions reliably.

Sessions are made available to requests as part of a Context Manager.
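
One way to picture a session whose scope matches a transaction is the context manager sketched below: a fresh session per block, committed on success, rolled back on error, and always closed. RecordingSession and transaction() are hypothetical names for this example and are not part of Protean's Unit of Work implementation.

```python
from contextlib import contextmanager


class RecordingSession:
    """Hypothetical session that records which methods were called."""

    def __init__(self):
        self.is_active = True
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")
        self.is_active = False


@contextmanager
def transaction(get_session):
    # One new session per transaction, obtained from the long-lived factory.
    session = get_session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # The session never outlives the transaction.
        session.close()
```

Here get_session plays the role of the factory that is created once and reused, while each `with transaction(get_session)` block gets its own short-lived session.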

Source code in src/protean/port/provider.py
@abstractmethod
def get_session(self) -> SessionProtocol:
    """Establish a new session with the database.

    Must return an object satisfying :class:`SessionProtocol`.

    Typically the session factory should be created once per application. Which is then
    held on to and passed to different transactions.

    In Protean's case, the session scope and the transaction scope match. Which means that a
    new session is created when a transaction needs to be initiated (at the beginning of
    request handling, for example) and terminated (after committing or rolling back) at the end
    of the process. The session will be used as a component in Unit of Work Pattern, to handle
    transactions reliably.

    Sessions are made available to requests as part of a Context Manager.
    """

get_connection abstractmethod

get_connection() -> SessionProtocol

Get the connection object for the repository.

Must return an object satisfying SessionProtocol.

Source code in src/protean/port/provider.py
@abstractmethod
def get_connection(self) -> SessionProtocol:
    """Get the connection object for the repository.

    Must return an object satisfying :class:`SessionProtocol`.
    """

is_alive abstractmethod

is_alive() -> bool

Check if the connection is alive.

Source code in src/protean/port/provider.py
@abstractmethod
def is_alive(self) -> bool:
    """Check if the connection is alive"""

close abstractmethod

close() -> None

Close the provider and clean up any persistent connections or resources.

This method should be called to properly dispose of connections and free up resources when the provider is no longer needed. Implementations should:

- Close any connection pools
- Dispose of any persistent connections
- Clean up any other resources (engines, clients, etc.)

Source code in src/protean/port/provider.py
@abstractmethod
def close(self) -> None:
    """Close the provider and clean up any persistent connections or resources.

    This method should be called to properly dispose of connections and free up
    resources when the provider is no longer needed. Implementations should:
    - Close any connection pools
    - Dispose of any persistent connections
    - Clean up any other resources (engines, clients, etc.)
    """

get_dao abstractmethod

get_dao(entity_cls: Type, database_model_cls: Type) -> Any

Return a DAO object configured with a live connection.

Source code in src/protean/port/provider.py
@abstractmethod
def get_dao(self, entity_cls: Type, database_model_cls: Type) -> Any:
    """Return a DAO object configured with a live connection"""

decorate_database_model_class abstractmethod

decorate_database_model_class(
    entity_cls: Type, database_model_cls: Type
) -> Type

Enhance a user-defined DatabaseModel class with adapter internals.

Called when the user has defined a custom @domain.model for an entity. The model class is passed in — add adapter-specific base classes, column mappings, or metadata as needed.

Must return the decorated model class.

Source code in src/protean/port/provider.py
@abstractmethod
def decorate_database_model_class(
    self, entity_cls: Type, database_model_cls: Type
) -> Type:
    """Enhance a user-defined DatabaseModel class with adapter internals.

    Called when the user has defined a custom ``@domain.model`` for an
    entity. The model class is passed in — add adapter-specific base
    classes, column mappings, or metadata as needed.

    Must return the decorated model class.
    """

construct_database_model_class abstractmethod

construct_database_model_class(entity_cls: Type) -> Type

Dynamically build a DatabaseModel class for an entity.

Called when no user-defined @domain.model exists for the entity. The framework calls this during domain initialization for every aggregate/entity that doesn't have an explicit model mapping.

Must return a class that extends BaseDatabaseModel with from_entity() and to_entity() implemented.
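
A minimal sketch of building such a class dynamically is shown below, using a dataclass as the entity and type() to create the model. ModelBase is a hypothetical stand-in for BaseDatabaseModel, construct_model_class is not the framework's implementation, and treating from_entity and to_entity as classmethods is an assumption of this example.

```python
from dataclasses import dataclass, fields


class ModelBase:
    """Hypothetical stand-in for BaseDatabaseModel: holds a plain dict."""

    def __init__(self, **data):
        self.data = data


def construct_model_class(entity_cls):
    names = [f.name for f in fields(entity_cls)]

    def from_entity(cls, entity):
        # Entity -> storage object: copy each declared field.
        return cls(**{name: getattr(entity, name) for name in names})

    def to_entity(cls, item):
        # Storage object -> entity: rebuild from the stored values.
        return entity_cls(**item.data)

    return type(
        f"{entity_cls.__name__}Model",
        (ModelBase,),
        {
            "from_entity": classmethod(from_entity),
            "to_entity": classmethod(to_entity),
        },
    )


@dataclass
class User:
    id: int
    name: str


UserModel = construct_model_class(User)
row = UserModel.from_entity(User(1, "Ada"))
restored = UserModel.to_entity(row)
```

A real adapter would map fields to native column or document types at this point instead of storing a dict.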

Source code in src/protean/port/provider.py
@abstractmethod
def construct_database_model_class(self, entity_cls: Type) -> Type:
    """Dynamically build a DatabaseModel class for an entity.

    Called when no user-defined ``@domain.model`` exists for the entity.
    The framework calls this during domain initialization for every
    aggregate/entity that doesn't have an explicit model mapping.

    Must return a class that extends ``BaseDatabaseModel`` with
    ``from_entity()`` and ``to_entity()`` implemented.
    """

raw

raw(query: Any, data: Any = None) -> Any

Run raw query directly on the database.

The query should be executed immediately on the database as a separate unit of work (in a different transaction context). Results should be returned exactly as the database provides them, without any intervention; it is left to the consumer to interpret and organize them correctly.

Raises NotSupportedError if the provider does not support raw queries.

Source code in src/protean/port/provider.py
def raw(self, query: Any, data: Any = None) -> Any:
    """Run raw query directly on the database.

    Query should be executed immediately on the database as a separate unit of work
    (in a different transaction context). The results should be returned as returned by
    the database without any intervention. It is left to the consumer to interpret and
    organize the results correctly.

    Raises NotSupportedError if the provider does not support raw queries.
    """
    if not self.has_capability(DatabaseCapabilities.RAW_QUERIES):
        raise NotSupportedError(
            f"Provider '{self.name}' ({self.__class__.__name__}) "
            "does not support raw queries"
        )
    return self._raw(query, data)
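
The method above follows a guard-then-delegate pattern: the public raw() checks the capability, and subclasses only supply _raw(). The sketch below reproduces that pattern with stand-ins so the behaviour can be tried in isolation. Caps, NotSupported, StubProvider, and RawCapableProvider are hypothetical names, and the dict returned by _raw() is purely illustrative.

```python
from enum import Flag, auto


class Caps(Flag):
    """Hypothetical stand-in for DatabaseCapabilities."""

    CRUD = auto()
    RAW_QUERIES = auto()


class NotSupported(Exception):
    """Hypothetical stand-in for NotSupportedError."""


class StubProvider:
    name = "stub"
    capabilities = Caps.CRUD  # no raw-query support by default

    def raw(self, query, data=None):
        # Guard first, so unsupported adapters fail with a clear message.
        if Caps.RAW_QUERIES not in self.capabilities:
            raise NotSupported(
                f"Provider '{self.name}' ({self.__class__.__name__}) "
                "does not support raw queries"
            )
        return self._raw(query, data)

    def _raw(self, query, data=None):
        raise NotImplementedError


class RawCapableProvider(StubProvider):
    name = "raw-capable"
    capabilities = Caps.CRUD | Caps.RAW_QUERIES

    def _raw(self, query, data=None):
        # Echo the inputs back; a real adapter would hit the database here.
        return {"query": query, "data": data}
```

Keeping the check in the base class means an adapter only has to declare RAW_QUERIES and implement _raw() to opt in.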