Inference Package

Inference Utils

Infer missing values in a specification.

induce_missing_values(specification, source_schemaview)

Infer missing values in a specification.

Currently fills in default `populated_from` values and infers missing slot ranges from the source schema.

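:param specification: the transformation specification to complete (modified in place) :param source_schemaview: view of the source schema, used to look up slot ranges :return: None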

Source code in src/linkml_map/inference/inference.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def induce_missing_values(specification: TransformationSpecification, source_schemaview: SchemaView) -> None:
    """
    Fill in values that a specification leaves implicit.

    The specification is modified in place:

    * a class derivation without ``populated_from`` is populated from the
      class of the same name;
    * a slot derivation with neither ``populated_from`` nor ``expr`` is
      populated from the slot of the same name;
    * a slot derivation with a ``value`` but no ``range`` gets range ``"string"``;
    * any other slot derivation without a ``range`` gets the name of the class
      derivation whose ``populated_from`` equals the range of its source slot
      (if several class derivations match, the last one wins).

    Slot derivations that carry nested class derivations are left untouched.

    :param specification: transformation specification to complete in place
    :param source_schemaview: view of the source schema, used to look up slot ranges
    :return: None
    """
    for class_deriv in specification.class_derivations:
        if not class_deriv.populated_from:
            class_deriv.populated_from = class_deriv.name
    _warn_deprecated_fields(specification)

    for class_deriv in specification.class_derivations:
        for slot_deriv in class_deriv.slot_derivations.values():
            if slot_deriv.class_derivations:
                # Nested class derivations are handled via the class derivation
                # itself, so nothing is inferred for the slot here.
                continue

            # A slot with no mapping at all is assumed to be copied from the
            # source slot of the same name.
            # TODO: decide if this is the desired behavior
            if slot_deriv.populated_from is None and slot_deriv.expr is None:
                slot_deriv.populated_from = slot_deriv.name
            if slot_deriv.range is None and slot_deriv.value is not None:
                slot_deriv.range = "string"

            # Range inference only applies to slots that still lack a range and
            # have a source slot to look it up from.
            if slot_deriv.range or not slot_deriv.populated_from:
                continue
            if class_deriv.populated_from not in source_schemaview.all_classes():
                continue

            source_slot_name = slot_deriv.populated_from
            inferred_range = None

            # "alias.field" references are resolved against the joined class first.
            if "." in source_slot_name and class_deriv.joins:
                join_alias, joined_field = source_slot_name.split(".", 1)
                if join_alias in class_deriv.joins:
                    joined_class = class_deriv.joins[join_alias].class_named or join_alias
                    if joined_class not in source_schemaview.all_classes():
                        continue
                    # NOTE(review): this tests the field name against whatever
                    # class_induced_slots returns -- confirm it yields names
                    # rather than slot objects.
                    if joined_field not in source_schemaview.class_induced_slots(joined_class):
                        continue
                    inferred_range = source_schemaview.induced_slot(joined_field, joined_class).range

            if inferred_range is None:
                fk_resolution = resolve_fk_path(source_schemaview, class_deriv.populated_from, source_slot_name)
                if fk_resolution:
                    if not fk_resolution.final_slot:
                        continue
                    inferred_range = fk_resolution.final_slot.range
                else:
                    direct_slot = source_schemaview.induced_slot(source_slot_name, class_deriv.populated_from)
                    inferred_range = direct_slot.range

            # Translate the source range into the name of the class derivation
            # that is populated from it, if there is one.
            for candidate in specification.class_derivations:
                if candidate.populated_from == inferred_range:
                    slot_deriv.range = candidate.name

Inverter

Invert a transformation specification.

NonInvertibleSpecificationError

Bases: ValueError

Error thrown when a specification is not invertible.

Source code in src/linkml_map/inference/inverter.py
23
24
class NonInvertibleSpecificationError(ValueError):
    """Raised when a transformation specification, or a part of one, cannot be inverted."""

TransformationSpecificationInverter dataclass

Invert a transformation specification.

Source code in src/linkml_map/inference/inverter.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@dataclass
class TransformationSpecificationInverter:
    """
    Invert a transformation specification.

    Given a specification that derives a target from a source, build the
    specification that derives the source back from the target.
    """

    source_schemaview: SchemaView = None
    """The source schema for the forward transformation.
    Note this becomes the target schema for the generated inverse transformation,
    because the goal of the inverted transformation is to map back to the original source."""

    target_schemaview: SchemaView = None
    """The target schema for the forward transformation.
    Note this becomes the source schema for the generated inverse transformation."""

    strict: bool = field(default=True)
    """If true, raise NonInvertibleSpecificationError for a slot derivation that
    cannot be inverted; if false, leave such slots out of the inverse."""

    def invert(self, spec: TransformationSpecification) -> TransformationSpecification:
        """
        Invert a transformation specification.

        :param spec: the forward specification
        :return: a new specification holding the inverse of every class and
            enum derivation in ``spec``
        """
        logger.info("Inverting specification")
        inverted_spec = TransformationSpecification()
        for cd in spec.class_derivations:
            inverted_cd = self.invert_class_derivation(cd, spec)
            inverted_spec.class_derivations.append(inverted_cd)
        for ed in spec.enum_derivations.values():
            inverted_ed = self.invert_enum_derivation(ed, spec)
            inverted_spec.enum_derivations[inverted_ed.name] = inverted_ed
        return inverted_spec

    def invert_class_derivation(self, cd: ClassDerivation, spec: TransformationSpecification) -> ClassDerivation:
        """
        Invert a class derivation.

        The inverse is named after the forward derivation's source class
        (falling back to its own name) and is populated from the forward
        derivation's name.

        :param cd: the forward class derivation
        :param spec: the specification that ``cd`` belongs to
        :return: the inverted class derivation
        :raises NonInvertibleSpecificationError: in strict mode, if a slot
            derivation cannot be inverted
        """
        inverted_cd = ClassDerivation(name=cd.populated_from if cd.populated_from else cd.name, populated_from=cd.name)
        for sd in cd.slot_derivations.values():
            if sd.hide:
                # Hidden slots have no target counterpart, so they have nothing
                # to invert from.
                continue
            inverted_sd = self.invert_slot_derivation(sd, cd, spec)
            if inverted_sd:
                inverted_cd.slot_derivations[inverted_sd.name] = inverted_sd
            elif self.strict:
                msg = f"Cannot invert slot derivation: {sd.name}"
                raise NonInvertibleSpecificationError(msg)
        return inverted_cd

    def invert_enum_derivation(self, ed: EnumDerivation, spec: TransformationSpecification) -> EnumDerivation:
        """
        Invert an enum derivation.

        :param ed: the forward enum derivation
        :param spec: the specification that ``ed`` belongs to
        :return: the inverted enum derivation, with every permissible value
            derivation swapped around
        :raises NonInvertibleSpecificationError: if ``ed`` is defined by an expression
        """
        # The check must look at the forward derivation: the inverse built below
        # never carries an expression of its own.
        if ed.expr:
            msg = "TODO: invert enum derivation with expression"
            raise NonInvertibleSpecificationError(msg)
        inverted_ed = EnumDerivation(name=ed.populated_from if ed.populated_from else ed.name, populated_from=ed.name)
        for pv_deriv in ed.permissible_value_derivations.values():
            inverted_pv_deriv = PermissibleValueDerivation(
                name=pv_deriv.populated_from if pv_deriv.populated_from else pv_deriv.name,
                populated_from=pv_deriv.name,
            )
            inverted_ed.permissible_value_derivations[inverted_pv_deriv.name] = inverted_pv_deriv
        return inverted_ed

    def invert_slot_derivation(
        self, sd: SlotDerivation, cd: ClassDerivation, spec: TransformationSpecification
    ) -> SlotDerivation | None:
        """
        Invert a slot derivation.

        :param sd: the forward slot derivation
        :param cd: the class derivation that ``sd`` belongs to
        :param spec: the specification that ``cd`` belongs to
        :return: the inverted slot derivation, or None when ``sd`` uses a
            non-trivial expression and strict mode is off
        :raises NonInvertibleSpecificationError: in strict mode, if ``sd`` uses
            an expression that is not a bare name
        """
        populated_from = sd.populated_from
        if sd.expr:
            # An expression that is a single bare name is just a rename.
            if re.match(r"^\w+$", sd.expr):
                populated_from = sd.expr
            else:
                if not self.strict:
                    return None
                # TODO: add logic for reversible expressions
                msg = f"Cannot invert expression {sd.expr} in slot derivation: {sd.name}"
                raise NonInvertibleSpecificationError(msg)

        if not populated_from:
            # use defaults. TODO: decide on semantics of defaults
            populated_from = sd.name
        inverted_sd = SlotDerivation(name=populated_from, populated_from=sd.name)
        source_cls_name = cd.populated_from
        if (source_cls_name is None or source_cls_name in self.source_schemaview.all_classes()) and sd.populated_from:
            source_slot = self.source_schemaview.induced_slot(sd.populated_from, source_cls_name)
        else:
            source_slot = None
        # The inverse range comes from the source slot; when no source slot
        # could be looked up there is nothing to infer, so the range is left unset.
        if sd.range and source_slot is not None:
            inverted_sd.range = source_slot.range
            if source_slot.range in self.source_schemaview.all_classes():
                id_slot = self.source_schemaview.get_identifier_slot(source_slot.range, use_key=True)
                if id_slot:
                    inverted_sd.dictionary_key = id_slot.name
        if source_slot and source_slot.multivalued:
            # Restore the collection form declared on the source slot.
            if source_slot.inlined_as_list:
                inverted_sd.cast_collection_as = CollectionType.MultiValuedList
            elif source_slot.inlined and source_slot.range in self.source_schemaview.all_classes():
                id_slot = self.source_schemaview.get_identifier_slot(source_slot.range, use_key=True)
                if id_slot:
                    inverted_sd.cast_collection_as = CollectionType.MultiValuedDict
                    inverted_sd.dictionary_key = id_slot.name
        if sd.unit_conversion:
            source_slot = self.source_schemaview.induced_slot(sd.populated_from, source_cls_name)
            target_unit = None
            target_unit_scheme = None
            if source_slot.unit is not None:
                # Use the first unit representation the source slot declares.
                for p in ["ucum_code", "symbol"]:
                    target_unit = getattr(source_slot.unit, p, None)
                    if target_unit is not None:
                        target_unit_scheme = p
                        break
            inverted_uc = UnitConversionConfiguration(target_unit=target_unit, target_unit_scheme=target_unit_scheme)
            # Forward "source" settings become "target" settings and vice versa.
            if sd.unit_conversion.source_unit_slot:
                inverted_uc.target_unit_slot = sd.unit_conversion.source_unit_slot
            if sd.unit_conversion.source_magnitude_slot:
                inverted_uc.target_magnitude_slot = sd.unit_conversion.source_magnitude_slot
            if sd.unit_conversion.target_unit:
                inverted_uc.source_unit = sd.unit_conversion.target_unit
            inverted_sd.unit_conversion = inverted_uc
        if sd.stringification:
            # Copy before flipping the flag so the forward derivation is not modified.
            inverted_sd.stringification = copy(sd.stringification)
            inverted_sd.stringification.reversed = not inverted_sd.stringification.reversed
        return inverted_sd

source_schemaview = None class-attribute instance-attribute

The source schema for the forward transformation. Note this becomes the target schema for the generated inverse transformation, because the goal of the inverted transformation is to map back to the original source.

target_schemaview = None class-attribute instance-attribute

The target schema for the forward transformation. Note this becomes the source schema for the generated inverse transformation.

invert(spec)

Invert a transformation specification.

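:param spec: the forward transformation specification to invert :return: a new specification containing the inverted class and enum derivations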

Source code in src/linkml_map/inference/inverter.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def invert(self, spec: TransformationSpecification) -> TransformationSpecification:
    """
    Build the inverse of a transformation specification.

    :param spec: the forward specification
    :return: a new specification holding the inverse of every class and
        enum derivation in ``spec``
    """
    logger.info("Inverting specification")
    result = TransformationSpecification()

    # Class derivations are kept in their original order.
    for class_derivation in spec.class_derivations:
        inverse_class = self.invert_class_derivation(class_derivation, spec)
        result.class_derivations.append(inverse_class)

    # Enum derivations are keyed by the name of the inverted derivation.
    for enum_derivation in spec.enum_derivations.values():
        inverse_enum = self.invert_enum_derivation(enum_derivation, spec)
        result.enum_derivations[inverse_enum.name] = inverse_enum

    return result

invert_class_derivation(cd, spec)

Invert a class derivation.

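:param cd: the forward class derivation to invert :param spec: the specification that the class derivation belongs to :return: the inverted class derivation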

Source code in src/linkml_map/inference/inverter.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def invert_class_derivation(self, cd: ClassDerivation, spec: TransformationSpecification) -> ClassDerivation:
    """
    Build the inverse of a class derivation.

    The inverse takes the forward derivation's source class name (or its own
    name when no source is set) and is populated from the forward name.

    :param cd: the forward class derivation
    :param spec: the specification that ``cd`` belongs to
    :return: the inverted class derivation
    :raises NonInvertibleSpecificationError: in strict mode, if a slot
        derivation cannot be inverted
    """
    inverse = ClassDerivation(name=cd.populated_from or cd.name, populated_from=cd.name)
    for sd in cd.slot_derivations.values():
        # Hidden slots have no counterpart in the target, so there is nothing
        # to map back from.
        if sd.hide:
            continue
        inverse_slot = self.invert_slot_derivation(sd, cd, spec)
        if not inverse_slot:
            if self.strict:
                msg = f"Cannot invert slot derivation: {sd.name}"
                raise NonInvertibleSpecificationError(msg)
            continue
        inverse.slot_derivations[inverse_slot.name] = inverse_slot
    return inverse

invert_enum_derivation(ed, spec)

Invert an enum derivation.

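:param ed: the forward enum derivation to invert :param spec: the specification that the enum derivation belongs to :return: the inverted enum derivation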

Source code in src/linkml_map/inference/inverter.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def invert_enum_derivation(self, ed: EnumDerivation, spec: TransformationSpecification) -> EnumDerivation:
    """
    Invert an enum derivation.

    :param ed: the forward enum derivation
    :param spec: the specification that ``ed`` belongs to
    :return: the inverted enum derivation, with every permissible value
        derivation swapped around
    :raises NonInvertibleSpecificationError: if ``ed`` is defined by an expression
    """
    # The check must look at the forward derivation: the inverse built below
    # never carries an expression of its own.
    if ed.expr:
        msg = "TODO: invert enum derivation with expression"
        raise NonInvertibleSpecificationError(msg)
    inverted_ed = EnumDerivation(name=ed.populated_from if ed.populated_from else ed.name, populated_from=ed.name)
    for pv_deriv in ed.permissible_value_derivations.values():
        # Each permissible value maps back from its forward name to its source.
        inverted_pv_deriv = PermissibleValueDerivation(
            name=pv_deriv.populated_from if pv_deriv.populated_from else pv_deriv.name,
            populated_from=pv_deriv.name,
        )
        inverted_ed.permissible_value_derivations[inverted_pv_deriv.name] = inverted_pv_deriv
    return inverted_ed

invert_slot_derivation(sd, cd, spec)

Invert a slot derivation.

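:param sd: the forward slot derivation to invert :param cd: the class derivation that the slot derivation belongs to :param spec: the specification that the class derivation belongs to :return: the inverted slot derivation, or None if it cannot be inverted and strict mode is off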

Source code in src/linkml_map/inference/inverter.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def invert_slot_derivation(
    self, sd: SlotDerivation, cd: ClassDerivation, spec: TransformationSpecification
) -> SlotDerivation | None:
    """
    Invert a slot derivation.

    :param sd: the forward slot derivation
    :param cd: the class derivation that ``sd`` belongs to
    :param spec: the specification that ``cd`` belongs to
    :return: the inverted slot derivation, or None when ``sd`` uses a
        non-trivial expression and strict mode is off
    :raises NonInvertibleSpecificationError: in strict mode, if ``sd`` uses
        an expression that is not a bare name
    """
    populated_from = sd.populated_from
    if sd.expr:
        # An expression that is a single bare name is just a rename.
        if re.match(r"^\w+$", sd.expr):
            populated_from = sd.expr
        else:
            if not self.strict:
                return None
            # TODO: add logic for reversible expressions
            msg = f"Cannot invert expression {sd.expr} in slot derivation: {sd.name}"
            raise NonInvertibleSpecificationError(msg)

    if not populated_from:
        # use defaults. TODO: decide on semantics of defaults
        populated_from = sd.name
    inverted_sd = SlotDerivation(name=populated_from, populated_from=sd.name)
    source_cls_name = cd.populated_from
    if (source_cls_name is None or source_cls_name in self.source_schemaview.all_classes()) and sd.populated_from:
        source_slot = self.source_schemaview.induced_slot(sd.populated_from, source_cls_name)
    else:
        source_slot = None
    # The inverse range comes from the source slot; when no source slot could
    # be looked up there is nothing to infer, so the range is left unset.
    if sd.range and source_slot is not None:
        inverted_sd.range = source_slot.range
        if source_slot.range in self.source_schemaview.all_classes():
            id_slot = self.source_schemaview.get_identifier_slot(source_slot.range, use_key=True)
            if id_slot:
                inverted_sd.dictionary_key = id_slot.name
    if source_slot and source_slot.multivalued:
        # Restore the collection form declared on the source slot.
        if source_slot.inlined_as_list:
            inverted_sd.cast_collection_as = CollectionType.MultiValuedList
        elif source_slot.inlined and source_slot.range in self.source_schemaview.all_classes():
            id_slot = self.source_schemaview.get_identifier_slot(source_slot.range, use_key=True)
            if id_slot:
                inverted_sd.cast_collection_as = CollectionType.MultiValuedDict
                inverted_sd.dictionary_key = id_slot.name
    if sd.unit_conversion:
        source_slot = self.source_schemaview.induced_slot(sd.populated_from, source_cls_name)
        target_unit = None
        target_unit_scheme = None
        if source_slot.unit is not None:
            # Use the first unit representation the source slot declares.
            for p in ["ucum_code", "symbol"]:
                target_unit = getattr(source_slot.unit, p, None)
                if target_unit is not None:
                    target_unit_scheme = p
                    break
        inverted_uc = UnitConversionConfiguration(target_unit=target_unit, target_unit_scheme=target_unit_scheme)
        # Forward "source" settings become "target" settings and vice versa.
        if sd.unit_conversion.source_unit_slot:
            inverted_uc.target_unit_slot = sd.unit_conversion.source_unit_slot
        if sd.unit_conversion.source_magnitude_slot:
            inverted_uc.target_magnitude_slot = sd.unit_conversion.source_magnitude_slot
        if sd.unit_conversion.target_unit:
            inverted_uc.source_unit = sd.unit_conversion.target_unit
        inverted_sd.unit_conversion = inverted_uc
    if sd.stringification:
        # Copy before flipping the flag so the forward derivation is not modified.
        inverted_sd.stringification = copy(sd.stringification)
        inverted_sd.stringification.reversed = not inverted_sd.stringification.reversed
    return inverted_sd

Schema Mapper

Translates a source schema and transformation specification into a target schema.

Source code in src/linkml_map/inference/schema_mapper.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
@dataclass
class SchemaMapper:
    """
    Translates a source schema and transformation specification into a target schema.
    """

    source_schemaview: SchemaView = None

    transformer: Transformer = None

    source_to_target_class_mappings: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list))

    slot_info: dict[tuple[str, str], Any] = field(default_factory=dict)

    def _copy_dict(
        self,
        copy_directive: CopyDirective,
        src_elements,
        tgt_elements,
    ) -> None:
        if copy_directive.copy_all:
            for element in src_elements:
                tgt_elements[element] = src_elements[element]
        if copy_directive.exclude:
            for element in src_elements:
                if element in copy_directive.exclude:
                    del tgt_elements[element]
        if copy_directive.exclude_all:
            elements_to_delete = list(tgt_elements)
            for element in elements_to_delete:
                del tgt_elements[element]
        if copy_directive.include:
            for element in copy_directive.include:
                if element in src_elements:
                    tgt_elements[element] = src_elements[element]

    def _copy_list(
        self,
        copy_directive: CopyDirective,
        src_elements,
        tgt_elements,
    ) -> None:
        if copy_directive.copy_all:
            for element in src_elements:
                if element not in tgt_elements:
                    tgt_elements.append(element)
        if copy_directive.exclude:
            for element in copy_directive.exclude:
                if element in tgt_elements:
                    tgt_elements.remove(element)
        if copy_directive.exclude_all:
            tgt_elements.clear()
        if copy_directive.include:
            for element in copy_directive.include:
                if element in src_elements and element not in tgt_elements:
                    tgt_elements.append(element)

    def _copy_schema(
        self,
        copy_directives: list[CopyDirective],
        source: SchemaDefinition,
        target: SchemaDefinition,
    ) -> SchemaDefinition:
        if type(copy_directives) is dict:
            copy_directives_list = copy_directives.values()
        else:
            copy_directives_list = copy_directives

        for copy_directive in copy_directives_list:
            for element_type in ["classes", "slots", "enums", "types"]:
                if not hasattr(source, element_type):
                    continue
                src_elements = getattr(source, element_type)
                tgt_elements = getattr(target, element_type)
                self._copy_dict(copy_directive, src_elements, tgt_elements)
        return target

    def _copy_class(
        self,
        copy_directives: list[CopyDirective],
        source: ClassDefinition,
        target: ClassDefinition,
    ) -> ClassDefinition:
        if type(copy_directives) is dict:
            copy_directives_list = copy_directives.values()
        else:
            copy_directives_list = copy_directives

        for copy_directive in copy_directives_list:
            if hasattr(source, "attributes"):
                # copy attributes (which is a dict)
                src_elements = source.attributes
                tgt_elements = target.attributes
                self._copy_dict(copy_directive, src_elements, tgt_elements)
            if hasattr(source, "slots"):
                # copy slots (which is a list)
                src_elements = source.slots
                tgt_elements = target.slots
                self._copy_list(copy_directive, src_elements, tgt_elements)
        return target

    def derive_schema(
        self,
        specification: TransformationSpecification | None = None,
        target_schema_id: str | None = None,
        target_schema_name: str | None = None,
        suffix="-derived",
    ) -> SchemaDefinition:
        """
        Generate a target/profile schema from the source schema and a specification.

        :param specification: specification to apply; defaults to the
            transformer's specification
        :param target_schema_id: id of the new schema; defaults to the source
            schema id followed by ``suffix``
        :param target_schema_name: name of the new schema; defaults to the
            source schema name followed by ``suffix``
        :param suffix: text appended to the source id/name when no explicit
            id/name is given
        :return: the derived schema
        """
        spec = self.transformer.specification if specification is None else specification
        src_schema = self.source_schemaview.schema
        schema_id = src_schema.id + suffix if target_schema_id is None else target_schema_id
        schema_name = src_schema.name + suffix if target_schema_name is None else target_schema_name

        derived = SchemaDefinition(id=schema_id, name=schema_name)
        if hasattr(spec, "copy_directives"):
            derived = self._copy_schema(spec.copy_directives, src_schema, derived)

        # Imports and prefixes are carried over from the source schema as they are.
        for imported in src_schema.imports:
            derived.imports.append(imported)
        for prefix in src_schema.prefixes.values():
            derived.prefixes[prefix.prefix_prefix] = prefix

        # Several class derivations may produce the same class name; those
        # are merged into one definition rather than replacing each other.
        for class_derivation in spec.class_derivations:
            derived_class = self._derive_class(class_derivation)
            already_present = derived.classes.get(derived_class.name)
            if already_present is None:
                derived.classes[derived_class.name] = derived_class
            else:
                self._merge_class_definition(already_present, derived_class)

        for enum_derivation in spec.enum_derivations.values():
            derived_enum = self._derive_enum(enum_derivation)
            derived.enums[derived_enum.name] = derived_enum

        derived.default_range = src_schema.default_range
        for class_def in derived.classes.values():
            self._rewire_class(class_def)

        # Apply the per-slot settings collected in slot_info, keyed by
        # (class name, slot name).
        for (class_name, slot_name), settings in self.slot_info.items():
            slot_def = derived.classes[class_name].attributes[slot_name]
            for key, value in settings.items():
                setattr(slot_def, key, value)
        return derived

    def _derive_class(self, class_derivation: ClassDerivation) -> ClassDefinition:
        """
        Build the target class definition for one class derivation.

        The target starts as a copy of the source class (with its slots,
        attributes and slot usage emptied) or, when the source class does not
        exist, as an empty class. Slot derivations, ``is_a``, ``mixins``,
        ``target_definition`` and ``overrides`` are then applied in that order.

        :param class_derivation: the class derivation to apply
        :return: the derived class definition
        """
        populated_from = class_derivation.populated_from or class_derivation.name
        logger.info(f"Populating {class_derivation.name} from {populated_from}")

        source_class = self.source_schemaview.get_class(populated_from)
        if source_class is not None:
            target_class = copy(source_class)
            target_class.from_schema = None
            target_class.name = class_derivation.name
            # Start from empty containers; content is added by the copy
            # directives and the slot derivations below.
            target_class.slots = []
            target_class.attributes = {}
            target_class.slot_usage = {}
            if hasattr(class_derivation, "copy_directives"):
                target_class = self._copy_class(
                    class_derivation.copy_directives,
                    source_class,
                    target_class,
                )
        else:
            logger.warning(f"No such class {populated_from}")
            target_class = ClassDefinition(name=class_derivation.name)

        for slot_derivation in class_derivation.slot_derivations.values():
            # Hidden slots are intermediates for slot() references at runtime
            # and have no corresponding attribute in the target schema.
            if not slot_derivation.hide:
                derived_slot = self._derive_slot(slot_derivation)
                target_class.attributes[derived_slot.name] = derived_slot

        if class_derivation.is_a:
            target_class.is_a = class_derivation.is_a
        if class_derivation.mixins:
            target_class.mixins = class_derivation.mixins

        if class_derivation.target_definition:
            declared = ClassDefinition(name=target_class.name, **class_derivation.target_definition)
            # target_definition only fills fields that are still unset or empty.
            for field_name, declared_value in vars(declared).items():
                present_value = getattr(target_class, field_name, None)
                if present_value is None or present_value in ([], {}):
                    setattr(target_class, field_name, declared_value)

        self.source_to_target_class_mappings[populated_from].append(target_class.name)

        if class_derivation.overrides:
            # Overrides replace fields unconditionally; the class is rebuilt
            # from its dictionary form with the overriding values applied.
            as_dict = json_dumper.to_dict(target_class)
            for field_name, override_value in class_derivation.overrides.items():
                as_dict[field_name] = override_value
            target_class = ClassDefinition(**as_dict)
        return target_class

    def _merge_class_definition(self, existing: ClassDefinition, incoming: ClassDefinition) -> None:
        """
        Merge an incoming ClassDefinition into an existing one.

        Used when multiple ClassDerivations target the same class name (e.g.
        two source tables both map to ``Condition``). Attributes from the
        incoming definition are added to the existing one; on conflict the
        incoming value wins with a warning.

        Merged fields: ``attributes``, ``slots``, ``mixins``, ``is_a``.
        Fields like ``slot_usage`` are not merged because ``_derive_class``
        resets them to empty on each derived ClassDefinition.

        :param existing: The ClassDefinition already in the target schema.
        :param incoming: The newly derived ClassDefinition to merge in.
        """
        for attr_name in list(incoming.attributes):
            if attr_name.startswith("_"):
                continue
            if attr_name in existing.attributes:
                logger.warning(
                    "Slot '%s' in class '%s' defined by multiple derivations; later derivation wins",
                    attr_name,
                    existing.name,
                )
            existing.attributes[attr_name] = incoming.attributes[attr_name]

        existing_slot_set = set(existing.slots)
        for slot in incoming.slots:
            if slot not in existing_slot_set:
                existing.slots.append(slot)
                existing_slot_set.add(slot)

        existing_mixin_set = set(existing.mixins)
        for mixin in incoming.mixins:
            if mixin not in existing_mixin_set:
                existing.mixins.append(mixin)
                existing_mixin_set.add(mixin)

        if incoming.is_a:
            if not existing.is_a:
                existing.is_a = incoming.is_a
            elif incoming.is_a != existing.is_a:
                logger.warning(
                    "Class '%s' has conflicting is_a: '%s' vs '%s'; keeping '%s'",
                    existing.name,
                    existing.is_a,
                    incoming.is_a,
                    existing.is_a,
                )

    def _derive_enum(self, enum_derivation: EnumDerivation) -> EnumDefinition:
        """
        Derive an enum from an enum derivation.

        The target starts as a copy of the source enum or, when the source
        enum does not exist, as an empty enum. One permissible value is then
        added per ``populated_from`` / ``sources`` entry of each permissible
        value derivation.

        :param enum_derivation: the enum derivation to apply
        :return: the derived enum definition
        :raises ValueError: if a permissible value derivation has neither
            ``populated_from`` nor ``sources``
        """
        populated_from = enum_derivation.populated_from
        if not populated_from:
            populated_from = enum_derivation.name
        source_enum = self.source_schemaview.get_enum(populated_from)
        if source_enum is None:
            logger.warning(f"No such enum {populated_from}")
            # Must be an enum, not a class: permissible values are attached below.
            target_enum = EnumDefinition(name=enum_derivation.name)
        else:
            target_enum = copy(source_enum)
            # Give the derived enum its own mapping so that adding permissible
            # values below does not modify the source schema's enum (assumes
            # ``copy`` is the shallow stdlib copy, which shares the mapping).
            target_enum.permissible_values = copy(source_enum.permissible_values)
            target_enum.from_schema = None
            target_enum.name = enum_derivation.name
            # NOTE(review): these three look carried over from _derive_class;
            # confirm they are meaningful on an enum definition.
            target_enum.slots = []
            target_enum.attributes = {}
            target_enum.slot_usage = {}
        for pv_derivation in enum_derivation.permissible_value_derivations.values():
            if pv_derivation.populated_from:
                pv = PermissibleValue(text=pv_derivation.populated_from)
                target_enum.permissible_values[pv.text] = pv
            elif pv_derivation.sources:
                for source in pv_derivation.sources:
                    pv = PermissibleValue(text=source)
                    target_enum.permissible_values[pv.text] = pv
            else:
                msg = f"Missing populated_from or sources for {pv_derivation}"
                raise ValueError(msg)
        # Without a source enum there is nothing to mirror.
        if enum_derivation.mirror_source and source_enum is not None:
            for pv in source_enum.permissible_values.values():
                if pv.text not in target_enum.permissible_values:
                    target_enum.permissible_values[pv.text] = copy(pv)
        self.source_to_target_class_mappings[populated_from].append(target_enum.name)
        return target_enum

    def _derive_slot(self, slot_derivation) -> SlotDefinition:
        """
        Build the target slot definition for a slot derivation.

        The target starts as a copy of the source slot named by
        ``populated_from`` (or by the derivation's own name), or as a fresh
        definition when no such source slot exists. The derivation's range,
        target definition, unit conversion, stringification, dictionary key,
        collection cast and overrides are then applied in that order.

        When a dictionary key is given, an ``identifier`` marker is recorded in
        ``self.slot_info`` under ``(range, dictionary_key)``.

        :param slot_derivation: derivation describing the target slot
        :return: the derived slot definition
        """
        source_name = slot_derivation.populated_from or slot_derivation.name
        source_slot = self.source_schemaview.get_slot(source_name)
        if source_slot is not None:
            target_slot = copy(source_slot)
            # Detach the copy from its origin before renaming it.
            target_slot.from_schema = None
            target_slot.owner = None
            target_slot.name = slot_derivation.name
        else:
            target_slot = SlotDefinition(name=slot_derivation.name)

        if slot_derivation.range:
            target_slot.range = slot_derivation.range

        if slot_derivation.target_definition:
            declared = SlotDefinition(name=target_slot.name, **slot_derivation.target_definition)
            # NOTE(review): every instance attribute of the freshly built
            # definition is written onto the target, not only the keys given in
            # target_definition -- confirm this wholesale overwrite is intended.
            for attr_name, attr_value in vars(declared).items():
                setattr(target_slot, attr_name, attr_value)

        if slot_derivation.unit_conversion:
            target_slot.unit = UnitOfMeasure(ucum_code=slot_derivation.unit_conversion.target_unit)

        stringification = slot_derivation.stringification
        if stringification:
            # Multivalued exactly when the stringification is reversed.
            target_slot.multivalued = bool(stringification.reversed)

        if slot_derivation.dictionary_key:
            target_slot.inlined = True
            target_slot.inlined_as_list = False
            self.slot_info[(target_slot.range, slot_derivation.dictionary_key)] = {"identifier": True}

        collection_type = slot_derivation.cast_collection_as
        if collection_type:
            if collection_type == CollectionType.MultiValued:
                target_slot.inlined = True
            elif collection_type == CollectionType.MultiValuedList:
                target_slot.inlined_as_list = True
            elif collection_type == CollectionType.MultiValuedDict:
                target_slot.inlined = True
                target_slot.inlined_as_list = False

        overrides = slot_derivation.overrides
        if overrides:
            # Round-trip through a plain dict so overrides replace whole keys.
            merged = json_dumper.to_dict(target_slot)
            for key, value in overrides.items():
                merged[key] = value
            target_slot = SlotDefinition(**merged)

        return target_slot

    def _rewire_class(self, class_definition: ClassDefinition) -> None:
        """
        Re-point a class's ``is_a`` parent and mixins via ``_rewire_parent``.

        The class is modified in place. Mixins for which ``_rewire_parent``
        returns ``None`` are dropped.

        :param class_definition: class whose parent references are rewritten
        :return:
        """
        current_parent = class_definition.is_a
        if current_parent:
            class_definition.is_a = self._rewire_parent(class_definition, current_parent)
        class_definition.mixins = [
            rewired
            for mixin in class_definition.mixins
            if (rewired := self._rewire_parent(class_definition, mixin)) is not None
        ]

    def _rewire_parent(self, class_definition: ClassDefinition, parent: ClassDefinitionName) -> str | None:
        """
        Resolve a parent reference to the name of a derived target class.

        If ``parent`` has exactly one entry in
        ``self.source_to_target_class_mappings``, that entry is returned.
        Otherwise the lookup is repeated with the ``is_a`` parent of ``parent``
        in the source schema, until a mapping is found or the chain ends.

        :param class_definition: class being rewired; passed through unchanged
        :param parent: name of the parent to resolve
        :return: the mapped class name, or ``None`` if no ancestor is mapped
        :raises ValueError: if a parent maps to more than one target class
        """
        mappings = self.source_to_target_class_mappings
        if parent in mappings:
            targets = mappings[parent]
            target_count = len(targets)
            if target_count > 1:
                msg = f"Cannot rewire to non-isomorphic mappings {parent} => {targets}"
                raise ValueError(msg)
            if target_count == 1:
                return targets[0]
        # NOTE(review): presumably get_class returns None for a name that is
        # not in the source schema, which would fail on .is_a here -- confirm.
        grandparent = self.source_schemaview.get_class(parent).is_a
        if not grandparent:
            return None
        return self._rewire_parent(class_definition, grandparent)

    def copy_attributes(
        self,
        target_element: Element,
        source_element: Element,
        copy_directive: CopyDirective,
    ) -> None:
        """
        Copy attributes from source to target according to a directive.

        An attribute is copied when the directive sets ``include_all`` or lists
        it in ``include``, unless it is also listed in ``exclude`` (exclusion
        always wins). Values are assigned by reference, not deep-copied.

        :param target_element: element that receives the selected attributes
        :param source_element: element whose instance attributes are read
        :param copy_directive: directive providing ``include_all``, ``include``
            and ``exclude``
        :return:
        """
        # An unset (None) include/exclude list means "nothing listed"; without
        # this the membership tests below would raise TypeError.
        included_names = copy_directive.include or ()
        excluded_names = copy_directive.exclude or ()
        for name, value in vars(source_element).items():
            if name in excluded_names:
                continue
            if copy_directive.include_all or name in included_names:
                setattr(target_element, name, value)

copy_attributes(target_element, source_element, copy_directive)

Copy attributes from source to target according to a directive.

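:param target_element: element that receives the copied attributes. :param source_element: element whose instance attributes are read. :param copy_directive: directive whose include_all, include and exclude settings select which attributes are copied. :return: None; target_element is modified in place.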

Source code in src/linkml_map/inference/schema_mapper.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def copy_attributes(
    self,
    target_element: Element,
    source_element: Element,
    copy_directive: CopyDirective,
) -> None:
    """
    Copy attributes from source to target according to a directive.

    An attribute is copied when the directive sets ``include_all`` or lists
    it in ``include``, unless it is also listed in ``exclude`` (exclusion
    always wins). Values are assigned by reference, not deep-copied.

    :param target_element: element that receives the selected attributes
    :param source_element: element whose instance attributes are read
    :param copy_directive: directive providing ``include_all``, ``include``
        and ``exclude``
    :return:
    """
    # An unset (None) include/exclude list means "nothing listed"; without
    # this the membership tests below would raise TypeError.
    included_names = copy_directive.include or ()
    excluded_names = copy_directive.exclude or ()
    for name, value in vars(source_element).items():
        if name in excluded_names:
            continue
        if copy_directive.include_all or name in included_names:
            setattr(target_element, name, value)

derive_schema(specification=None, target_schema_id=None, target_schema_name=None, suffix='-derived')

Use a transformation specification to generate a target/profile schema from a source schema.

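:param specification: transformation specification to apply; when omitted, the transformer's own specification is used. :param target_schema_id: id of the derived schema; defaults to the source schema id followed by suffix. :param target_schema_name: name of the derived schema; defaults to the source schema name followed by suffix. :param suffix: text appended to the source id and name when no explicit id or name is given. :return: the derived schema definition.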

Source code in src/linkml_map/inference/schema_mapper.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def derive_schema(
    self,
    specification: TransformationSpecification | None = None,
    target_schema_id: str | None = None,
    target_schema_name: str | None = None,
    suffix="-derived",
) -> SchemaDefinition:
    """
    Generate a target/profile schema from the source schema and a specification.

    Imports, prefixes and the default range are carried over from the source
    schema. Each class and enum derivation contributes a definition; class
    definitions that share a name are merged. Parent references are then
    rewired and the slot settings recorded in ``self.slot_info`` are applied.

    :param specification: specification to apply; defaults to
        ``self.transformer.specification``
    :param target_schema_id: id of the new schema; defaults to the source
        schema id plus ``suffix``
    :param target_schema_name: name of the new schema; defaults to the source
        schema name plus ``suffix``
    :param suffix: text appended to build the default id and name
    :return: the derived schema
    """
    spec = self.transformer.specification if specification is None else specification
    source_schema = self.source_schemaview.schema

    schema_id = source_schema.id + suffix if target_schema_id is None else target_schema_id
    schema_name = source_schema.name + suffix if target_schema_name is None else target_schema_name
    derived = SchemaDefinition(id=schema_id, name=schema_name)

    if hasattr(spec, "copy_directives"):
        derived = self._copy_schema(spec.copy_directives, source_schema, derived)

    # Carry over schema-level imports and prefixes from the source.
    for imported in source_schema.imports:
        derived.imports.append(imported)
    for source_prefix in source_schema.prefixes.values():
        derived.prefixes[source_prefix.prefix_prefix] = source_prefix

    for derivation in spec.class_derivations:
        derived_class = self._derive_class(derivation)
        already_present = derived.classes.get(derived_class.name)
        if already_present is None:
            derived.classes[derived_class.name] = derived_class
        else:
            # Several derivations may target the same class name.
            self._merge_class_definition(already_present, derived_class)

    for derivation in spec.enum_derivations.values():
        derived_enum = self._derive_enum(derivation)
        derived.enums[derived_enum.name] = derived_enum

    derived.default_range = source_schema.default_range

    for target_class in derived.classes.values():
        self._rewire_class(target_class)

    # Apply the per-slot settings collected while deriving slots.
    for (class_name, slot_name), settings in self.slot_info.items():
        slot = derived.classes[class_name].attributes[slot_name]
        for key, value in settings.items():
            setattr(slot, key, value)

    return derived