Skip to content

Association fields

Fields for establishing relationships between domain elements -- connecting entities and aggregates.

See Association Fields reference for usage examples.

HasOne

HasOne(to_cls, **kwargs)

Bases: Association

Represents a one-to-one association between an aggregate and its entities.

This field is used to define a relationship where an aggregate is associated with at most one instance of a child entity.

Source code in src/protean/fields/association.py
266
267
268
269
270
271
272
273
274
275
def __init__(self, to_cls, **kwargs):
    """Initialize the association with its target entity class.

    Args:
        to_cls: The target entity class of the association; stored as `_to_cls`.
        **kwargs: Options forwarded to the base field initializer. The optional
            `via` entry is read here and stored on the field as `via`.
    """
    # NOTE(review): `**kwargs` is forwarded as-is (including any `via` entry)
    # before `via` is popped below, so the base initializer also receives
    # `via` -- confirm the base class tolerates it.
    super().__init__(**kwargs)

    self._to_cls = to_cls
    self.via = kwargs.pop("via", None)

    # FIXME Find an elegant way to avoid these declarations in associations
    # Associations cannot be marked `required` or `unique`
    # (assigned after the base initializer so these values are the final ones)
    self.required = False
    self.unique = False

__set__

__set__(instance, value)

Setup relationship to be persisted/updated

We track the change in the instance's _temp_cache to determine if the relationship has been added, updated, or deleted. We track two aspects: state and old value.

For HasOne, there are three possible states:

  • ADDED: The relationship is being added for the first time
  • UPDATED: The relationship is being updated
  • DELETED: The relationship is being removed

Of these, the old value is applicable for UPDATED and DELETED states.

Also, we recursively remove child entities if they are associated with the old value.

The temp_cache we set up here is eventually used by the Repository to determine the changes to be persisted.

Source code in src/protean/fields/association.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
def __set__(self, instance, value):
    """Setup relationship to be persisted/updated

    We track the change in the instance's `_temp_cache` to determine if the relationship
    has been added, updated, or deleted. We track two aspects: state and old value.

    For `HasOne`, there are three possible states:

    - ADDED: The relationship is being added for the first time
    - UPDATED: The relationship is being updated
    - DELETED: The relationship is being removed

    Of these, the old value is applicable for `UPDATED` and `DELETED` states.

    Also, we recursively remove child entities if they are associated with the old value.

    The `temp_cache` we set up here is eventually used by the `Repository` to determine
    the changes to be persisted.

    Args:
        instance: The parent entity/aggregate that owns this field.
        value: The child entity to associate, a dict of attributes from which a
            `to_cls` instance is built, or `None` to remove the association.

    Raises:
        ValidationError: If `value` is neither `None` nor an instance of `to_cls`.
    """
    # Accept dictionary values and convert them to Entity objects
    if isinstance(value, dict):
        value = self.to_cls(**value)

    super().__set__(instance, value)

    # Type validation runs after the base `__set__` call above
    if value is not None and not isinstance(value, self.to_cls):
        raise ValidationError(
            {
                "_entity": [
                    f"Value assigned to '{self.field_name}' is not of type '{self.to_cls.__name__}'"
                ]
            }
        )

    # 1. Preserve parent linkage in child entity
    if value is not None:
        # This updates the parent's unique identifier in the child
        #   so that the foreign key relationship is preserved
        instance_id_fld = id_field(instance)
        assert instance_id_fld is not None
        id_value = getattr(instance, instance_id_fld.field_name)
        linked_attribute = self._linked_attribute(instance.__class__)
        if hasattr(value, linked_attribute):
            setattr(
                value, linked_attribute, id_value
            )  # This overwrites any existing linkage, which is correct

        # Add the parent to the child entity cache
        # Temporarily set linkage to parent in child entity
        setattr(value, self._linked_reference(type(instance)), instance)

    # 2. Determine and store the change in the relationship
    # Read before `_set_own_value` below; presumably still the previously
    # associated entity at this point.
    current_value = getattr(instance, self.field_name)
    if current_value:
        current_value_id_fld = id_field(current_value)
        assert current_value_id_fld is not None
        current_value_id = getattr(current_value, current_value_id_fld.field_name)
    else:
        current_value_id = None
    if value:
        value_id_fld = id_field(value)
        assert value_id_fld is not None
        value_id = getattr(value, value_id_fld.field_name)
    else:
        value_id = None
    if current_value is None:
        # Entity was not associated earlier
        # NOTE(review): this branch is also taken when `value` is None and
        # nothing was associated before -- confirm that recording "ADDED" is
        # intended in that case.
        instance._temp_cache[self.field_name]["change"] = "ADDED"
    elif value is None:
        # Entity was associated earlier, but now being removed
        instance._temp_cache[self.field_name]["change"] = "DELETED"
        instance._temp_cache[self.field_name]["old_value"] = current_value
    elif current_value_id != value_id:
        # A New Entity is being associated replacing the old one
        instance._temp_cache[self.field_name]["change"] = "UPDATED"
        instance._temp_cache[self.field_name]["old_value"] = current_value
    elif current_value_id == value_id and value.state_.is_changed:
        # Entity was associated earlier, but now being updated
        # (same identity, so no old value is recorded)
        instance._temp_cache[self.field_name]["change"] = "UPDATED"
    else:
        instance._temp_cache[self.field_name]["change"] = (
            None  # The same object has been assigned, No-Op
        )

    self._set_own_value(instance, value)

    # 3. Go Recursive and remove child entities if they are associated with the old value
    if instance._temp_cache[self.field_name]["change"] == "DELETED":
        old_value = instance._temp_cache[self.field_name]["old_value"]
        if has_association_fields(old_value):
            for field_name, field_obj in association_fields(old_value).items():
                if isinstance(field_obj, HasMany):
                    # Remove every entity currently linked through the nested HasMany
                    field_obj.remove(old_value, getattr(old_value, field_name))
                elif isinstance(field_obj, HasOne):
                    # Assigning None re-enters this method on the nested field
                    setattr(old_value, field_name, None)

    # Validations are only re-run once the instance is initialized and has a root
    if instance._initialized and instance._root is not None:
        instance._root._postcheck()  # Trigger validations from the top

