38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
@dataclass
class SchemaMapper:
    """
    Translates a source schema and transformation specification into a target schema.

    :meth:`derive_schema` is the entry point; the other methods are the
    per-element steps it delegates to.
    """

    # View over the schema that derivations are populated from.
    source_schemaview: SchemaView = None

    # Supplies the default specification when derive_schema() is called without one.
    transformer: Transformer = None

    # Source element name -> names of the target elements derived from it.
    # Appended to by _derive_class / _derive_enum and read by _rewire_parent.
    source_to_target_class_mappings: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list))

    # (class name, slot name) -> attribute values set on that slot at the end
    # of derive_schema(). Populated by _derive_slot for dictionary keys.
    slot_info: dict[tuple[str, str], Any] = field(default_factory=dict)

    @staticmethod
    def _directive_list(copy_directives) -> list:
        """
        Normalize copy directives to a list.

        :param copy_directives: a mapping of directives (its values are used),
            an iterable of directives, or None/empty.
        :return: list of directives, empty when nothing was supplied.
        """
        if not copy_directives:
            return []
        if isinstance(copy_directives, dict):
            return list(copy_directives.values())
        return list(copy_directives)

    def _copy_dict(
        self,
        copy_directive: CopyDirective,
        src_elements,
        tgt_elements,
    ) -> None:
        """
        Apply one copy directive to a name-keyed mapping, mutating ``tgt_elements``.

        The steps run in a fixed order: ``copy_all``, ``exclude``,
        ``exclude_all``, ``include``. An ``include`` entry is therefore copied
        even when ``exclude_all`` is set.

        :param copy_directive: directive providing copy_all / exclude / exclude_all / include.
        :param src_elements: mapping to copy from.
        :param tgt_elements: mapping to copy into (modified in place).
        """
        if copy_directive.copy_all:
            for element in src_elements:
                tgt_elements[element] = src_elements[element]
        if copy_directive.exclude:
            for element in src_elements:
                if element in copy_directive.exclude:
                    # The element may never have been copied (copy_all unset),
                    # so a plain ``del`` would raise KeyError here.
                    tgt_elements.pop(element, None)
        if copy_directive.exclude_all:
            tgt_elements.clear()
        if copy_directive.include:
            for element in copy_directive.include:
                if element in src_elements:
                    tgt_elements[element] = src_elements[element]

    def _copy_list(
        self,
        copy_directive: CopyDirective,
        src_elements,
        tgt_elements,
    ) -> None:
        """
        Apply one copy directive to a list of names, mutating ``tgt_elements``.

        Same step order as :meth:`_copy_dict`; entries are never duplicated.

        :param copy_directive: directive providing copy_all / exclude / exclude_all / include.
        :param src_elements: list to copy from.
        :param tgt_elements: list to copy into (modified in place).
        """
        if copy_directive.copy_all:
            for element in src_elements:
                if element not in tgt_elements:
                    tgt_elements.append(element)
        if copy_directive.exclude:
            for element in copy_directive.exclude:
                if element in tgt_elements:
                    tgt_elements.remove(element)
        if copy_directive.exclude_all:
            tgt_elements.clear()
        if copy_directive.include:
            for element in copy_directive.include:
                if element in src_elements and element not in tgt_elements:
                    tgt_elements.append(element)

    def _copy_schema(
        self,
        copy_directives: list[CopyDirective],
        source: SchemaDefinition,
        target: SchemaDefinition,
    ) -> SchemaDefinition:
        """
        Copy classes, slots, enums and types from ``source`` into ``target``.

        :param copy_directives: list or mapping of directives, applied in order.
        :param source: schema to copy from.
        :param target: schema to copy into (modified in place).
        :return: ``target``.
        """
        for copy_directive in self._directive_list(copy_directives):
            for element_type in ["classes", "slots", "enums", "types"]:
                if not hasattr(source, element_type):
                    continue
                src_elements = getattr(source, element_type)
                tgt_elements = getattr(target, element_type)
                self._copy_dict(copy_directive, src_elements, tgt_elements)
        return target

    def _copy_class(
        self,
        copy_directives: list[CopyDirective],
        source: ClassDefinition,
        target: ClassDefinition,
    ) -> ClassDefinition:
        """
        Copy attributes and slots from ``source`` into ``target``.

        A ``source`` without ``attributes`` / ``slots`` (for instance None) is
        skipped for that part.

        :param copy_directives: list or mapping of directives, applied in order.
        :param source: class to copy from.
        :param target: class to copy into (modified in place).
        :return: ``target``.
        """
        for copy_directive in self._directive_list(copy_directives):
            if hasattr(source, "attributes"):
                # attributes is a dict
                self._copy_dict(copy_directive, source.attributes, target.attributes)
            if hasattr(source, "slots"):
                # slots is a list
                self._copy_list(copy_directive, source.slots, target.slots)
        return target

    def derive_schema(
        self,
        specification: TransformationSpecification | None = None,
        target_schema_id: str | None = None,
        target_schema_name: str | None = None,
        suffix="-derived",
    ) -> SchemaDefinition:
        """
        Use a transformation specification to generate a target/profile schema from a source schema.

        :param specification: specification to apply; defaults to the transformer's specification.
        :param target_schema_id: id of the new schema; defaults to the source id plus ``suffix``.
        :param target_schema_name: name of the new schema; defaults to the source name plus ``suffix``.
        :param suffix: appended to the source id/name when no explicit value is given.
        :return: the derived schema.
        """
        if specification is None:
            specification = self.transformer.specification
        source_schemaview = self.source_schemaview
        source_schema = source_schemaview.schema
        if target_schema_id is None:
            target_schema_id = source_schema.id + suffix
        if target_schema_name is None:
            target_schema_name = source_schema.name + suffix
        target_schema = SchemaDefinition(id=target_schema_id, name=target_schema_name)
        if hasattr(specification, "copy_directives"):
            target_schema = self._copy_schema(
                specification.copy_directives,
                source_schema,
                target_schema,
            )
        for im in source_schema.imports:
            target_schema.imports.append(im)
        for prefix in source_schema.prefixes.values():
            target_schema.prefixes[prefix.prefix_prefix] = prefix
        for class_derivation in specification.class_derivations:
            class_definition = self._derive_class(class_derivation)
            existing = target_schema.classes.get(class_definition.name)
            if existing is not None:
                # Several derivations may target the same class name.
                self._merge_class_definition(existing, class_definition)
            else:
                target_schema.classes[class_definition.name] = class_definition
        for enum_derivation in specification.enum_derivations.values():
            enum_definition = self._derive_enum(enum_derivation)
            target_schema.enums[enum_definition.name] = enum_definition
        target_schema.default_range = source_schema.default_range
        # Parents can only be rewired once every derivation has been recorded
        # in source_to_target_class_mappings.
        for cd in target_schema.classes.values():
            self._rewire_class(cd)
        for (cn, sn), info in self.slot_info.items():
            cd = target_schema.classes[cn]
            sd = cd.attributes[sn]
            for k, v in info.items():
                setattr(sd, k, v)
        return target_schema

    def _derive_class(self, class_derivation: ClassDerivation) -> ClassDefinition:
        """
        Derive a class from a class derivation.

        :param class_derivation: derivation describing the target class.
        :return: the derived class definition.
        """
        populated_from = class_derivation.populated_from
        if not populated_from:
            populated_from = class_derivation.name
        logger.info(f"Populating {class_derivation.name} from {populated_from}")
        source_class = self.source_schemaview.get_class(populated_from)
        if source_class is None:
            logger.warning(f"No such class {populated_from}")
            target_class = ClassDefinition(name=class_derivation.name)
        else:
            target_class = copy(source_class)
        # The copy is shallow, so the containers are replaced rather than
        # emptied in place; the source class keeps its own.
        target_class.from_schema = None
        target_class.name = class_derivation.name
        target_class.slots = []
        target_class.attributes = {}
        target_class.slot_usage = {}
        if hasattr(class_derivation, "copy_directives"):
            target_class = self._copy_class(
                class_derivation.copy_directives,
                source_class,
                target_class,
            )
        for slot_derivation in class_derivation.slot_derivations.values():
            if slot_derivation.hide:
                # Hidden slots are intermediates for slot() references at runtime
                # and have no corresponding attribute in the target schema.
                continue
            slot_definition = self._derive_slot(slot_derivation)
            target_class.attributes[slot_definition.name] = slot_definition
        if class_derivation.is_a:
            target_class.is_a = class_derivation.is_a
        if class_derivation.mixins:
            target_class.mixins = class_derivation.mixins
        if class_derivation.target_definition:
            spec_defn = ClassDefinition(name=target_class.name, **class_derivation.target_definition)
            for k, v in vars(spec_defn).items():
                # Only fill in values that are still unset on the target.
                curr_v = getattr(target_class, k, None)
                if curr_v is None or curr_v in ([], {}):
                    setattr(target_class, k, v)
        self.source_to_target_class_mappings[populated_from].append(target_class.name)
        if class_derivation.overrides:
            curr = json_dumper.to_dict(target_class)
            for k, v in class_derivation.overrides.items():
                curr[k] = v
            target_class = ClassDefinition(**curr)
        return target_class

    def _merge_class_definition(self, existing: ClassDefinition, incoming: ClassDefinition) -> None:
        """
        Merge an incoming ClassDefinition into an existing one.

        Used when multiple ClassDerivations target the same class name (e.g.
        two source tables both map to ``Condition``). Attributes from the
        incoming definition are added to the existing one; on conflict the
        incoming value wins with a warning.

        Merged fields: ``attributes``, ``slots``, ``mixins``, ``is_a``.
        Fields like ``slot_usage`` are not merged because ``_derive_class``
        resets them to empty on each derived ClassDefinition.

        :param existing: The ClassDefinition already in the target schema.
        :param incoming: The newly derived ClassDefinition to merge in.
        """
        for attr_name in list(incoming.attributes):
            if attr_name.startswith("_"):
                continue
            if attr_name in existing.attributes:
                logger.warning(
                    "Slot '%s' in class '%s' defined by multiple derivations; later derivation wins",
                    attr_name,
                    existing.name,
                )
            existing.attributes[attr_name] = incoming.attributes[attr_name]
        existing_slot_set = set(existing.slots)
        for slot in incoming.slots:
            if slot not in existing_slot_set:
                existing.slots.append(slot)
                existing_slot_set.add(slot)
        existing_mixin_set = set(existing.mixins)
        for mixin in incoming.mixins:
            if mixin not in existing_mixin_set:
                existing.mixins.append(mixin)
                existing_mixin_set.add(mixin)
        if incoming.is_a:
            if not existing.is_a:
                existing.is_a = incoming.is_a
            elif incoming.is_a != existing.is_a:
                logger.warning(
                    "Class '%s' has conflicting is_a: '%s' vs '%s'; keeping '%s'",
                    existing.name,
                    existing.is_a,
                    incoming.is_a,
                    existing.is_a,
                )

    def _derive_enum(self, enum_derivation: EnumDerivation) -> EnumDefinition:
        """
        Derive an enum from an enum derivation.

        :param enum_derivation: derivation describing the target enum.
        :return: the derived enum definition.
        :raises ValueError: if a permissible value derivation has neither
            ``populated_from`` nor ``sources``.
        """
        populated_from = enum_derivation.populated_from
        if not populated_from:
            populated_from = enum_derivation.name
        source_enum = self.source_schemaview.get_enum(populated_from)
        if source_enum is None:
            logger.warning(f"No such enum {populated_from}")
            # Must be an enum: permissible_values is indexed below.
            target_enum = EnumDefinition(name=enum_derivation.name)
        else:
            target_enum = copy(source_enum)
            # copy() is shallow; without its own mapping the assignments below
            # would also rewrite the source enum's permissible values.
            target_enum.permissible_values = copy(source_enum.permissible_values)
        target_enum.from_schema = None
        target_enum.name = enum_derivation.name
        # NOTE(review): slots / attributes / slot_usage look like class-level
        # fields carried over from _derive_class; kept as-is, confirm whether
        # they are needed on an enum.
        target_enum.slots = []
        target_enum.attributes = {}
        target_enum.slot_usage = {}
        for pv_derivation in enum_derivation.permissible_value_derivations.values():
            if pv_derivation.populated_from:
                pv = PermissibleValue(text=pv_derivation.populated_from)
                target_enum.permissible_values[pv.text] = pv
            elif pv_derivation.sources:
                for source in pv_derivation.sources:
                    pv = PermissibleValue(text=source)
                    target_enum.permissible_values[pv.text] = pv
            else:
                msg = f"Missing populated_from or sources for {pv_derivation}"
                raise ValueError(msg)
        # Nothing to mirror when the source enum does not exist.
        if enum_derivation.mirror_source and source_enum is not None:
            for pv in source_enum.permissible_values.values():
                if pv.text not in target_enum.permissible_values:
                    target_enum.permissible_values[pv.text] = copy(pv)
        self.source_to_target_class_mappings[populated_from].append(target_enum.name)
        return target_enum

    def _derive_slot(self, slot_derivation) -> SlotDefinition:
        """
        Derive a slot from a slot derivation.

        :param slot_derivation: derivation describing the target slot.
        :return: the derived slot definition.
        """
        populated_from = slot_derivation.populated_from
        if not populated_from:
            populated_from = slot_derivation.name
        source_slot = self.source_schemaview.get_slot(populated_from)
        if source_slot is None:
            target_slot = SlotDefinition(name=slot_derivation.name)
        else:
            target_slot = copy(source_slot)
        target_slot.from_schema = None
        target_slot.owner = None
        target_slot.name = slot_derivation.name
        if slot_derivation.range:
            target_slot.range = slot_derivation.range
        if slot_derivation.target_definition:
            spec_defn = SlotDefinition(name=target_slot.name, **slot_derivation.target_definition)
            # NOTE(review): unlike _derive_class, every attribute of spec_defn
            # is assigned, replacing values already set on target_slot —
            # confirm this is intended.
            for k, v in vars(spec_defn).items():
                setattr(target_slot, k, v)
        if slot_derivation.unit_conversion:
            target_slot.unit = UnitOfMeasure(ucum_code=slot_derivation.unit_conversion.target_unit)
        if slot_derivation.stringification:
            # A reversed stringification yields a multivalued slot; otherwise single-valued.
            target_slot.multivalued = bool(slot_derivation.stringification.reversed)
        if slot_derivation.dictionary_key:
            target_slot.inlined = True
            target_slot.inlined_as_list = False
            # Applied in derive_schema(), once the range class has been derived.
            self.slot_info[(target_slot.range, slot_derivation.dictionary_key)] = {"identifier": True}
        if slot_derivation.cast_collection_as:
            if slot_derivation.cast_collection_as == CollectionType.MultiValued:
                target_slot.inlined = True
            elif slot_derivation.cast_collection_as == CollectionType.MultiValuedList:
                target_slot.inlined_as_list = True
            elif slot_derivation.cast_collection_as == CollectionType.MultiValuedDict:
                target_slot.inlined = True
                target_slot.inlined_as_list = False
        if slot_derivation.overrides:
            curr = json_dumper.to_dict(target_slot)
            for k, v in slot_derivation.overrides.items():
                curr[k] = v
            target_slot = SlotDefinition(**curr)
        return target_slot

    def _rewire_class(self, class_definition: ClassDefinition) -> None:
        """
        Point ``is_a`` and ``mixins`` of a derived class at target-schema classes.

        Mixins for which no target parent is found are dropped.

        :param class_definition: class to update in place.
        """
        if class_definition.is_a:
            class_definition.is_a = self._rewire_parent(class_definition, class_definition.is_a)
        mixins = [self._rewire_parent(class_definition, m) for m in class_definition.mixins]
        class_definition.mixins = [m for m in mixins if m is not None]

    def _rewire_parent(self, class_definition: ClassDefinition, parent: ClassDefinitionName) -> str | None:
        """
        Find the target-schema class corresponding to a source parent.

        When ``parent`` itself was not derived, its own ``is_a`` ancestors are
        tried in turn.

        :param class_definition: class being rewired (only passed through on recursion).
        :param parent: name of the parent class to resolve.
        :return: name of the target class, or None if no ancestor was derived.
        :raises ValueError: if ``parent`` was derived into more than one target class.
        """
        if parent in self.source_to_target_class_mappings:
            new_parents = self.source_to_target_class_mappings[parent]
            if len(new_parents) > 1:
                msg = f"Cannot rewire to non-isomorphic mappings {parent} => {new_parents}"
                raise ValueError(msg)
            if len(new_parents) == 1:
                return new_parents[0]
        parent_cls = self.source_schemaview.get_class(parent)
        # NOTE(review): if get_class() returns None here (the derive methods
        # allow for that on unknown names) the next line raises AttributeError
        # — confirm whether a parent absent from the source schema can reach this.
        if parent_cls.is_a:
            return self._rewire_parent(class_definition, parent_cls.is_a)
        return None

    def copy_attributes(
        self,
        target_element: Element,
        source_element: Element,
        copy_directive: CopyDirective,
    ) -> None:
        """
        Copy attributes from source to target according to a directive.

        An attribute is copied when the directive copies everything or lists it
        in ``include``, unless it is also listed in ``exclude``.

        :param target_element: element to copy attributes onto (modified in place).
        :param source_element: element to read attributes from.
        :param copy_directive: directive selecting which attributes to copy.
        :return:
        """
        # The other copy helpers read ``copy_all``; ``include_all`` is still
        # honoured for directive objects that carry that name instead.
        copy_everything = getattr(copy_directive, "copy_all", False) or getattr(copy_directive, "include_all", False)
        # include / exclude may be unset (None); treat that as empty.
        include = copy_directive.include or ()
        exclude = copy_directive.exclude or ()
        for k, v in vars(source_element).items():
            if k in exclude:
                continue
            if copy_everything or k in include:
                setattr(target_element, k, v)
|