Transformer Package

Transformer (Base Class)

Bases: ABC

Base class for all transformers.

A transformer will generate an instance of a target class from an instance of a source class, making use of a specification.

This is an abstract class. Different implementations will subclass this.

Specification normalization has two phases:

  1. Load-time normalization (_normalize_spec_dict): Structural fixes applied to a raw dict before Pydantic instantiation — YAML quirk handling, $ref expansion, dict-to-list conversion. Does not require a source schema. All entry points (load_transformer_specification, create_transformer_specification, Session, loaders) go through this single method.

  2. Schema-bind-time induction (derived_specification): Semantic defaults inferred from the source schema — populated_from, range, foreign-key resolution. Runs lazily on first access to derived_specification and requires source_schemaview to be set.
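The load-time phase is easiest to see on the two YAML quirks it handles at the top level. The following is a minimal standalone sketch that mirrors the logic of `_expand_compact_keys` and `_preprocess_class_derivations` shown in the class source; the function names `expand_compact_keys` and `fill_empty_bodies` are illustrative and not part of the library, and the sketch copies nested bodies where the original mutates them.

```python
from typing import Any


def expand_compact_keys(items: list[dict[str, Any]]) -> None:
    """Expand single-key compact entries in place, e.g. {"Condition": {...}}."""
    for i, item in enumerate(items):
        if isinstance(item, dict) and len(item) == 1:
            key, val = next(iter(item.items()))
            # A lone "name" key means the entry is already in expanded form.
            if key != "name" and (val is None or isinstance(val, dict)):
                expanded = dict(val) if val is not None else {}
                expanded.setdefault("name", key)
                items[i] = expanded


def fill_empty_bodies(class_derivations: dict[str, Any]) -> None:
    """Replace None bodies (YAML `Entity:` with no content) with empty dicts."""
    for key, body in class_derivations.items():
        if body is None:
            class_derivations[key] = {}


# List form with compact keys.
compact = [{"Condition": {"populated_from": "x"}}, {"Entity": None}, {"name": "Person"}]
expand_compact_keys(compact)
print(compact)

# Dict form with an empty body.
keyed = {"Entity": None, "Person": {"populated_from": "p"}}
fill_empty_bodies(keyed)
print(keyed)
```

Neither step needs a source schema, which is why this phase can run before Pydantic instantiation.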

Source code in src/linkml_map/transformer/transformer.py
@dataclass
class Transformer(ABC):
    """
    Base class for all transformers.

    A transformer will generate an instance of a target class from
    an instance of a source class, making use of a specification.

    This is an abstract class. Different implementations will
    subclass this.

    Specification normalization has two phases:

    1. **Load-time normalization** (``_normalize_spec_dict``): Structural fixes
       applied to a raw dict before Pydantic instantiation — YAML quirk handling,
       ``$ref`` expansion, dict-to-list conversion. Does not require a source schema.
       All entry points (``load_transformer_specification``,
       ``create_transformer_specification``, ``Session``, ``loaders``) go through
       this single method.

    2. **Schema-bind-time induction** (``derived_specification``): Semantic defaults
       inferred from the source schema — ``populated_from``, ``range``, foreign-key
       resolution. Runs lazily on first access to ``derived_specification`` and
       requires ``source_schemaview`` to be set.
    """

    specification: TransformationSpecification = None
    """A specification of how to generate target objects from source objects."""

    source_schemaview: SchemaView = None
    """A view over the schema describing the input/source object."""

    _derived_specification: TransformationSpecification = None
    """A specification with inferred missing values."""

    _source_schema_patched: bool = field(default=False)
    """Flag to track if source schema patches have been applied."""

    target_schemaview: SchemaView | None = None
    """A view over the schema describing the output/target object."""

    unrestricted_eval: bool = field(default=False)
    """Set to True to allow arbitrary evals as part of transformation."""

    strict: bool = field(default=False)
    """Raise on expression references that do not resolve to a schema slot.

    When ``False`` (the default), unresolved names emit a warning and the
    expression evaluator returns ``None`` (preserving SQL-style null
    propagation for legitimate empty values but losing the signal for
    typos). When ``True``, unresolved names raise
    :class:`~linkml_map.transformer.errors.TransformationError`, which
    surfaces typos, stale references, and wrong-table accesses that
    would otherwise produce silent nulls in the output.
    """

    _curie_converter: Converter = None

    def map_object(self, obj: OBJECT_TYPE, source_type: str | None = None, **kwargs: Any) -> OBJECT_TYPE:
        """
        Transform source object into an instance of the target class.

        :param obj:
        :param source_type:
        :return:
        """
        raise NotImplementedError

    def map_database(
        self, source_database: Any, target_database: Any | None = None, **kwargs: dict[str, Any]
    ) -> OBJECT_TYPE:
        """
        Transform source resource.

        :param source_database:
        :param target_database:
        :param kwargs:
        :return:
        """
        raise NotImplementedError

    def load_source_schema(self, path: str | Path | dict) -> None:
        """
        Set source_schemaview from a schema path.

        :param path:
        """
        if isinstance(path, Path):
            path = str(path)
        self.source_schemaview = SchemaView(path)

    def load_transformer_specification(self, path: str | Path) -> None:
        """
        Set specification from a YAML specification file.

        :param path: Path to a YAML transformation specification file.
        """
        with open(path) as f:
            obj = yaml.safe_load(f)
            self._normalize_spec_dict(obj)
            self.specification = TransformationSpecification(**obj)

    def load_transformer_specifications(self, paths: tuple[str | Path, ...]) -> None:
        """Load and merge multiple transformation spec files into a single specification.

        Accepts file paths and/or directories.  Directories are recursively
        searched for YAML files.  All specs are merged (class_derivations
        appended, enum/slot_derivations unioned by name) and the result is
        set as ``self.specification``.

        :param paths: One or more file or directory paths.
        """
        from linkml_map.utils.spec_merge import load_and_merge_specs

        obj = load_and_merge_specs(paths)
        self._normalize_spec_dict(obj)
        self.specification = TransformationSpecification(**obj)

    @classmethod
    def normalize_transform_spec(cls, obj: dict[str, Any], normalizer: ReferenceValidator) -> dict:
        """Recursively normalize class_derivations and flatten object_derivations."""
        obj = normalizer.normalize(obj)

        class_derivations = obj.get("class_derivations", [])
        if isinstance(class_derivations, dict):
            cd_iter = class_derivations.values()
        else:
            cd_iter = class_derivations
        for class_spec in cd_iter:
            if not isinstance(class_spec, dict):
                continue
            parent_source = class_spec.get("populated_from")
            slot_derivations = class_spec.get("slot_derivations", {})
            for slot_name, slot_spec in slot_derivations.items():
                if slot_spec.get("value") is not None and slot_spec.get("range") is None:
                    slot_spec["range"] = "string"
                cls._normalize_slot_class_derivations(slot_name, slot_spec, normalizer, parent_source)
        return obj

    @classmethod
    def _normalize_slot_class_derivations(
        cls,
        slot_name: str,
        slot_spec: dict[str, Any],
        normalizer: ReferenceValidator,
        parent_populated_from: str | None = None,
    ) -> None:
        """Flatten object_derivations and normalize class_derivations on a slot.

        Four steps, applied recursively to nested slots:
        1. Flatten ``object_derivations`` into ``class_derivations`` (with
           deprecation warning; error if both are present).
        2. Expand compact-key entries (``- Condition: {...}`` → ``{name: Condition, ...}``).
        3. Inherit ``populated_from`` from the parent class derivation when absent.
        4. Run the normalizer on each class derivation entry so that nested
           dict-keyed ``slot_derivations`` get ``name`` injected.
        """
        # Step 1: flatten object_derivations → class_derivations
        object_derivations = slot_spec.get("object_derivations", [])
        if object_derivations:
            if slot_spec.get("class_derivations"):
                msg = (
                    f"SlotDerivation '{slot_name}' has both 'object_derivations' and "
                    f"'class_derivations'. Remove 'object_derivations' and use "
                    f"'class_derivations' only."
                )
                raise ValueError(msg)

            warnings.warn(
                f"SlotDerivation '{slot_name}' uses 'object_derivations', which is "
                f"deprecated. Use list-based class_derivations instead. "
                f"'object_derivations' will be removed in a future version. "
                f"See https://github.com/linkml/linkml-map/issues/112",
                DeprecationWarning,
                stacklevel=4,
            )

            flattened: list[dict[str, Any]] = []
            for od in object_derivations:
                od_cd = od.get("class_derivations", {})
                if isinstance(od_cd, dict):
                    for name, body in od_cd.items():
                        entry = body if isinstance(body, dict) else {}
                        entry.setdefault("name", name)
                        flattened.append(entry)
                elif isinstance(od_cd, list):
                    flattened.extend(od_cd)
            slot_spec["class_derivations"] = flattened
            del slot_spec["object_derivations"]

        # Steps 2-4: expand compact keys, inherit populated_from, normalize, and recurse
        slot_cd = slot_spec.get("class_derivations")
        if not isinstance(slot_cd, list):
            return

        cls._expand_compact_keys(slot_cd)

        for cd_entry in slot_cd:
            if not isinstance(cd_entry, dict):
                continue
            if not cd_entry.get("populated_from") and parent_populated_from:
                cd_entry["populated_from"] = parent_populated_from
            normalized = normalizer.normalize(cd_entry)
            cd_entry.clear()
            cd_entry.update(normalized)
            child_source = cd_entry.get("populated_from")
            for nested_name, nested_sd in cd_entry.get("slot_derivations", {}).items():
                if isinstance(nested_sd, dict):
                    cls._normalize_slot_class_derivations(nested_name, nested_sd, normalizer, child_source)

    @classmethod
    def _normalize_spec_dict(cls, obj: dict[str, Any]) -> None:
        """Normalize a raw specification dict in place.

        Bundles _preprocess_class_derivations, ReferenceValidator normalization,
        and nested ObjectDerivation fixup into a single entry point. Mutates
        ``obj`` by replacing its contents with the normalized result.

        :param obj: Raw specification dict (e.g. from YAML or user code).
        """
        cls._preprocess_class_derivations(obj)
        normalizer = ReferenceValidator(package_schemaview("linkml_map.datamodel.transformer_model"))
        normalizer.expand_all = True
        normalized = cls.normalize_transform_spec(obj, normalizer)
        obj.clear()
        obj.update(normalized)

    @staticmethod
    def _expand_compact_keys(items: list[dict[str, Any]]) -> None:
        """Expand YAML compact-key dicts in a list in place.

        Converts ``{"Condition": {"populated_from": "x"}}`` →
        ``{"name": "Condition", "populated_from": "x"}``.
        Skips items whose sole key is ``"name"`` (already expanded).
        """
        for i, item in enumerate(items):
            if isinstance(item, dict) and len(item) == 1:
                key, val = next(iter(item.items()))
                if key != "name" and isinstance(val, dict | type(None)):
                    expanded = val if val is not None else {}
                    expanded.setdefault("name", key)
                    items[i] = expanded

    @staticmethod
    def _preprocess_class_derivations(obj: dict[str, Any]) -> None:
        """Pre-process top-level class_derivations before ReferenceValidator normalization.

        Handles two cases:
        1. Dict format with None values (e.g. ``Entity:`` with no body) — replace
           with empty dicts so ReferenceValidator.ensure_list doesn't choke.
        2. List format with compact keys — delegate to ``_expand_compact_keys``.
        """
        cd = obj.get("class_derivations")
        if isinstance(cd, dict):
            for k, v in cd.items():
                if v is None:
                    cd[k] = {}
        elif isinstance(cd, list):
            Transformer._expand_compact_keys(cd)

    def create_transformer_specification(self, obj: dict[str, Any]) -> None:
        """
        Create specification from a dict.

        TODO: this will no longer be necessary when pydantic supports inlined as dict

        :param obj: Raw specification dict; normalized in place.
        :return:
        """
        self._normalize_spec_dict(obj)
        self.specification = TransformationSpecification(**obj)

    def _apply_source_schema_patches(self) -> None:
        """Apply source_schema_patches from specification to source_schemaview."""
        if self._source_schema_patched:
            return
        if self.specification and self.source_schemaview:
            patches = self.specification.source_schema_patches
            if patches:
                apply_schema_patch(self.source_schemaview, patches)
                self.source_schemaview.induced_slot.cache_clear()
        self._source_schema_patched = True

    @property
    def derived_specification(self) -> TransformationSpecification | None:
        """Return the specification with schema-inferred defaults filled in.

        Creates a deep copy of ``self.specification``, applies any source schema
        patches, then calls ``induce_missing_values`` to fill in ``populated_from``,
        ``range``, and other fields that require knowledge of the source schema.
        The result is cached for subsequent access.

        This is the second phase of normalization — see the class docstring for
        the full two-phase pipeline.
        """
        if self._derived_specification is None:
            if self.specification is None:
                return None
            self._apply_source_schema_patches()
            self._derived_specification = deepcopy(self.specification)
            induce_missing_values(self._derived_specification, self.source_schemaview)
            self._synthesize_implicit_joins(self._derived_specification)
        return self._derived_specification

    def _synthesize_implicit_joins(self, spec: TransformationSpecification) -> None:
        """Add explicit join specs for nested class_derivations with cross-table references.

        Walks all nested class_derivations. When a nested CD has a different
        ``populated_from`` than its parent and no explicit ``joins:`` block,
        synthesizes an ``AliasedClass`` join entry on the parent CD using
        :func:`pick_join_key` to determine the join column.

        Mutates *spec* in place.

        :param spec: The derived specification to augment.
        """
        sv = self.source_schemaview
        if sv is None:
            return

        for cd in spec.class_derivations:
            parent_source = cd.populated_from or cd.name
            self._walk_and_synthesize_joins(cd, parent_source, sv)

    def _walk_and_synthesize_joins(
        self,
        class_deriv: ClassDerivation,
        parent_source: str,
        sv: SchemaView,
    ) -> None:
        """Recursively walk slot_derivations and synthesize joins on parent CDs.

        :param class_deriv: The parent ClassDerivation to add joins to.
        :param parent_source: The parent's populated_from.
        :param sv: Source schema view.
        """
        for sd in class_deriv.slot_derivations.values():
            if not sd.class_derivations:
                continue
            for nested_cd in sd.class_derivations:
                nested_source = nested_cd.populated_from or parent_source

                # Synthesize a join when the nested CD references a different table
                if nested_source != parent_source:
                    join_key = pick_join_key(sv, parent_source, nested_source)
                    if join_key is not None:
                        if class_deriv.joins is None:
                            class_deriv.joins = {}
                        if nested_source not in class_deriv.joins:
                            class_deriv.joins[nested_source] = AliasedClass(
                                alias=nested_source,
                                join_on=join_key,
                            )
                            logger.info(
                                "Synthesized implicit join: %s.joins[%r] on column %r",
                                class_deriv.name,
                                nested_source,
                                join_key,
                            )

                # Always recurse into nested CD's own slots
                if nested_cd.slot_derivations:
                    self._walk_and_synthesize_joins(nested_cd, nested_source, sv)

    def _get_class_derivation(self, target_class_name: str) -> ClassDerivation:
        spec = self.derived_specification
        matching_tgt_class_derivs = [
            deriv
            for deriv in spec.class_derivations
            if deriv.populated_from == target_class_name
            or (not deriv.populated_from and target_class_name == deriv.name)
        ]
        logger.debug(f"Target class derivations={matching_tgt_class_derivs}")
        if len(matching_tgt_class_derivs) != 1:
            msg = f"Could not find class derivation for {target_class_name} (results={len(matching_tgt_class_derivs)})"
            raise ValueError(msg)
        cd = matching_tgt_class_derivs[0]
        ancmap = self._class_derivation_ancestors(cd)
        if ancmap:
            cd = deepcopy(cd)
            for ancestor in ancmap.values():
                for k, v in ancestor.__dict__.items():
                    if v is not None and v != []:
                        curr_v = getattr(cd, k, None)
                        if isinstance(curr_v, list):
                            curr_v.extend(v)
                        elif isinstance(curr_v, dict):
                            curr_v.update({**v, **curr_v})
                        elif curr_v is None:
                            setattr(cd, k, v)
        return cd

    def _find_class_derivation_by_name(self, name: str) -> ClassDerivation:
        """Look up a class derivation by name from the specification.

        Returns the first match when multiple derivations share the same name.
        """
        for cd in self.specification.class_derivations:
            if cd.name == name:
                return cd
        msg = f"No class derivation named '{name}'"
        raise KeyError(msg)

    def _class_derivation_ancestors(self, cd: ClassDerivation) -> dict[str, ClassDerivation]:
        """
        Return a map of all class derivations that are ancestors of the given class derivation.

        :param cd:
        :return:
        """
        ancestors = {}
        parents = cd.mixins + ([cd.is_a] if cd.is_a else [])
        for parent in parents:
            parent_cd = self._find_class_derivation_by_name(parent)
            ancestors[parent] = parent_cd
            ancestors.update(self._class_derivation_ancestors(parent_cd))
        return ancestors

    def _get_enum_derivation(self, target_enum_name: str) -> EnumDerivation:
        spec = self.derived_specification
        matching_tgt_enum_derivs = [
            deriv
            for deriv in spec.enum_derivations.values()
            if deriv.populated_from == target_enum_name or (not deriv.populated_from and target_enum_name == deriv.name)
        ]
        logger.debug(f"Target enum derivations={matching_tgt_enum_derivs}")
        if len(matching_tgt_enum_derivs) != 1:
            msg = f"Could not find what to derive from a source {target_enum_name}"
            raise ValueError(msg)
        return matching_tgt_enum_derivs[0]

    def _is_coerce_to_multivalued(self, slot_derivation: SlotDerivation, class_derivation: ClassDerivation) -> bool:
        cast_as = slot_derivation.cast_collection_as
        if cast_as and cast_as in [
            CollectionType.MultiValued,
            CollectionType.MultiValuedDict,
        ]:
            return True
        if slot_derivation.stringification and slot_derivation.stringification.reversed:
            return True
        sv = self.target_schemaview
        if sv:
            slot = sv.induced_slot(slot_derivation.name, class_derivation.name)
            if slot.multivalued:
                return True
        return False

    def _is_coerce_to_singlevalued(self, slot_derivation: SlotDerivation, class_derivation: ClassDerivation) -> bool:
        cast_as = slot_derivation.cast_collection_as
        if cast_as and cast_as == CollectionType(CollectionType.SingleValued):
            return True
        if slot_derivation.stringification and not slot_derivation.stringification.reversed:
            return True
        sv = self.target_schemaview
        if sv:
            slot = sv.induced_slot(slot_derivation.name, class_derivation.name)
            if not slot.multivalued:
                return True
        return False

    def _coerce_datatype(self, v: Any, target_range: str | None) -> Any:
        if target_range is None:
            return v
        if isinstance(v, list):
            return [self._coerce_datatype(v1, target_range) for v1 in v]
        if isinstance(v, dict):
            return {k: self._coerce_datatype(v1, target_range) for k, v1 in v.items()}
        cmap = {
            "integer": int,
            "float": float,
            "string": str,
            "boolean": bool,
        }
        cls = cmap.get(target_range)
        if not cls:
            logger.warning(f"Unknown target range {target_range}")
            return v
        if isinstance(v, cls):
            return v
        return cls(v)

    @property
    def curie_converter(self) -> Converter:
        if not self._curie_converter:
            self._curie_converter = Converter([])
            for prefix in self.source_schemaview.schema.prefixes.values():
                self._curie_converter.add_prefix(prefix.prefix_prefix, prefix.prefix_reference)
            for prefix in self.specification.prefixes.values():
                self._curie_converter.add_prefix(prefix.key, prefix.value)
        return self._curie_converter

    def expand_curie(self, curie: str) -> str:
        return self.curie_converter.expand(curie)

    def compress_uri(self, uri: str) -> str:
        return self.curie_converter.compress(uri)
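The datatype coercion in `_coerce_datatype` above recurses through lists and dicts and then applies a plain Python constructor. The sketch below reproduces that behavior as a standalone function (the names `coerce` and `CASTS` are illustrative, and the warning for unknown ranges is omitted) to show one consequence worth knowing: because `bool` is applied directly, any non-empty string, including `"false"`, coerces to `True`.

```python
from typing import Any, Optional

# Same range-to-constructor table as in _coerce_datatype.
CASTS = {"integer": int, "float": float, "string": str, "boolean": bool}


def coerce(value: Any, target_range: Optional[str]) -> Any:
    """Coerce a scalar, list, or dict of values to the given range."""
    if target_range is None:
        return value
    if isinstance(value, list):
        return [coerce(v, target_range) for v in value]
    if isinstance(value, dict):
        return {k: coerce(v, target_range) for k, v in value.items()}
    cast = CASTS.get(target_range)
    # Unknown ranges and already-correct types pass through unchanged.
    if cast is None or isinstance(value, cast):
        return value
    return cast(value)


print(coerce("5", "integer"))
print(coerce(["1", "2"], "float"))
print(coerce("false", "boolean"))  # bool() of a non-empty string is True
```

If string booleans can occur in the source data, it is probably safer to convert them explicitly in the specification than to rely on this coercion.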

derived_specification property

Return the specification with schema-inferred defaults filled in.

Creates a deep copy of self.specification, applies any source schema patches, then calls induce_missing_values to fill in populated_from, range, and other fields that require knowledge of the source schema. The result is cached for subsequent access.

This is the second phase of normalization — see the class docstring for the full two-phase pipeline.
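The copy-then-cache behavior can be illustrated without the library. In the sketch below, `LazySpecHolder` is a hypothetical stand-in for a transformer, the specification is a plain dict rather than a `TransformationSpecification`, and the loop that defaults `populated_from` to the derivation name is only a placeholder for `induce_missing_values`, not its real logic.

```python
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class LazySpecHolder:
    """Hypothetical stand-in for a transformer; not part of linkml-map."""

    specification: Optional[dict[str, Any]] = None
    _derived: Optional[dict[str, Any]] = field(default=None, repr=False)

    @property
    def derived_specification(self) -> Optional[dict[str, Any]]:
        if self._derived is None:
            if self.specification is None:
                return None
            # Work on a deep copy so the user-supplied spec stays untouched.
            derived = deepcopy(self.specification)
            # Placeholder induction step (assumption, for illustration only).
            for cd in derived["class_derivations"]:
                cd.setdefault("populated_from", cd["name"])
            self._derived = derived
        return self._derived


holder = LazySpecHolder({"class_derivations": [{"name": "Person"}]})
first = holder.derived_specification
second = holder.derived_specification
print(first is second)       # the cached object is reused
print(holder.specification)  # the original spec has no induced fields
```

One consequence of this pattern, as far as the source shown here goes, is that changes made to `specification` after the first access are not reflected in the cached result.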

source_schemaview = None class-attribute instance-attribute

A view over the schema describing the input/source object.

specification = None class-attribute instance-attribute

A specification of how to generate target objects from source objects.

strict = field(default=False) class-attribute instance-attribute

Raise on expression references that do not resolve to a schema slot.

When False (the default), unresolved names emit a warning and the expression evaluator returns None. This preserves SQL-style null propagation for legitimate empty values but loses the signal for typos. When True, unresolved names raise linkml_map.transformer.errors.TransformationError, which surfaces typos, stale references, and wrong-table accesses that would otherwise produce silent nulls in the output.
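The difference between the two modes can be sketched with a small name resolver. Everything here is hypothetical: `resolve`, `known_slots`, and `UnresolvedNameError` stand in for the library's expression evaluator, schema lookup, and `TransformationError`, and are not its actual API.

```python
import warnings
from typing import Any


class UnresolvedNameError(Exception):
    """Hypothetical stand-in for linkml-map's TransformationError."""


def resolve(name: str, row: dict[str, Any], known_slots: set[str], strict: bool = False) -> Any:
    """Look up a name in a row, treating unknown names according to `strict`."""
    if name in known_slots:
        # A known slot with no value is a legitimate null.
        return row.get(name)
    if strict:
        raise UnresolvedNameError(f"Unknown slot {name!r}")
    warnings.warn(f"Unknown slot {name!r}; returning None", stacklevel=2)
    return None


row = {"age": None}
slots = {"age", "name"}

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legit_null = resolve("age", row, slots)  # known slot, empty value
    typo = resolve("agee", row, slots)       # unknown name, lenient mode

try:
    resolve("agee", row, slots, strict=True)
    strict_raised = False
except UnresolvedNameError:
    strict_raised = True

print(legit_null, typo, len(caught), strict_raised)
```

In lenient mode the legitimate null and the typo produce the same value, and only the warning distinguishes them; strict mode turns the typo into an error.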

target_schemaview = None class-attribute instance-attribute

A view over the schema describing the output/target object.

unrestricted_eval = field(default=False) class-attribute instance-attribute

Set to True to allow arbitrary evals as part of transformation.

create_transformer_specification(obj)

Create specification from a dict.

TODO: this will no longer be necessary when pydantic supports inlined as dict

:param obj: Raw specification dict; normalized in place.

Source code in src/linkml_map/transformer/transformer.py
def create_transformer_specification(self, obj: dict[str, Any]) -> None:
    """
    Create specification from a dict.

    TODO: this will no longer be necessary when pydantic supports inlined as dict

    :param obj: Raw specification dict; normalized in place.
    :return:
    """
    self._normalize_spec_dict(obj)
    self.specification = TransformationSpecification(**obj)

load_source_schema(path)

Set source_schemaview from a schema path.

:param path:

Source code in src/linkml_map/transformer/transformer.py
def load_source_schema(self, path: str | Path | dict) -> None:
    """
    Set source_schemaview from a schema path.

    :param path:
    """
    if isinstance(path, Path):
        path = str(path)
    self.source_schemaview = SchemaView(path)

load_transformer_specification(path)

Set specification from a YAML specification file.

:param path: Path to a YAML transformation specification file.

Source code in src/linkml_map/transformer/transformer.py
def load_transformer_specification(self, path: str | Path) -> None:
    """
    Set specification from a YAML specification file.

    :param path: Path to a YAML transformation specification file.
    """
    with open(path) as f:
        obj = yaml.safe_load(f)
        self._normalize_spec_dict(obj)
        self.specification = TransformationSpecification(**obj)

load_transformer_specifications(paths)

Load and merge multiple transformation spec files into a single specification.

Accepts file paths and/or directories. Directories are recursively searched for YAML files. All specs are merged (class_derivations appended, enum/slot_derivations unioned by name) and the result is set as self.specification.

:param paths: One or more file or directory paths.
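The merge rule described above (lists appended, named maps unioned) can be sketched on plain dicts. This is an illustration only: `merge_specs` is a hypothetical helper, not `load_and_merge_specs`, it skips file and directory discovery, and letting a later spec win on a name clash is an assumption rather than documented behavior.

```python
from typing import Any


def merge_specs(specs: list[dict[str, Any]]) -> dict[str, Any]:
    """Merge already-loaded spec dicts into one."""
    merged: dict[str, Any] = {
        "class_derivations": [],
        "enum_derivations": {},
        "slot_derivations": {},
    }
    for spec in specs:
        # class_derivations are appended in load order.
        merged["class_derivations"].extend(spec.get("class_derivations", []))
        # enum and slot derivations are unioned by name
        # (assumption: a later spec overrides an earlier one on conflict).
        for key in ("enum_derivations", "slot_derivations"):
            merged[key].update(spec.get(key, {}))
    return merged


spec_a = {"class_derivations": [{"name": "Person"}], "enum_derivations": {"Sex": {}}}
spec_b = {"class_derivations": [{"name": "Address"}], "enum_derivations": {"Status": {}}}
merged = merge_specs([spec_a, spec_b])
print([cd["name"] for cd in merged["class_derivations"]])
print(sorted(merged["enum_derivations"]))
```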

Source code in src/linkml_map/transformer/transformer.py
def load_transformer_specifications(self, paths: tuple[str | Path, ...]) -> None:
    """Load and merge multiple transformation spec files into a single specification.

    Accepts file paths and/or directories.  Directories are recursively
    searched for YAML files.  All specs are merged (class_derivations
    appended, enum/slot_derivations unioned by name) and the result is
    set as ``self.specification``.

    :param paths: One or more file or directory paths.
    """
    from linkml_map.utils.spec_merge import load_and_merge_specs

    obj = load_and_merge_specs(paths)
    self._normalize_spec_dict(obj)
    self.specification = TransformationSpecification(**obj)

map_database(source_database, target_database=None, **kwargs)

Transform source resource.

:param source_database:
:param target_database:
:param kwargs:
:return:

Source code in src/linkml_map/transformer/transformer.py
def map_database(
    self, source_database: Any, target_database: Any | None = None, **kwargs: dict[str, Any]
) -> OBJECT_TYPE:
    """
    Transform source resource.

    :param source_database:
    :param target_database:
    :param kwargs:
    :return:
    """
    raise NotImplementedError

map_object(obj, source_type=None, **kwargs)

Transform source object into an instance of the target class.

:param obj:
:param source_type:
:return:

Source code in src/linkml_map/transformer/transformer.py
def map_object(self, obj: OBJECT_TYPE, source_type: str | None = None, **kwargs: Any) -> OBJECT_TYPE:
    """
    Transform source object into an instance of the target class.

    :param obj:
    :param source_type:
    :return:
    """
    raise NotImplementedError

normalize_transform_spec(obj, normalizer) classmethod

Recursively normalize class_derivations and flatten object_derivations.

Source code in src/linkml_map/transformer/transformer.py
@classmethod
def normalize_transform_spec(cls, obj: dict[str, Any], normalizer: ReferenceValidator) -> dict:
    """Recursively normalize class_derivations and flatten object_derivations."""
    obj = normalizer.normalize(obj)

    class_derivations = obj.get("class_derivations", [])
    if isinstance(class_derivations, dict):
        cd_iter = class_derivations.values()
    else:
        cd_iter = class_derivations
    for class_spec in cd_iter:
        if not isinstance(class_spec, dict):
            continue
        parent_source = class_spec.get("populated_from")
        slot_derivations = class_spec.get("slot_derivations", {})
        for slot_name, slot_spec in slot_derivations.items():
            if slot_spec.get("value") is not None and slot_spec.get("range") is None:
                slot_spec["range"] = "string"
            cls._normalize_slot_class_derivations(slot_name, slot_spec, normalizer, parent_source)
    return obj
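Two of the rewrites performed during this normalization are easy to show on plain dicts: defaulting `range` to `"string"` for slots that carry a constant `value`, and flattening the deprecated `object_derivations` into `class_derivations`. The sketch below mirrors the logic in the class source but is not the library code; the function names are illustrative, the deprecation warning is left out, and nested bodies are copied rather than mutated.

```python
from typing import Any


def default_range(slot_spec: dict[str, Any]) -> None:
    """A slot with a constant value and no range is treated as a string."""
    if slot_spec.get("value") is not None and slot_spec.get("range") is None:
        slot_spec["range"] = "string"


def flatten_object_derivations(slot_spec: dict[str, Any]) -> None:
    """Move nested object_derivations entries into a class_derivations list."""
    object_derivations = slot_spec.get("object_derivations", [])
    if not object_derivations:
        return
    if slot_spec.get("class_derivations"):
        raise ValueError("use either 'object_derivations' or 'class_derivations', not both")
    flattened: list[dict[str, Any]] = []
    for od in object_derivations:
        nested = od.get("class_derivations", {})
        if isinstance(nested, dict):
            # Dict-keyed entries get their key injected as "name".
            for name, body in nested.items():
                entry = dict(body) if isinstance(body, dict) else {}
                entry.setdefault("name", name)
                flattened.append(entry)
        elif isinstance(nested, list):
            flattened.extend(nested)
    slot_spec["class_derivations"] = flattened
    del slot_spec["object_derivations"]


constant = {"value": "fixed"}
default_range(constant)
print(constant)

legacy = {"object_derivations": [{"class_derivations": {"Address": {"populated_from": "addr"}}}]}
flatten_object_derivations(legacy)
print(legacy)
```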

ObjectTransformer

Bases: Transformer

A Transformer that works on in-memory dict objects.

It works recursively.

Source code in src/linkml_map/transformer/object_transformer.py
@dataclass
class ObjectTransformer(Transformer):
    """
    A Transformer that works on in-memory dict objects.

    The transformation is applied recursively to nested objects.
    """

    object_index: ObjectIndex = None
    lookup_index: Any = None  # Optional[LookupIndex] — lazy import to avoid hard duckdb dep

    _warned_unbound_names: set[str] = field(default_factory=set, repr=False)
    """Names already warned about in non-strict mode.

    Shared across all expression evaluations on this transformer
    instance — each unique unbound reference logs once, not once per
    row. The set is never cleared automatically, so a transformer
    reused across multiple logical runs will not re-warn for the same
    typo. This is the intended behavior: if a spec has a typo, one
    warning per typo across the lifetime of the transformer is enough.
    Construct a fresh ``ObjectTransformer`` if you want a clean slate.
    """

    def index(self, source_obj: Any, target: str | None = None) -> None:
        """
        Create an index over a container object.

        :param source_obj: source data structure to be indexed
        :param target: class to convert source object into
        """
        if isinstance(source_obj, dict):
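            if target is None:
                # Infer the target only when the schema has exactly one tree_root class;
                # otherwise fall through to the explicit error below.
                roots = [c.name for c in self.source_schemaview.all_classes().values() if c.tree_root]
                if len(roots) == 1:
                    target = roots[0]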
            if target is None:
                msg = f"target must be passed if source_obj is dict: {source_obj}"
                raise ValueError(msg)
            source_obj_typed = dynamic_object(source_obj, self.source_schemaview, target)
            self.object_index = ObjectIndex(source_obj_typed, schemaview=self.source_schemaview)
        else:
            self.object_index = ObjectIndex(source_obj, schemaview=self.source_schemaview)

    def _resolve_source_type(self, source_type: str | None, sv: SchemaView | None) -> str | None:
        """
        Resolve the source type when not explicitly provided.

        :param source_type: Explicitly provided source type, or None.
        :param sv: Source schema view, may be None.
        :return: Resolved source type name.
        """
        if source_type is None and sv is None:
            # TODO: use smarter method
            if self.specification.class_derivations:
                source_type = self.specification.class_derivations[0].name
            else:
                msg = (
                    "Cannot resolve source type: no source_type provided, "
                    "no SchemaView available, and specification has no class_derivations"
                )
                raise ValueError(msg)
        if source_type is None and sv is not None:
            source_types = [c.name for c in sv.all_classes().values() if c.tree_root]
            if len(source_types) == 1:
                source_type = source_types[0]
            elif len(source_types) > 1:
                msg = "No source type specified and multiple root classes found"
                raise ValueError(msg)
            elif len(source_types) == 0:
                if len(sv.all_classes()) == 1:
                    source_type = next(iter(sv.all_classes().keys()))
                else:
                    msg = "No source type specified and no root classes found"
                    raise ValueError(msg)
        return source_type

    def map_object(
        self,
        source_obj: OBJECT_TYPE,
        source_type: str | None = None,
        target_type: str | None = None,
        class_derivation: ClassDerivation | None = None,
    ) -> DICT_OBJ | Any:
        """
        Transform a source object into a target object.

        Slot derivations are evaluated in declaration order. Expressions
        may call ``slot('name')`` to reference previously computed values.
        Slots with ``hide: true`` are computed (so downstream ``slot()``
        calls can reference them) but excluded from the returned dict.

        :param source_obj: source data structure
        :param source_type: source_obj instantiates this (may be class, type, or enum)
        :param target_type: target_obj instantiates this (may be class, type, or enum)
        :param class_derivation: class derivation to apply; if omitted, it is looked up from source_type
        :return: transformed data, either as type target_type or a dictionary
        """
        sv = self.source_schemaview
        source_type = self._resolve_source_type(source_type, sv)

        if source_type in sv.all_types():
            if target_type:
                if target_type == "string":
                    return str(source_obj)
                if target_type == "integer":
                    return int(source_obj)
                if target_type in {"float", "double"}:
                    return float(source_obj)
                if target_type == "uri":
                    return self.expand_curie(source_obj)
                if target_type == "curie":
                    return self.compress_uri(source_obj)
            return source_obj
        if source_type in sv.all_enums():
            return self.transform_enum(source_obj, [source_type], source_obj)

        source_obj_typed = None
        if isinstance(source_obj, BaseModel | YAMLRoot):
            # ensure dict
            source_obj_typed = source_obj
            source_obj = vars(source_obj)
        if not isinstance(source_obj, dict):
            logger.warning(f"Unexpected: {source_obj} for type {source_type}")
            return source_obj
        class_deriv = class_derivation or self._get_class_derivation(source_type)

        # Handle class-level pivot operations (UNMELT from EAV to wide format)
        if class_deriv.pivot_operation:
            return self._perform_pivot_operation(class_deriv.pivot_operation, source_obj, class_deriv, sv, source_type)

        context = DerivationContext(
            source_obj=source_obj,
            source_obj_typed=source_obj_typed,
            source_type=source_type,
            sv=sv,
            class_deriv=class_deriv,
        )
        tgt_attrs = {}
        bindings = Bindings.from_context(self, context)
        expr_functions = {"slot": lambda name: tgt_attrs.get(name)}
        for slot_deriv in class_deriv.slot_derivations.values():
            with self._slot_error_context(slot_deriv, context):
                tgt_attrs[str(slot_deriv.name)] = self._derive_slot(
                    slot_deriv, context, target_type, bindings, expr_functions
                )
        # Remove hidden slots from output (they exist only for slot() references)
        for slot_deriv in class_deriv.slot_derivations.values():
            if slot_deriv.hide:
                tgt_attrs.pop(str(slot_deriv.name), None)
        return tgt_attrs

    @contextmanager
    def _slot_error_context(
        self,
        slot_derivation: SlotDerivation,
        context: DerivationContext,
    ) -> Iterator[None]:
        """Wrap slot derivation in error enrichment.

        Re-raises ``TransformationError`` unchanged; wraps all other exceptions
        with derivation context so callers get actionable diagnostics.
        """
        try:
            yield
        except TransformationError:
            raise
        except Exception as exc:
            raise TransformationError(
                message=str(exc),
                class_derivation_name=context.class_deriv.name,
                class_populated_from=context.class_deriv.populated_from,
                slot_derivation_name=slot_derivation.name,
                slot_populated_from=slot_derivation.populated_from,
                source_row=context.source_obj,
                cause=exc,
            ) from exc

    def _derive_slot(
        self,
        slot_derivation: SlotDerivation,
        context: DerivationContext,
        target_type: str | None,
        bindings: Bindings,
        expr_functions: dict[str, Any] | None = None,
    ) -> Any:
        """Derive a single target slot value from the source object.

        Dispatches on the slot derivation type (literal value, expression,
        populated_from, etc.) and applies post-processing (range mapping,
        cardinality coercion, datatype coercion, reshaping).

        Slot derivations are evaluated in declaration order. Expressions
        can reference previously computed slots via ``slot('name')``.

        :param slot_derivation: The slot derivation spec to apply.
        :param context: Current derivation context.
        :param target_type: Target class name (needed for nested object derivations).
        :param bindings: Bindings instance for expression evaluation.
        :param expr_functions: Extra functions for expression evaluation (e.g. ``slot``).
        :returns: The derived value for this slot.
        """
        v = None
        source_class_slot = None
        if slot_derivation.value is not None:
            v = slot_derivation.value
        elif slot_derivation.unit_conversion:
            v = self._perform_unit_conversion(slot_derivation, context)
        elif slot_derivation.pivot_operation:
            # MELT operation: wide format to EAV/long format
            v = self._perform_melt(slot_derivation.pivot_operation, context.source_obj, slot_derivation)
        elif slot_derivation.expr:
            v = self._eval_expr(slot_derivation.expr, bindings, functions=expr_functions)
        elif slot_derivation.populated_from:
            populated_from = slot_derivation.populated_from

            if "." in populated_from:
                table_name, field_path = populated_from.split(".", 1)
                if context.class_deriv.joins and table_name in context.class_deriv.joins:
                    (v, source_class_slot) = self._perform_join_resolution(table_name, field_path, context)
                elif isinstance(context.source_obj, MergedRow) and table_name in context.source_obj.rows_by_table:
                    v = context.source_obj.rows_by_table[table_name].get(field_path)
                    source_class_slot = (
                        context.sv.induced_slot(field_path, table_name)
                        if table_name in context.sv.all_classes()
                        else None
                    )
                elif table_name == context.source_type:
                    v = context.source_obj.get(field_path)
                    if v is _AMBIGUOUS:
                        _raise_ambiguous_column(
                            field_path,
                            class_deriv=context.class_deriv,
                            slot_derivation=slot_derivation,
                        )
                    source_class_slot = context.sv.induced_slot(field_path, context.source_type)
                else:
                    (v, source_class_slot) = self._resolve_fk_or_literal(
                        populated_from,
                        slot_derivation,
                        context,
                        require_fk=True,
                    )
            else:
                (v, source_class_slot) = self._resolve_fk_or_literal(populated_from, slot_derivation, context)

            if (slot_derivation.value_mappings or slot_derivation.expression_mappings) and v is not None:
                v = self._apply_mappings(slot_derivation, v, bindings, functions=expr_functions)

            if slot_derivation.offset and v is not None:
                v = self._apply_offset(v, slot_derivation, context.source_obj)

            logger.debug(
                f"Pop slot {slot_derivation.name} => {v} using {slot_derivation.populated_from} // {context.source_obj}"
            )
        elif slot_derivation.sources:
            (v, source_class_slot) = self._resolve_sources(slot_derivation, context)
        elif slot_derivation.class_derivations:
            v = self._derive_nested_objects(slot_derivation, context.source_obj, target_type, context.class_deriv)
        else:
            source_class_slot = context.sv.induced_slot(slot_derivation.name, context.source_type)
            v = context.source_obj.get(slot_derivation.name, None)
            if v is _AMBIGUOUS:
                _raise_ambiguous_column(
                    slot_derivation.name,
                    class_deriv=context.class_deriv,
                    slot_derivation=slot_derivation,
                )

        if source_class_slot and v is not None and not slot_derivation.hide:
            target_range = slot_derivation.range
            v = self._map_value_by_range(v, source_class_slot, target_range, context.source_obj)
            v = self._coerce_cardinality(v, slot_derivation, context.class_deriv)
            v = self._coerce_datatype(v, target_range)
            v = self._reshape_collection(v, slot_derivation, source_class_slot)
        return v

    def _eval_expr(
        self,
        expr: str,
        bindings: Bindings,
        functions: dict[str, Any] | None = None,
    ) -> Any:
        """Evaluate an expression string against bindings.

        Uses the restricted evaluator by default, with fallback to asteval
        when ``unrestricted_eval`` is enabled on the transformer.

        :param expr: The expression string to evaluate.
        :param bindings: Variable bindings for the expression.
        :param functions: Extra functions injected into the evaluator
            (e.g. ``slot`` for referencing previously derived target slots).
        """
        try:
            return eval_expr_with_mapping(
                expr,
                bindings,
                functions=functions,
                strict=self.strict,
                warned_unbound=self._warned_unbound_names,
            )
        except (InvalidExpression, TypeError, ValueError):
            if not self.unrestricted_eval:
                raise
            ctxt_obj, _ = bindings.get_ctxt_obj_and_dict()
            usersyms = {"src": ctxt_obj, "target": None, "uuid5": _uuid5}
            if functions:
                usersyms.update(functions)
            aeval = Interpreter(usersyms=usersyms)
            aeval(expr)
            return aeval.symtable["target"]

    def _apply_mappings(
        self,
        slot_derivation: SlotDerivation,
        v: Any,
        bindings: Bindings,
        functions: dict[str, Any] | None = None,
    ) -> Any:
        """Look up a value in value_mappings then expression_mappings.

        Checks ``value_mappings`` first (literal result). On miss, falls through
        to ``expression_mappings`` (evaluated against *bindings*). Returns
        ``None`` if neither table contains the key.
        """
        str_v = str(v)
        vm_hit = slot_derivation.value_mappings.get(str_v) if slot_derivation.value_mappings else None
        if vm_hit is not None:
            return vm_hit.value
        if slot_derivation.expression_mappings:
            em_hit = slot_derivation.expression_mappings.get(str_v)
            if em_hit is not None:
                return self._eval_expr(em_hit.value, bindings, functions=functions)
        return None

    def _perform_fk_resolution(
        self,
        fk_resolution: FKResolution,
        slot_derivation: SlotDerivation,
        fk_value: Any,
    ) -> tuple[Any, SlotDefinition | None]:
        """Resolve a foreign key value through the object index and walk the remaining path."""
        if fk_value is not None and self.object_index:
            cache_key = (fk_resolution.target_class, str(fk_value))
            referenced_obj = self.object_index._source_object_cache.get(cache_key)

            if referenced_obj:
                v = referenced_obj
                for attr in fk_resolution.remaining_path.split("."):
                    if isinstance(v, dict):
                        v = v.get(attr)
                    elif v is not None:
                        v = getattr(v, attr, None)
                    if v is None:
                        break
            else:
                v = None
                logger.debug(f"FK reference not found for {slot_derivation.name}")
        else:
            v = None
            if fk_value is not None and not self.object_index:
                logger.warning(
                    "Cross-class lookup requires object_index. Call transformer.index(container_data) first."
                )

        source_class_slot = fk_resolution.final_slot
        return v, source_class_slot

    def _resolve_fk_or_literal(
        self,
        populated_from: str,
        slot_derivation: SlotDerivation,
        context: DerivationContext,
        *,
        require_fk: bool = False,
    ) -> tuple[Any, SlotDefinition | None]:
        """Resolve a populated_from value via FK path or direct field lookup.

        :param populated_from: The populated_from string (may contain dots for FK paths).
        :param slot_derivation: The active slot derivation.
        :param context: Current derivation context.
        :param require_fk: If True, raise ValueError when no FK path is found
            (used for dot-notation without a matching join).
        :returns: Tuple of (resolved value, source slot definition or None).
        """
        fk_resolution = resolve_fk_path(context.sv, context.source_type, populated_from)
        if fk_resolution:
            fk_value = context.source_obj.get(fk_resolution.fk_slot_name)
            return self._perform_fk_resolution(fk_resolution, slot_derivation, fk_value)
        if require_fk:
            msg = (
                f"Dot-notation '{populated_from}' in populated_from "
                f"requires a matching join spec or FK path, but neither was found"
            )
            raise ValueError(msg)
        v = context.source_obj.get(populated_from, None)
        if v is _AMBIGUOUS:
            _raise_ambiguous_column(
                populated_from,
                class_deriv=context.class_deriv,
                slot_derivation=slot_derivation,
            )
        source_class_slot = context.sv.induced_slot(populated_from, context.source_type)
        return v, source_class_slot

    def _resolve_joined_row(
        self,
        table_name: str,
        source_obj: DICT_OBJ,
        class_deriv: ClassDerivation,
    ) -> dict | None:
        """Resolve a row from a joined table using LookupIndex.

        This is the single source of truth for cross-table join resolution.
        Both ``Bindings._resolve_join`` (for ``expr:``) and
        ``_perform_join_resolution`` (for ``populated_from:``) delegate here.

        :param table_name: Join name (key in ``class_deriv.joins``).
        :param source_obj: Current primary-table row.
        :param class_deriv: The active ClassDerivation (carries join specs).
        :returns: Matched row as a dict, or ``None`` if no match found.
        :raises ValueError: If join spec is missing keys or lookup_index is not initialized.
        """
        spec = class_deriv.joins[table_name]
        source_key = spec.source_key or spec.join_on
        lookup_key = spec.lookup_key or spec.join_on
        if not source_key or not lookup_key:
            msg = f"Join spec for {table_name!r} must specify 'join_on' or both 'source_key' and 'lookup_key'"
            raise ValueError(msg)
        key_val = source_obj.get(source_key)
        if key_val is None:
            return None
        if self.lookup_index is None:
            msg = f"Join configured for {table_name!r} but lookup_index has not been initialized"
            raise ValueError(msg)
        return self.lookup_index.lookup_row(table_name, lookup_key, key_val)

    def _perform_join_resolution(
        self,
        table_name: str,
        field_path: str,
        context: DerivationContext,
    ) -> tuple[Any, SlotDefinition | None]:
        """Resolve a slot value via cross-table join lookup.

        :param table_name: Join name (key in ``class_deriv.joins``).
        :param field_path: Column name within the joined table.
        :param context: Current derivation context.
        :returns: Tuple of (resolved value, source slot definition or None).
        """
        row = self._resolve_joined_row(table_name, context.source_obj, context.class_deriv)
        v = row.get(field_path) if row else None
        joined_class = context.class_deriv.joins[table_name].class_named or table_name
        source_class_slot = None
        if joined_class in context.sv.all_classes():
            if field_path in context.sv.class_induced_slots(joined_class):
                source_class_slot = context.sv.induced_slot(field_path, joined_class)
        return v, source_class_slot

    @staticmethod
    def _merge_rows(
        parent_row: DICT_OBJ,
        joined_row: DICT_OBJ,
        join_key: str,
        parent_source: str,
        nested_source: str,
    ) -> MergedRow:
        """Merge parent and joined rows, marking ambiguous columns.

        Columns present in both rows (except the join key) are set to the
        ``_AMBIGUOUS`` sentinel so that slot resolution raises a clear error
        instead of silently picking one. The original rows are preserved in
        the returned :class:`MergedRow` for dot-notation disambiguation.

        :param parent_row: Row from the parent table.
        :param joined_row: Row from the joined table.
        :param join_key: The column used to join (kept from parent).
        :param parent_source: Parent table name (used for logging and as a key in ``rows_by_table``).
        :param nested_source: Joined table name (used for logging and as a key in ``rows_by_table``).
        :returns: MergedRow with ambiguous markers and original rows.
        """
        ambiguous = {k for k in parent_row if k in joined_row and k != join_key}
        merged = {}
        for k, v in parent_row.items():
            merged[k] = _AMBIGUOUS if k in ambiguous else v
        for k, v in joined_row.items():
            if k not in merged:
                merged[k] = v

        if ambiguous:
            logger.debug(
                "Ambiguous columns %s shared between %s and %s — dot notation required",
                sorted(ambiguous),
                parent_source,
                nested_source,
            )

        return MergedRow(merged, rows_by_table={parent_source: parent_row, nested_source: joined_row})

    def _apply_offset(self, value: Any, slot_derivation: SlotDerivation, source_obj: DICT_OBJ) -> Any:
        """Apply an offset calculation using a value from another source field."""
        off = slot_derivation.offset
        off_field_val = source_obj.get(off.offset_field)

        if off_field_val is None:
            logger.debug(
                f"Offset field '{off.offset_field}' not found in source object; "
                f"skipping offset for '{slot_derivation.name}'"
            )
            return value

        delta = off.offset_value * off_field_val
        result = value - delta if off.offset_reverse else value + delta
        logger.debug(
            f"Offset for '{slot_derivation.name}': "
            f"{value} {'-' if off.offset_reverse else '+'} "
            f"({off.offset_value} * {off_field_val}) = {result}"
        )
        return result

    def _resolve_sources(
        self, slot_derivation: SlotDerivation, context: DerivationContext
    ) -> tuple[Any, SlotDefinition | None]:
        """Resolve a slot value from multiple candidate source slots (raises ValueError if more than one is set)."""
        vmap = {s: context.source_obj.get(s, None) for s in slot_derivation.sources}
        vmap = {k: v for k, v in vmap.items() if v is not None}
        if len(vmap.keys()) > 1:
            msg = f"Multiple sources for {slot_derivation.name}: {vmap}"
            raise ValueError(msg)
        if len(vmap.keys()) == 1:
            v = next(iter(vmap.values()))
            source_class_slot_name = next(iter(vmap.keys()))
            source_class_slot = context.sv.induced_slot(source_class_slot_name, context.source_type)
        else:
            v = None
            source_class_slot = None

        logger.debug(
            f"Pop slot {slot_derivation.name} => {v} using {slot_derivation.sources} // {context.source_obj}"
        )
        return v, source_class_slot

    def _derive_nested_objects(
        self,
        slot_derivation: SlotDerivation,
        source_obj: DICT_OBJ,
        target_type: str,
        parent_class_deriv: ClassDerivation,
    ) -> Any:
        """Build nested objects from slot-level ``class_derivations`` declarations.

        When a nested class_derivation has a different ``populated_from`` than
        its parent and the parent has a matching join spec (explicit or
        synthesized during normalization), this method resolves the joined row
        and merges it with the parent row before passing it to ``map_object``.

        :param slot_derivation: The slot derivation that declares nested class_derivations.
        :param source_obj: The current (parent) source row.
        :param target_type: Target class name for cardinality decisions.
        :param parent_class_deriv: The parent's ClassDerivation (carries join specs).
        """
        derived_objs = []
        parent_source = parent_class_deriv.populated_from or parent_class_deriv.name

        for cls_derivation in slot_derivation.class_derivations:
            nested_source = cls_derivation.populated_from
            effective_obj = source_obj

            if nested_source and nested_source != parent_source:
                has_join = parent_class_deriv.joins and nested_source in parent_class_deriv.joins
                if has_join:
                    # Join spec exists (explicit or synthesized) — resolve and merge
                    joined_row = self._resolve_joined_row(nested_source, source_obj, parent_class_deriv)
                    if joined_row is not None:
                        join_spec = parent_class_deriv.joins[nested_source]
                        join_key = join_spec.join_on or join_spec.source_key or ""
                        effective_obj = self._merge_rows(
                            source_obj,
                            joined_row,
                            join_key,
                            parent_source,
                            nested_source,
                        )
                else:
                    # No join spec for this nested source — cross-table reference can't be resolved.
                    # Re-derive the candidate set so the diagnostic tells the user *why* synthesis
                    # failed (no overlap vs. multiple non-identifier candidates), recovering the
                    # detail lost when the per-row resolution path was consolidated into
                    # normalization-time synthesis.
                    from linkml_map.utils.join_utils import find_common_columns

                    common = find_common_columns(self.source_schemaview, parent_source, nested_source)
                    if not common:
                        reason = f"no columns are shared between {parent_source!r} and {nested_source!r}"
                    else:
                        candidates = ", ".join(sorted(common))
                        reason = (
                            f"multiple candidate join columns are shared between "
                            f"{parent_source!r} and {nested_source!r} ({candidates}); "
                            f"specify which to use"
                        )
                    raise TransformationError(
                        message=(
                            f"Nested class {cls_derivation.name!r} has "
                            f"populated_from={nested_source!r} which differs from parent "
                            f"populated_from={parent_source!r}, but no implicit join could be "
                            f"synthesized: {reason}. Add an explicit joins: block."
                        ),
                        class_derivation_name=cls_derivation.name,
                        class_populated_from=nested_source,
                    )

            nested_result = self.map_object(
                effective_obj,
                source_type=cls_derivation.populated_from,
                target_type=cls_derivation.name,
                class_derivation=cls_derivation,
            )
            derived_objs.append(nested_result)

        # If the slot is multivalued, we assign the whole list
        # Otherwise, just assign the first (for now; error/warning later if >1)
        target_class_slot = self.target_schemaview.induced_slot(slot_derivation.name, target_type)
        if target_class_slot.multivalued:
            v = derived_objs
        else:
            v = derived_objs[0] if derived_objs else None
        return v

    def _map_value_by_range(
        self,
        v: Any,
        source_class_slot: SlotDefinition,
        target_range: str | None,
        source_obj: DICT_OBJ,
    ) -> Any:
        """Recursively map nested values based on the source slot's range type."""
        source_class_slot_range = source_class_slot.range
        sv = self.source_schemaview

        # Check for enums defined via any_of when the range is None or "Any"
        if source_class_slot_range is None or source_class_slot_range == "Any":
            any_of_enums = self._get_any_of_enum_names(source_class_slot, sv)
            if any_of_enums:
                if source_class_slot.multivalued and isinstance(v, list):
                    return [self.transform_enum(v1, any_of_enums, source_obj) for v1 in v]
                return self.transform_enum(v, any_of_enums, source_obj)
            # No range and no any_of enums: nothing to recurse into for scalars
            if not isinstance(v, dict | list):
                return v
            if isinstance(v, list) and all(not isinstance(v1, dict | list) for v1 in v):
                return v

        if source_class_slot.multivalued:
            if isinstance(v, list):
                return [self.map_object(v1, source_class_slot_range, target_range) for v1 in v]
            elif isinstance(v, dict):
                return {k1: self.map_object(v1, source_class_slot_range, target_range) for k1, v1 in v.items()}
            else:
                return [self.map_object(v, source_class_slot_range, target_range)]
        else:
            return self.map_object(v, source_class_slot_range, target_range)

    def _coerce_cardinality(self, v: Any, slot_derivation: SlotDerivation, class_derivation: ClassDerivation) -> Any:
        """Coerce between single-valued and multi-valued based on target schema and spec."""
        if (
            self._is_coerce_to_multivalued(slot_derivation, class_derivation)
            and v is not None
            and not isinstance(v, list)
        ):
            return self._singlevalued_to_multivalued(v, slot_derivation)
        elif self._is_coerce_to_singlevalued(slot_derivation, class_derivation) and isinstance(v, list):
            return self._multivalued_to_singlevalued(v, slot_derivation)
        return v

    def _reshape_collection(self, v: Any, slot_derivation: SlotDerivation, source_class_slot: SlotDefinition) -> Any:
        """Reshape between list and compact-dict collection formats."""
        if slot_derivation.dictionary_key and isinstance(v, list):
            # List to CompactDict
            v = {v1[slot_derivation.dictionary_key]: v1 for v1 in v}
            for v1 in v.values():
                del v1[slot_derivation.dictionary_key]
            return v
        elif (
            slot_derivation.cast_collection_as
            and slot_derivation.cast_collection_as == CollectionType.MultiValuedList
            and isinstance(v, dict)
        ):
            # CompactDict to List
            src_rng = source_class_slot.range
            src_rng_id_slot = self.source_schemaview.get_identifier_slot(src_rng, use_key=True)
            if src_rng_id_slot:
                return [{**v1, src_rng_id_slot.name: k} for k, v1 in v.items()]
            else:
                return list(v.values())
        return v

    @staticmethod
    def _get_any_of_enum_names(slot: Any, sv: SchemaView) -> list[str]:
        """Extract enum names from a slot's any_of constraints.

        :param slot: An induced slot definition (from SchemaView.induced_slot).
        :param sv: Source schema view.
        :return: List of enum names found in any_of, empty if none.
        """
        if not hasattr(slot, "any_of") or not slot.any_of:
            return []
        all_enums = sv.all_enums()
        return [ao.range for ao in slot.any_of if ao.range in all_enums]

    def _perform_unit_conversion(
        self,
        slot_derivation: SlotDerivation,
        context: DerivationContext,
    ) -> float | dict | None:
        """Perform unit conversion for a slot derivation."""
        uc = slot_derivation.unit_conversion
        curr_v = context.source_obj.get(slot_derivation.populated_from, None)

        if curr_v is None:
            logger.debug(f"No value found for slot '{slot_derivation.populated_from}'; skipping conversion")
            return None

        slot = context.sv.induced_slot(slot_derivation.populated_from, context.source_type)
        schema_unit = None
        from_unit = None
        system = UnitSystem.UCUM

        if slot.unit:
            if slot.unit.ucum_code:
                schema_unit = slot.unit.ucum_code
            elif slot.unit.iec61360code:
                schema_unit = slot.unit.iec61360code
                system = UnitSystem.IEC61360
            elif slot.unit.symbol:
                schema_unit = slot.unit.symbol
                system = None
            elif slot.unit.abbreviation:
                schema_unit = slot.unit.abbreviation
                system = None
            elif slot.unit.descriptive_name:
                schema_unit = slot.unit.descriptive_name
                system = None
            else:
                raise NotImplementedError(
                    f"Cannot determine unit system for slot '{slot.name}' — all unit fields are None"
                )

        spec_unit = uc.source_unit if uc.source_unit else None

        if schema_unit and spec_unit:
            if schema_unit != spec_unit:
                raise ValueError(
                    f"Mismatch in source units for slot '{slot_derivation.populated_from}': "
                    f"schema unit '{schema_unit}' vs. transformation spec '{spec_unit}'"
                )
            from_unit = schema_unit
        elif schema_unit:
            from_unit = schema_unit
        elif spec_unit:
            from_unit = spec_unit
        else:
            if uc.source_unit_slot:
                from_unit = None
            else:
                slot_name = slot_derivation.populated_from
                raise ValueError(f"No source unit provided in schema or transformation spec for slot '{slot_name}'")

        if uc.source_unit_slot:
            # Structured input, e.g., {"value": 120, "unit": "cm"}
            from_unit_val = curr_v.get(uc.source_unit_slot)
            if from_unit_val:
                if from_unit and from_unit_val != from_unit:
                    slot_name = slot_derivation.populated_from
                    raise ValueError(
                        f"Value unit '{from_unit_val}' does not match expected '{from_unit}' for slot '{slot_name}'"
                    )
                from_unit = from_unit_val
            else:
                raise ValueError(
                    f"Missing unit in structured value for slot '{slot_derivation.populated_from}': {curr_v}"
                )

            magnitude = curr_v.get(uc.source_magnitude_slot)
            if magnitude is None:
                raise ValueError(
                    f"Missing magnitude in structured value for slot '{slot_derivation.populated_from}': {curr_v}"
                )
        else:
            magnitude = curr_v

        try:
            magnitude = float(magnitude)
        except (TypeError, ValueError):
            if uc.none_if_non_numeric:
                return None
            raise

        to_unit = uc.target_unit or from_unit
        if from_unit == to_unit:
            result = magnitude
        else:
            result = convert_units(magnitude, from_unit=from_unit, to_unit=to_unit, system=system)

        if uc.target_magnitude_slot:
            return {uc.target_magnitude_slot: result, uc.target_unit_slot: to_unit}
        return result

    def _multivalued_to_singlevalued(self, vs: list[Any], slot_derivation: SlotDerivation) -> Any:
        if slot_derivation.stringification:
            stringification = slot_derivation.stringification
            delimiter = stringification.delimiter
            if delimiter:
                return delimiter.join(vs)
            if stringification.syntax:
                if stringification.syntax == SerializationSyntaxType.JSON:
                    return json.dumps(vs)
                if stringification.syntax == SerializationSyntaxType.YAML:
                    return yaml.dump(vs, default_flow_style=True).strip()
                msg = f"Unknown syntax: {stringification.syntax}"
                raise ValueError(msg)
            msg = f"Cannot convert multivalued to single valued: {vs}; no delimiter"
            raise ValueError(msg)
        if len(vs) > 1:
            msg = f"Cannot coerce multiple values {vs}"
            raise ValueError(msg)
        if len(vs) == 0:
            return None
        return vs[0]

    def _singlevalued_to_multivalued(self, v: Any, slot_derivation: SlotDerivation) -> list[Any]:
        stringification = slot_derivation.stringification
        if stringification:
            delimiter = stringification.delimiter
            if delimiter:
                vs = v.split(slot_derivation.stringification.delimiter)
                if vs == [""]:
                    vs = []
            elif stringification.syntax:
                syntax = stringification.syntax
                if syntax == SerializationSyntaxType.JSON:
                    vs = json.loads(v)
                elif syntax == SerializationSyntaxType.YAML:
                    vs = yaml.safe_load(v)
                else:
                    msg = f"Unknown syntax: {syntax}"
                    raise ValueError(msg)
            else:
                msg = f"Cannot convert single valued to multivalued: {v}; no delimiter"
                raise ValueError(msg)
            return vs
        return [v]

    def transform_object(
        self,
        source_obj: YAMLRoot | BaseModel,
        target_class: type[YAMLRoot] | type[BaseModel] | None = None,
    ) -> YAMLRoot | BaseModel:
        """
        Transform an object into an object of class target_class.

        :param source_obj: source object
        :type source_obj: Union[YAMLRoot, BaseModel]
        :param target_class: class to transform the object into, defaults to None
        :type target_class: Optional[Union[Type[YAMLRoot], Type[BaseModel]]], optional
        :return: transformed object of class target_class
        :rtype: Union[YAMLRoot, BaseModel]
        """
        if not target_class:
            msg = "No target_class specified for transform_object"
            raise ValueError(msg)

        source_type = type(source_obj)
        source_type_name = source_type.__name__
        # if isinstance(source_obj, YAMLRoot):
        #    source_obj_dict = json_dumper.to_dict(source_obj)
        # elif isinstance(source_obj, BaseModel):
        #    source_obj_dict = source_obj.dict()
        # else:
        #    raise ValueError(f"Do not know how to handle type: {typ}")
        tr_obj_dict = self.map_object(source_obj, source_type_name)
        return target_class(**tr_obj_dict)

    def transform_enum(self, source_value: str, enum_names: list[str], source_obj: Any) -> str | None:
        """Transform a source enum value through one or more enum derivations.

        Iterates *enum_names* in order. For each enum derivation, tries
        expression evaluation first, then permissible-value mappings.
        If mirror_source is set on a derivation and no mapping matched,
        returns the source value unchanged without trying further enums.

        :param source_value: The source enum value to transform.
        :param enum_names: Ordered list of source enum names to try.
        :param source_obj: The full source object (used for expr evaluation).
        :return: Transformed value, or None if no mapping found.
        """
        for enum_name in enum_names:
            enum_deriv = self._get_enum_derivation(enum_name)
            if enum_deriv.expr:
                try:
                    v = eval_expr(enum_deriv.expr, **source_obj, NULL=None)
                except Exception:
                    aeval = Interpreter(usersyms={"src": source_obj, "target": None, "uuid5": _uuid5})
                    aeval(enum_deriv.expr)
                    v = aeval.symtable["target"]
                if v is not None:
                    return v
            for pv_deriv in enum_deriv.permissible_value_derivations.values():
                if source_value == pv_deriv.populated_from:
                    return pv_deriv.name
                if source_value in pv_deriv.sources:
                    return pv_deriv.name
            if enum_deriv.mirror_source:
                return str(source_value)
        return None

    def _perform_pivot_operation(
        self,
        pivot_op: PivotOperation,
        source_obj: DICT_OBJ,
        class_deriv: ClassDerivation,
        sv: SchemaView,
        source_type: str,
    ) -> DICT_OBJ | list[DICT_OBJ]:
        """
        Perform a pivot (MELT or UNMELT) operation.

        :param pivot_op: The pivot operation configuration
        :param source_obj: The source object to transform
        :param class_deriv: The class derivation spec
        :param sv: Source schema view
        :param source_type: Source type name
        :return: Transformed object(s)
        """
        if pivot_op.direction == PivotDirectionType.UNMELT:
            return self._perform_unmelt(pivot_op, source_obj, class_deriv, sv, source_type)
        elif pivot_op.direction == PivotDirectionType.MELT:
            return self._perform_melt(pivot_op, source_obj, class_deriv)
        else:
            msg = f"Unknown pivot direction: {pivot_op.direction}"
            raise ValueError(msg)

    def _perform_unmelt(
        self,
        pivot_op: PivotOperation,
        source_obj: DICT_OBJ,
        class_deriv: ClassDerivation,
        sv: SchemaView,
        source_type: str,
    ) -> DICT_OBJ:
        """
        Transform EAV/long format to wide format.

        Handles both single record and collection-based unmelt:
        - Single record: {att: 'len', val: 1.0} -> {len: 1.0}
        - Collection: [{att: 'h', val: 1.8}, {att: 'w', val: 75}] -> {h: 1.8, w: 75}

        :param pivot_op: The pivot operation configuration
        :param source_obj: The source object (may contain EAV records)
        :param class_deriv: The class derivation spec
        :param sv: Source schema view
        :param source_type: Source type name
        :return: Wide-format object
        """
        variable_slot = pivot_op.variable_slot or "variable"
        value_slot = pivot_op.value_slot or "value"
        unit_slot = pivot_op.unit_slot
        template = pivot_op.slot_name_template or "{variable}"

        # Check if source_obj itself is an EAV record (has variable and value slots)
        if variable_slot in source_obj and value_slot in source_obj:
            return self._unmelt_single_record(pivot_op, source_obj, variable_slot, value_slot, unit_slot, template)

        # Otherwise, look for a collection of EAV records in the source
        # Try to find a multivalued slot containing EAV records
        for slot_name, slot_value in source_obj.items():
            if isinstance(slot_value, list) and len(slot_value) > 0:
                first_item = slot_value[0]
                if isinstance(first_item, dict) and variable_slot in first_item:
                    return self._unmelt_collection(pivot_op, slot_value)

        # Fallback: treat source_obj as a single EAV record
        return self._unmelt_single_record(pivot_op, source_obj, variable_slot, value_slot, unit_slot, template)

    def _unmelt_single_record(
        self,
        pivot_op: PivotOperation,
        record: DICT_OBJ,
        variable_slot: str,
        value_slot: str,
        unit_slot: str | None,
        template: str,
    ) -> DICT_OBJ:
        """
        Unmelt a single EAV record into slot assignment(s).

        Example:
            Input:  {att: 'len', val: 1.0, unit: 'm'}
            Output: {len_m: 1.0}
        """
        result = {}

        # Copy ID slots (non-pivoted attributes)
        if pivot_op.id_slots:
            for id_slot in pivot_op.id_slots:
                if id_slot in record:
                    result[id_slot] = record[id_slot]

        # Get variable and value
        variable = record.get(variable_slot)
        value = record.get(value_slot)

        if variable is None:
            logger.warning(f"No variable found in slot '{variable_slot}'")
            return result

        # Generate target slot name
        if unit_slot and unit_slot in record:
            unit = record[unit_slot]
            target_slot_name = template.format(variable=variable, unit=unit)
        else:
            target_slot_name = template.format(variable=variable, unit="")

        # Validate against target schema if unmelt_to_class specified
        if pivot_op.unmelt_to_class and self.target_schemaview:
            valid_slots = [s.name for s in self.target_schemaview.class_induced_slots(pivot_op.unmelt_to_class)]
            if pivot_op.unmelt_to_slots:
                valid_slots = [s for s in valid_slots if s in pivot_op.unmelt_to_slots]

            if target_slot_name not in valid_slots:
                logger.warning(
                    f"Generated slot name '{target_slot_name}' not in valid slots for "
                    f"class '{pivot_op.unmelt_to_class}'"
                )

        result[target_slot_name] = value
        return result

    def _unmelt_collection(
        self,
        pivot_op: PivotOperation,
        records: list[DICT_OBJ],
    ) -> DICT_OBJ:
        """
        Unmelt a collection of EAV records into a single wide object.

        Example:
            Input:  [{att: 'height', val: 1.8}, {att: 'weight', val: 75.0}]
            Output: {height: 1.8, weight: 75.0}
        """
        result = {}
        variable_slot = pivot_op.variable_slot or "variable"
        value_slot = pivot_op.value_slot or "value"
        unit_slot = pivot_op.unit_slot
        template = pivot_op.slot_name_template or "{variable}"

        for record in records:
            variable = record.get(variable_slot)
            value = record.get(value_slot)

            if variable is None:
                continue

            if unit_slot and unit_slot in record:
                unit = record[unit_slot]
                target_slot = template.format(variable=variable, unit=unit)
            else:
                target_slot = template.format(variable=variable, unit="")

            if target_slot in result:
                logger.warning(f"Duplicate variable '{variable}' in unmelt; last value wins")

            result[target_slot] = value

        # Copy ID slots from the first record (assuming they're the same across all)
        if pivot_op.id_slots and records:
            for id_slot in pivot_op.id_slots:
                if id_slot in records[0]:
                    result[id_slot] = records[0][id_slot]

        return result

    def _perform_melt(
        self,
        pivot_op: PivotOperation,
        source_obj: DICT_OBJ,
        slot_derivation: SlotDerivation | None = None,
    ) -> list[DICT_OBJ]:
        """
        Transform wide format to EAV/long format.

        Example:
            Input:  {height: 1.8, weight: 75.0}
            Output: [{variable: 'height', value: 1.8}, {variable: 'weight', value: 75.0}]

        :param pivot_op: The pivot operation configuration
        :param source_obj: The source object in wide format
        :param slot_derivation: Optional slot derivation (for context)
        :return: List of EAV records
        """
        variable_slot = pivot_op.variable_slot or "variable"
        value_slot = pivot_op.value_slot or "value"

        # Determine which slots to melt
        if pivot_op.source_slots:
            slots_to_melt = list(pivot_op.source_slots)
        elif pivot_op.unmelt_to_class and self.target_schemaview:
            # Infer from target class
            slots_to_melt = [s.name for s in self.target_schemaview.class_induced_slots(pivot_op.unmelt_to_class)]
        else:
            # Melt all non-ID slots
            id_slots = set(pivot_op.id_slots or [])
            slots_to_melt = [k for k in source_obj.keys() if k not in id_slots]

        results = []
        base_record = {}

        # Copy ID slots to base record
        if pivot_op.id_slots:
            for id_slot in pivot_op.id_slots:
                if id_slot in source_obj:
                    base_record[id_slot] = source_obj[id_slot]

        # Create one record per melted slot
        for slot_name in slots_to_melt:
            if slot_name in source_obj and source_obj[slot_name] is not None:
                record = base_record.copy()
                record[variable_slot] = slot_name
                record[value_slot] = source_obj[slot_name]
                results.append(record)

        return results
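
The melt and unmelt helpers in the listing above are easiest to follow on concrete data. The following is a minimal standalone sketch in plain Python, not the library's implementation: `melt` and `unmelt` are hypothetical functions that assume the default slot names (`variable`, `value`) and leave out slot-name templates, units, and target-schema validation.

```python
from typing import Any


def melt(obj: dict[str, Any], id_slots: list[str]) -> list[dict[str, Any]]:
    """Wide -> long: one record per non-ID slot whose value is not None."""
    base = {k: obj[k] for k in id_slots if k in obj}
    return [
        {**base, "variable": k, "value": v}
        for k, v in obj.items()
        if k not in id_slots and v is not None
    ]


def unmelt(records: list[dict[str, Any]], id_slots: list[str]) -> dict[str, Any]:
    """Long -> wide: a later record overwrites an earlier one for the same variable."""
    wide: dict[str, Any] = {}
    for rec in records:
        if rec.get("variable") is not None:
            wide[rec["variable"]] = rec.get("value")
    # ID slots are taken from the first record, assuming they agree across records
    if records:
        for k in id_slots:
            if k in records[0]:
                wide[k] = records[0][k]
    return wide


wide = {"id": "P1", "height": 1.8, "weight": 75.0, "age": None}
long_rows = melt(wide, id_slots=["id"])
round_trip = unmelt(long_rows, id_slots=["id"])
```

In this sketch the `age` slot is dropped by `melt` because its value is `None`, so it does not reappear after the round trip.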

index(source_obj, target=None)

Create an index over a container object.

:param source_obj: source data structure to be indexed
:param target: class to convert source object into

Source code in src/linkml_map/transformer/object_transformer.py
def index(self, source_obj: Any, target: str | None = None) -> None:
    """
    Create an index over a container object.

    :param source_obj: source data structure to be indexed
    :param target: class to convert source object into
    """
    if isinstance(source_obj, dict):
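        if target is None:
            # Infer the target only when exactly one class is marked tree_root;
            # otherwise fall through to the explicit error below.
            roots = [c.name for c in self.source_schemaview.all_classes().values() if c.tree_root]
            if len(roots) == 1:
                target = roots[0]
        if target is None:
            msg = f"target must be passed if source_obj is dict: {source_obj}"
            raise ValueError(msg)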
        source_obj_typed = dynamic_object(source_obj, self.source_schemaview, target)
        self.object_index = ObjectIndex(source_obj_typed, schemaview=self.source_schemaview)
    else:
        self.object_index = ObjectIndex(source_obj, schemaview=self.source_schemaview)

map_object(source_obj, source_type=None, target_type=None, class_derivation=None)

Transform a source object into a target object.

Slot derivations are evaluated in declaration order. Expressions may call slot('name') to reference previously computed values. Slots with hide: true are computed (so downstream slot() calls can reference them) but excluded from the returned dict.

:param source_obj: source data structure
:param source_type: source_obj instantiates this (may be class, type, or enum)
:param target_type: target_obj instantiates this (may be class, type, or enum)
:param class_derivation: class derivation to apply; if omitted, it is looked up from source_type
:return: transformed data, either as type target_type or a dictionary
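
The ordering and hiding rules described above can be illustrated with a small standalone sketch. It is not the library's code: `derive` and the `Derivation` tuple are hypothetical stand-ins in which each derivation is a plain Python callable rather than a specification expression.

```python
from typing import Any, Callable

SlotFn = Callable[[str], Any]

# Hypothetical derivation: (target slot name, compute function, hide flag)
Derivation = tuple[str, Callable[[dict[str, Any], SlotFn], Any], bool]


def derive(source: dict[str, Any], derivations: list[Derivation]) -> dict[str, Any]:
    computed: dict[str, Any] = {}
    # slot('name') returns a previously computed value, or None if not yet computed
    slot: SlotFn = lambda name: computed.get(name)
    for name, compute, _hide in derivations:  # declaration order
        computed[name] = compute(source, slot)
    # Hidden slots were computed above but are left out of the result
    return {name: computed[name] for name, _compute, hide in derivations if not hide}


derivations: list[Derivation] = [
    ("height_m", lambda src, slot: src["height_cm"] / 100, True),
    ("weight_kg", lambda src, slot: src["weight_kg"], False),
    ("bmi", lambda src, slot: round(slot("weight_kg") / slot("height_m") ** 2, 1), False),
]

result = derive({"height_cm": 180, "weight_kg": 81}, derivations)
```

Here `height_m` is available to the `bmi` computation through `slot()` but is absent from `result` because its hide flag is set.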

Source code in src/linkml_map/transformer/object_transformer.py
def map_object(
    self,
    source_obj: OBJECT_TYPE,
    source_type: str | None = None,
    target_type: str | None = None,
    class_derivation: ClassDerivation | None = None,
) -> DICT_OBJ | Any:
    """
    Transform a source object into a target object.

    Slot derivations are evaluated in declaration order. Expressions
    may call ``slot('name')`` to reference previously computed values.
    Slots with ``hide: true`` are computed (so downstream ``slot()``
    calls can reference them) but excluded from the returned dict.

    :param source_obj: source data structure
    :param source_type: source_obj instantiates this (may be class, type, or enum)
    :param target_type: target_obj instantiates this (may be class, type, or enum)
    :param class_derivation: class derivation to apply; if omitted, it is looked up from source_type
    :return: transformed data, either as type target_type or a dictionary
    """
    sv = self.source_schemaview
    source_type = self._resolve_source_type(source_type, sv)

    if source_type in sv.all_types():
        if target_type:
            if target_type == "string":
                return str(source_obj)
            if target_type == "integer":
                return int(source_obj)
            if target_type in {"float", "double"}:
                return float(source_obj)
            if target_type == "uri":
                return self.expand_curie(source_obj)
            if target_type == "curie":
                return self.compress_uri(source_obj)
        return source_obj
    if source_type in sv.all_enums():
        return self.transform_enum(source_obj, [source_type], source_obj)

    source_obj_typed = None
    if isinstance(source_obj, BaseModel | YAMLRoot):
        # ensure dict
        source_obj_typed = source_obj
        source_obj = vars(source_obj)
    if not isinstance(source_obj, dict):
        logger.warning(f"Unexpected: {source_obj} for type {source_type}")
        return source_obj
    class_deriv = class_derivation or self._get_class_derivation(source_type)

    # Handle class-level pivot operations (UNMELT from EAV to wide format)
    if class_deriv.pivot_operation:
        return self._perform_pivot_operation(class_deriv.pivot_operation, source_obj, class_deriv, sv, source_type)

    context = DerivationContext(
        source_obj=source_obj,
        source_obj_typed=source_obj_typed,
        source_type=source_type,
        sv=sv,
        class_deriv=class_deriv,
    )
    tgt_attrs = {}
    bindings = Bindings.from_context(self, context)
    expr_functions = {"slot": lambda name: tgt_attrs.get(name)}
    for slot_deriv in class_deriv.slot_derivations.values():
        with self._slot_error_context(slot_deriv, context):
            tgt_attrs[str(slot_deriv.name)] = self._derive_slot(
                slot_deriv, context, target_type, bindings, expr_functions
            )
    # Remove hidden slots from output (they exist only for slot() references)
    for slot_deriv in class_deriv.slot_derivations.values():
        if slot_deriv.hide:
            tgt_attrs.pop(str(slot_deriv.name), None)
    return tgt_attrs

transform_enum(source_value, enum_names, source_obj)

Transform a source enum value through one or more enum derivations.

Iterates enum_names in order. For each enum derivation, tries expression evaluation first, then permissible-value mappings. If mirror_source is set on a derivation and no mapping matched, returns the source value unchanged without trying further enums.

:param source_value: The source enum value to transform.
:param enum_names: Ordered list of source enum names to try.
:param source_obj: The full source object (used for expr evaluation).
:return: Transformed value, or None if no mapping found.
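
The lookup order can be sketched in a few lines of standalone Python. This is an illustration under simplifying assumptions, not the library's API: `EnumRule` and `map_enum_value` are hypothetical names, and the expression-evaluation step is omitted so that only the permissible-value mappings and the mirror_source fallback are shown.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class EnumRule:
    """Hypothetical stand-in for one enum derivation."""

    # target permissible value -> source values that map to it
    mappings: dict[str, set[str]] = field(default_factory=dict)
    mirror_source: bool = False


def map_enum_value(value: str, rules: list[EnumRule]) -> Optional[str]:
    for rule in rules:  # rules are tried in the order given
        for target, sources in rule.mappings.items():
            if value in sources:
                return target
        if rule.mirror_source:
            # Nothing matched in this rule: pass the value through and stop
            return str(value)
    return None


rules = [
    EnumRule(mappings={"ALIVE": {"living", "alive"}}),
    EnumRule(mirror_source=True),
    EnumRule(mappings={"UNKNOWN": {"n/a"}}),
]

mapped = map_enum_value("living", rules)
mirrored = map_enum_value("n/a", rules)
unmapped = map_enum_value("n/a", rules[:1])
```

Because the second rule mirrors the source, the value `"n/a"` is returned unchanged and the third rule is never consulted.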

Source code in src/linkml_map/transformer/object_transformer.py
def transform_enum(self, source_value: str, enum_names: list[str], source_obj: Any) -> str | None:
    """Transform a source enum value through one or more enum derivations.

    Iterates *enum_names* in order. For each enum derivation, tries
    expression evaluation first, then permissible-value mappings.
    If mirror_source is set on a derivation and no mapping matched,
    returns the source value unchanged without trying further enums.

    :param source_value: The source enum value to transform.
    :param enum_names: Ordered list of source enum names to try.
    :param source_obj: The full source object (used for expr evaluation).
    :return: Transformed value, or None if no mapping found.
    """
    for enum_name in enum_names:
        enum_deriv = self._get_enum_derivation(enum_name)
        if enum_deriv.expr:
            try:
                v = eval_expr(enum_deriv.expr, **source_obj, NULL=None)
            except Exception:
                aeval = Interpreter(usersyms={"src": source_obj, "target": None, "uuid5": _uuid5})
                aeval(enum_deriv.expr)
                v = aeval.symtable["target"]
            if v is not None:
                return v
        for pv_deriv in enum_deriv.permissible_value_derivations.values():
            if source_value == pv_deriv.populated_from:
                return pv_deriv.name
            if source_value in pv_deriv.sources:
                return pv_deriv.name
        if enum_deriv.mirror_source:
            return str(source_value)
    return None

transform_object(source_obj, target_class=None)

Transform an object into an object of class target_class.

:param source_obj: source object
:type source_obj: Union[YAMLRoot, BaseModel]
:param target_class: class to transform the object into, defaults to None
:type target_class: Optional[Union[Type[YAMLRoot], Type[BaseModel]]], optional
:return: transformed object of class target_class
:rtype: Union[YAMLRoot, BaseModel]

Source code in src/linkml_map/transformer/object_transformer.py
def transform_object(
    self,
    source_obj: YAMLRoot | BaseModel,
    target_class: type[YAMLRoot] | type[BaseModel] | None = None,
) -> YAMLRoot | BaseModel:
    """
    Transform an object into an object of class target_class.

    :param source_obj: source object
    :type source_obj: Union[YAMLRoot, BaseModel]
    :param target_class: class to transform the object into, defaults to None
    :type target_class: Optional[Union[Type[YAMLRoot], Type[BaseModel]]], optional
    :return: transformed object of class target_class
    :rtype: Union[YAMLRoot, BaseModel]
    """
    if not target_class:
        msg = "No target_class specified for transform_object"
        raise ValueError(msg)

    source_type = type(source_obj)
    source_type_name = source_type.__name__
    # if isinstance(source_obj, YAMLRoot):
    #    source_obj_dict = json_dumper.to_dict(source_obj)
    # elif isinstance(source_obj, BaseModel):
    #    source_obj_dict = source_obj.dict()
    # else:
    #    raise ValueError(f"Do not know how to handle type: {typ}")
    tr_obj_dict = self.map_object(source_obj, source_type_name)
    return target_class(**tr_obj_dict)