as_dict

as_dict(value)

Return JSON-compatible value of self

Source code in src/protean/fields/association.py
517
518
519
520
def as_dict(self, value):
    """Serialize the linked entity through its `to_dict()` method.

    Returns `None` when no entity is linked (`value` is `None`).
    """
    return None if value is None else value.to_dict()

HasMany

HasMany(to_cls, **kwargs)

Bases: Association

Represents a one-to-many association between two entities. This field is used to define a relationship where an aggregate has multiple instances of a child entity.

PARAMETER DESCRIPTION
to_cls

The class of the target entity.

TYPE: type

**kwargs

Additional keyword arguments to be passed to the base field class.

TYPE: Any DEFAULT: {}

Source code in src/protean/fields/association.py
266
267
268
269
270
271
272
273
274
275
def __init__(self, to_cls, **kwargs):
    """Initialize the association with its target entity class.

    Args:
        to_cls: The target entity class of the association; stored as `_to_cls`.
        **kwargs: Options forwarded to the base field initializer. The optional
            `via` entry is read here and stored on the field as `via`.
    """
    # NOTE(review): `**kwargs` is forwarded as-is (including any `via` entry)
    # before `via` is popped below, so the base initializer also receives
    # `via` -- confirm the base class tolerates it.
    super().__init__(**kwargs)

    self._to_cls = to_cls
    self.via = kwargs.pop("via", None)

    # FIXME Find an elegant way to avoid these declarations in associations
    # Associations cannot be marked `required` or `unique`
    # (assigned after the base initializer so these values are the final ones)
    self.required = False
    self.unique = False

__set__

__set__(instance, value)

This supports direct assignment of values to HasMany fields, like: order.items = [item1, item2, item3]

Source code in src/protean/fields/association.py
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
def __set__(self, instance, value):
    """Handle direct assignment to a HasMany field, e.g.
    `order.items = [item1, item2, item3]`.

    A non-list value is wrapped in a single-element list, dictionary items are
    turned into `to_cls` instances, and the normalized list is handed to `add()`.
    """
    raw_items = value if isinstance(value, list) else [value]

    # Dictionaries are accepted as shorthand and converted to Entity objects
    values = [
        self.to_cls(**entry) if isinstance(entry, dict) else entry
        for entry in raw_items
    ]

    super().__set__(instance, values)

    # The assigned value is always a list by now, so `add()` runs for every
    # assignment.
    # NOTE(review): a `None` assignment is wrapped into `[None]` above rather
    # than skipped -- confirm whether `None` was meant to bypass `add()`.

    # Ensure cache is initialized before add() tries to read via getattr().
    # Without this, getattr triggers __get__ which falls back to a DB query,
    # failing for event-sourced aggregates that have no database tables.
    try:
        self.get_cached_value(instance)
    except KeyError:
        self.set_cached_value(instance, [])

    self.add(instance, values)

add

add(instance, items) -> None

Available as add_<HasMany Field Name> method on the entity instance.

This method adds one or more linked entities to the source entity. It also diffs the current value with the new value to determine the changes that need to be persisted.

The method also takes care of the attributes of the linked entities, preparing them for persistence.

Each linkage takes care of its own attributes, preparing them for persistence. One exception is deletion of an entity - all child entities have to be marked as removed.

We track the change in the instance's _temp_cache to determine if the relationship has been added, updated, or deleted. We track each item's state and group the changes into three buckets:

  • ADDED: The relationship is being added for the first time
  • UPDATED: The relationship is being updated
  • DELETED: The relationship is being removed

The DELETED objects are detected from the pool of new objects, but it is also possible to remove them directly with the remove method.

The temp_cache we set up here is eventually used by the Repository to determine the changes to be persisted.

PARAMETER DESCRIPTION
instance

The source entity instance.

TYPE: BaseEntity

items

The linked entity or entities to be added.

TYPE: list | BaseEntity

Source code in src/protean/fields/association.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
def add(self, instance, items) -> None:
    """
    Available as `add_<HasMany Field Name>` method on the entity instance.

    This method adds one or more linked entities to the source entity. It also diffs the current value with the
    new value to determine the changes that need to be persisted.

    The method also takes care of the attributes of the linked entities, preparing them for persistence.

    Each linkage takes care of its own attributes, preparing them for persistence.
    One exception is deletion of an entity - all child entities have to be marked as removed.

    We track the change in the instance's `_temp_cache` to determine if the relationship
    has been added, updated, or deleted. We track each item's state and group the changes
    into three buckets:

    - ADDED: The relationship is being added for the first time
    - UPDATED: The relationship is being updated
    - DELETED: The relationship is being removed

    The `DELETED` objects are detected from the pool of new objects, but it is also possible to remove them
    directly with the `remove` method.

    The `temp_cache` we set up here is eventually used by the `Repository` to determine
    the changes to be persisted.

    Args:
        instance (BaseEntity): The source entity instance.
        items (list | BaseEntity): The linked entity or entities to be added.

    Raises:
        ValidationError: If any item is not an instance of `to_cls`.
    """
    # Delegate to the base descriptor first; `items` is passed through as
    # received (it is only normalized to a list further below).
    super().__set__(instance, items)

    # Currently linked entities, as returned by the field's getter
    data = getattr(instance, self.field_name)

    # Convert a single item into a list of items, if necessary
    items = [items] if not isinstance(items, list) else items

    # Validate that all items are of the same type, and the correct type
    for item in items:
        if not isinstance(item, self.to_cls):
            raise ValidationError(
                {
                    "_entity": [
                        f"Value assigned to '{self.field_name}' is not of type '{self.to_cls.__name__}'"
                    ]
                }
            )

    entity_id_fld = id_field(self.to_cls)
    assert entity_id_fld is not None
    instance_id_fld = id_field(instance)
    assert instance_id_fld is not None

    # Snapshot of the identities linked before this call; it is not updated
    # while the items below are processed.
    current_value_ids = [getattr(value, entity_id_fld.field_name) for value in data]

    # Remove items when set to empty
    if len(items) == 0 and len(current_value_ids) > 0:
        self.remove(instance, data)

    # Re-read from cache after potential removal so new_data reflects
    # the post-remove state instead of the stale `data` reference.
    try:
        cached = self.get_cached_value(instance)
        new_data = list(cached) if isinstance(cached, list) else [cached]
    except KeyError:
        new_data = list(data)

    for item in items:
        # Items to add
        identity = getattr(item, entity_id_fld.field_name)
        if identity not in current_value_ids:
            # If the same item is added multiple times, the last item added will win
            # NOTE(review): that holds for `_temp_cache["added"]`, but because
            # `current_value_ids` is a fixed snapshot, two new items sharing an
            # identity are both appended to `new_data` -- confirm intended.
            instance._temp_cache[self.field_name]["added"][identity] = item

            # Point the child's linking attribute at the parent's identifier
            setattr(
                item,
                self._linked_attribute(type(instance)),
                getattr(instance, instance_id_fld.field_name),
            )

            # Temporarily set linkage to parent in child entity
            setattr(item, self._linked_reference(type(instance)), instance)

            new_data.append(item)
        # Items to update
        elif (
            identity in current_value_ids
            and item.state_.is_persisted
            and item.state_.is_changed
        ):
            # Point the child's linking attribute at the parent's identifier
            setattr(
                item,
                self._linked_attribute(type(instance)),
                getattr(instance, instance_id_fld.field_name),
            )

            # Temporarily set linkage to parent in child entity
            setattr(item, self._linked_reference(type(instance)), instance)

            instance._temp_cache[self.field_name]["updated"][identity] = item

            # Replace updated item in the working copy
            for i, existing in enumerate(new_data):
                if getattr(existing, entity_id_fld.field_name) == identity:
                    new_data[i] = item
                    break
        # Already-linked items that are not both persisted and changed are
        # left untouched (no branch matches them).

    # Update cache with complete item list instead of deleting it.
    # This ensures items are immediately visible via __get__ without
    # requiring a DB round-trip, which is essential for event-sourced
    # aggregates that have no database tables.
    self.set_cached_value(instance, new_data)

    # Validations are only re-run once the instance is initialized and has a root
    if instance._initialized and instance._root is not None:
        instance._root._postcheck()  # Trigger validations from the top

remove

remove(instance, items) -> None

Available as remove_<HasMany Field Name> method on the entity instance.

Remove one or more linked entities from the source entity.

We also recursively remove child entities if they are associated with the removed value.

PARAMETER DESCRIPTION
instance

The source entity instance.

TYPE: BaseEntity

items

The linked entity or entities to be removed.

TYPE: list | BaseEntity

Source code in src/protean/fields/association.py
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
def remove(self, instance, items) -> None:
    """
    Available as `remove_<HasMany Field Name>` method on the entity instance.

    Remove one or more linked entities from the source entity.

    We also recursively remove child entities if they are associated with the removed value.

    Args:
        instance (BaseEntity): The source entity instance.
        items (list | BaseEntity): The linked entity or entities to be removed.

    Raises:
        ValidationError: If any item is not an instance of `to_cls`.
    """
    # Currently linked entities, as returned by the field's getter
    data = getattr(instance, self.field_name)

    # Convert a single item into a list of items, if necessary
    items = [items] if not isinstance(items, list) else items

    # Validate that all items are of the same type, and the correct type
    for item in items:
        if not isinstance(item, self.to_cls):
            raise ValidationError(
                {
                    "_entity": [
                        f"Value assigned to '{self.field_name}' is not of type '{self.to_cls.__name__}'"
                    ]
                }
            )

    entity_id_fld = id_field(self.to_cls)
    assert entity_id_fld is not None

    current_value_ids = [getattr(value, entity_id_fld.field_name) for value in data]

    # Identities newly marked as removed by this call
    removed_ids = set()
    for item in items:
        identity = getattr(item, entity_id_fld.field_name)
        # Only currently linked items are recorded, and only the first time
        # they are marked as removed.
        if identity in current_value_ids:
            if identity not in instance._temp_cache[self.field_name]["removed"]:
                instance._temp_cache[self.field_name]["removed"][identity] = item
                removed_ids.add(identity)

        # Remove child entities
        # (runs for every item passed in, whether or not it was linked)
        if has_association_fields(item):
            for field_name, field_obj in association_fields(item).items():
                if isinstance(field_obj, HasMany):
                    field_obj.remove(item, getattr(item, field_name))
                elif isinstance(field_obj, HasOne):
                    setattr(item, field_name, None)

    # Update cache with items remaining after removal instead of deleting it.
    # The cache is left untouched when nothing was newly removed.
    if removed_ids:
        new_data = [
            item
            for item in data
            if getattr(item, entity_id_fld.field_name) not in removed_ids
        ]
        self.set_cached_value(instance, new_data)

    # Validations are only re-run once the instance is initialized and has a root
    if instance._initialized and instance._root is not None:
        instance._root._postcheck()  # Trigger validations from the top

as_dict

as_dict(value) -> list

Return JSON-compatible value of self.

PARAMETER DESCRIPTION
value

The value to be converted to a JSON-compatible format.

TYPE: list

RETURNS DESCRIPTION
list

A list of dictionaries representing the linked entities.

TYPE: list

Source code in src/protean/fields/association.py
788
789
790
791
792
793
794
795
796
797
798
def as_dict(self, value) -> list:
    """
    Serialize every linked entity through its `to_dict()` method.

    Args:
        value (list): The linked entities to serialize.

    Returns:
        list: One dictionary per linked entity, in the same order as `value`.
    """
    serialized = []
    for entity in value:
        serialized.append(entity.to_dict())
    return serialized

get

get(instance, **kwargs)

Fetch a single linked entity based on the provided criteria.

Available as get_one_from_<HasMany Field Name> method on the entity instance.

PARAMETER DESCRIPTION
**kwargs

The filtering criteria.

TYPE: Any DEFAULT: {}

Source code in src/protean/fields/association.py
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
def get(self, instance, **kwargs):
    """Return the single linked entity that matches the given criteria.

    Available as `get_one_from_<HasMany Field Name>` method on the entity instance.

    Args:
        **kwargs (Any): The filtering criteria.

    Raises:
        ObjectNotFoundError: If no linked entity matches.
        TooManyObjectsError: If more than one linked entity matches.
    """
    matches = self.filter(instance, **kwargs)
    match_count = len(matches)

    # Exactly one match is the only successful outcome
    if match_count == 1:
        return matches[0]

    if match_count == 0:
        raise exceptions.ObjectNotFoundError(
            "No linked entities matching criteria found"
        )

    raise exceptions.TooManyObjectsError(
        "Multiple linked entities matching criteria found"
    )

filter

filter(instance, **kwargs)

Filter the linked entities based on the provided criteria.

Available as filter_<HasMany Field Name> method on the entity instance.

PARAMETER DESCRIPTION
**kwargs

The filtering criteria.

TYPE: Any DEFAULT: {}

Source code in src/protean/fields/association.py
822
823
824
825
826
827
828
829
830
831
832
833
834
835
def filter(self, instance, **kwargs):
    """Return the linked entities whose attributes equal all given criteria.

    Available as `filter_<HasMany Field Name>` method on the entity instance.

    Args:
        **kwargs (Any): The filtering criteria, as attribute name / expected
            value pairs. With no criteria, every linked entity is returned.
    """

    def _matches(candidate):
        # Stop at the first criterion the candidate fails
        for attr_name, expected in kwargs.items():
            if not (getattr(candidate, attr_name) == expected):
                return False
        return True

    linked = getattr(instance, self.field_name)
    return [candidate for candidate in linked if _matches(candidate)]

Reference

Reference(to_cls, **kwargs)

Bases: FieldCacheMixin, Field

A field representing a reference to another entity. This field is used to establish the reverse relationship to the remote entity.

PARAMETER DESCRIPTION
to_cls

The target entity class or its name.

TYPE: str | type

**kwargs

Additional keyword arguments to be passed to the base Field class.

TYPE: Any DEFAULT: {}

Source code in src/protean/fields/association.py
100
101
102
103
104
def __init__(self, to_cls, **kwargs):
    """Initialize the reference with its target entity.

    Args:
        to_cls: The target entity class or its name; stored as `_to_cls`.
        **kwargs: Additional keyword arguments passed to the base initializer.
    """
    super().__init__(**kwargs)
    self._to_cls = to_cls

    # Companion field object built from this reference; `get_shadow_field()`
    # returns it paired with the shadow attribute name.
    self.relation = _ReferenceField(self)

linked_attribute property

linked_attribute

Return linkage attribute to the target class

This method is initially called from __set_name__() -> get_attribute_name() at which point, the to_cls has not been initialized properly. We simply default the linked attribute to 'id' in that case.

Eventually, when setting value the first time, the to_cls entity is initialized and the attribute name is reset correctly.

get_attribute_name

get_attribute_name()

Return formatted attribute name for the shadow field

Source code in src/protean/fields/association.py
110
111
112
113
114
def get_attribute_name(self):
    """Return the attribute name used for the shadow field.

    An explicit `referenced_as` wins; otherwise the name is built as
    `<field_name>_<linked_attribute>`.
    """
    explicit_name = self.referenced_as
    if explicit_name:
        return explicit_name
    return f"{self.field_name}_{self.linked_attribute}"

get_shadow_field

get_shadow_field()

Return shadow field. Primarily used during Entity initialization to register the shadow field.

Source code in src/protean/fields/association.py
116
117
118
119
def get_shadow_field(self):
    """Return the shadow field as an `(attribute name, relation field)` pair.

    Primarily used during Entity initialization to register the shadow field.
    """
    shadow_name = self.attribute_name
    shadow_field = self.relation
    return shadow_name, shadow_field

__get__

__get__(instance, owner)

Retrieve associated objects

Source code in src/protean/fields/association.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def __get__(self, instance, owner):
    """Retrieve the referenced object for `instance`.

    The cached value is returned when present. On a cache miss the value of
    the shadow attribute is used to fetch the target object, which is then
    stored on the instance. Returns `None` when `instance` has no `state_`
    attribute or when the shadow attribute is missing or empty.
    """
    # Objects without a `state_` attribute have nothing to resolve
    if not hasattr(instance, "state_"):
        return None

    try:
        return self.get_cached_value(instance)
    except KeyError:
        # Cache miss: fetch target object by own identifier
        shadow_attr = self.get_attribute_name()
        id_value = (
            getattr(instance, shadow_attr) if hasattr(instance, shadow_attr) else None
        )
        if not id_value:
            return None

        reference_obj = self._fetch_objects(self.linked_attribute, id_value)
        if reference_obj:
            self._set_own_value(instance, reference_obj)
        # A falsy result means no object was found in the remote entity with
        # this identifier; it is returned unchanged.
        return reference_obj

__set__

__set__(instance, value)

Override __set__ to coordinate between relation field and its shadow attribute

Source code in src/protean/fields/association.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def __set__(self, instance, value):
    """Coordinate assignment between the relation field and its shadow attribute.

    A falsy loaded value resets the stored values. Otherwise the target is
    stored on the instance and the relation value is set from the target's
    linked attribute.

    Raises:
        ValueError: If the target object's identifier is `None`.
    """
    target = self._load(value)

    # Nothing to reference: clear the stored values
    if not target:
        self._reset_values(instance)
        return

    # Check if the reference object has been saved. Otherwise, throw ValueError
    # FIXME not a comprehensive check. Should refer to state
    target_id_fld = id_field(target)
    assert target_id_fld is not None
    if getattr(target, target_id_fld.field_name) is None:
        raise ValueError(
            "Target Object must be saved before being referenced",
            self.field_name,
        )

    self._set_own_value(instance, target)
    linked_value = getattr(target, self.linked_attribute)
    self._set_relation_value(instance, linked_value)

as_dict

as_dict(value)

Return JSON-compatible value of self

Source code in src/protean/fields/association.py
250
251
252
def as_dict(self, value):
    """Return JSON-compatible value of self

    Not implemented for this field: calling it always raises
    `NotImplementedError`.
    """
    raise NotImplementedError