Source code for linkml.utils.schemaloader

import logging
import os
from collections import OrderedDict
from copy import deepcopy
from pathlib import Path
from typing import Dict, Iterator, List, Mapping, Optional, Set, TextIO, Tuple, Union, cast
from urllib.parse import urlparse

from jsonasobj2 import values
from linkml_runtime.linkml_model.meta import (
    ClassDefinition,
    ClassDefinitionName,
    ElementName,
    EnumDefinition,
    EnumDefinitionName,
    SchemaDefinition,
    SlotDefinition,
    SlotDefinitionName,
    TypeDefinition,
    TypeDefinitionName,
)
from linkml_runtime.utils.context_utils import parse_import_map
from linkml_runtime.utils.formatutils import camelcase, mangled_attribute_name, sfx, underscore
from linkml_runtime.utils.metamodelcore import Bool
from linkml_runtime.utils.namespaces import Namespaces
from linkml_runtime.utils.yamlutils import TypedNode

from linkml.utils.mergeutils import merge_classes, merge_schemas, merge_slots, slot_usage_name
from linkml.utils.rawloader import load_raw_schema
from linkml.utils.schemasynopsis import SchemaSynopsis


[docs]class SchemaLoader: def __init__( self, data: Union[str, TextIO, SchemaDefinition, dict, Path], base_dir: Optional[str] = None, namespaces: Optional[Namespaces] = None, useuris: Optional[bool] = None, importmap: Optional[Mapping[str, str]] = None, logger: Optional[logging.Logger] = None, mergeimports: Optional[bool] = True, emit_metadata: Optional[bool] = True, source_file_date: Optional[str] = None, source_file_size: Optional[int] = None, ) -> None: """Constructor - load and process a YAML or pre-processed schema :param data: YAML schema text, python dict loaded from yaml, URL, file name, open file or SchemaDefinition :param base_dir: base directory or URL where Schema came from :param namespaces: namespaces collector :param useuris: True means class_uri and slot_uri are identifiers. False means they are mappings. :param importmap: A map from import entries to URI or file name. :param logger: Target Logger, if any :param mergeimports: True means combine imports into single package. False means separate packages :param emit_metadata: True means include source file, size and date :param source_file_date: modification of source file :param source_file_size: size of source file """ self.logger = logger if logger is not None else logging.getLogger(self.__class__.__name__) if isinstance(data, SchemaDefinition): self.schema = data else: self.schema = load_raw_schema( data, base_dir=base_dir, merge_modules=mergeimports, source_file_date=source_file_date, source_file_size=source_file_size, ) # Map from URI to source and version tuple self.loaded: OrderedDict[str, Tuple[str, str]] = { self.schema.id: (self.schema.source_file, self.schema.version) } self.base_dir = self._get_base_dir(base_dir) self.namespaces = namespaces if namespaces else Namespaces() self.useuris = useuris if useuris is not None else True self.importmap = parse_import_map(importmap, self.base_dir) if importmap is not None else dict() self.source_file_date = source_file_date self.source_file_size = source_file_size self.synopsis: Optional[SchemaSynopsis] = None self.schema_location: Optional[str] = None self.schema_defaults: Dict[str, str] = {} # Map from schema URI to default namespace self.merge_modules = mergeimports self.emit_metadata = emit_metadata
[docs] def resolve(self) -> SchemaDefinition: """Reconcile a loaded schema, applying is_a, mixins, apply_to's and other such things. Also validate the content and load a SchemaSynopsis entry :return: Fully resolved definition """ if not self.schema.default_range: self.schema.default_range = "string" self.logger.info(f"Default_range not specified. Default set to '{self.schema.default_range}'") # Process the namespace declarations if not self.schema.default_prefix: self.schema.default_prefix = sfx(self.schema.id) self.schema_defaults[self.schema.id] = self.schema.default_prefix for prefix in self.schema.prefixes.values(): self.namespaces[prefix.prefix_prefix] = prefix.prefix_reference for cmap in self.schema.default_curi_maps: self.namespaces.add_prefixmap(cmap, include_defaults=False) # Process imports for imp in self.schema.imports: sname = self.importmap.get(str(imp), imp) # Import map may use CURIE # substitute CURIE only if we don't have a local file name with drive letter (windows) if not os.path.splitdrive(sname)[0]: if ":" in sname: # allow mapping of a prefix to a folder/directory toks = sname.split(":") pfx = toks[0] if pfx in self.importmap: sname = os.path.join(self.importmap[pfx], ":".join(toks[1:])) else: sname = self.namespaces.uri_for(sname) sname = self.importmap.get(str(sname), sname) # It may also use URI or other forms import_schemadefinition = load_raw_schema( sname + ".yaml", base_dir=os.path.dirname(self.schema.source_file) if self.schema.source_file else self.base_dir, merge_modules=self.merge_modules, emit_metadata=self.emit_metadata, ) loaded_schema = (str(sname), import_schemadefinition.version) if import_schemadefinition.id in self.loaded: # If we've already loaded this, make sure that we've got the same version if self.loaded[import_schemadefinition.id][1] != loaded_schema[1]: self.raise_value_error( f"Schema {import_schemadefinition.name} - version mismatch", import_schemadefinition.name, ) # Note: for debugging purposes we also check whether the version # came from the same spot. This should be loosened to # version only once we're sure that everything is working # TODO: The test below needs review -- there are cases where it # fails because self.loaded[...][0] has the full path name # and loaded_schema[0] is just the local name # if self.loaded[import_schemadefinition.id] != loaded_schema: # self.raise_value_error(f"Schema imported from different files: " # f"{self.loaded[import_schemadefinition.id][0]} : {loaded_schema[0]}") else: self.loaded[import_schemadefinition.id] = loaded_schema merge_schemas( self.schema, import_schemadefinition, imp, self.namespaces, merge_imports=self.merge_modules, ) self.schema_defaults[import_schemadefinition.id] = import_schemadefinition.default_prefix if not self.namespaces._default: if "://" in self.schema.default_prefix: self.namespaces._default = self.schema.default_prefix elif self.schema.default_prefix in self.namespaces: self.namespaces._default = self.namespaces[self.schema.default_prefix] else: self.raise_value_error( f"Default prefix: {self.schema.default_prefix} is not defined", self.schema.default_prefix, ) self.namespaces._base = ( self.schema.default_prefix if ":" in self.schema.default_prefix else self.namespaces[self.schema.default_prefix] ) # Promote embedded attribute definitions to first class slots. for cls in self.schema.classes.values(): for attribute in cls.attributes.values(): mangled_slot_name = mangled_attribute_name(cls.name, attribute.name) if mangled_slot_name in self.schema.slots: # mangled names are overwritten if a schema with attributes is passed in # TODO: handle this in a more graceful way # see https://github.com/linkml/linkml/issues/872 logging.warning( f'Class: "{cls.name}" attribute "{attribute.name}" - ' f"mangled name: {mangled_slot_name} already exists", ) new_slot = SlotDefinition(**attribute.__dict__) new_slot.domain_of.append(cls.name) new_slot.imported_from = cls.imported_from new_slot.from_schema = cls.from_schema if not new_slot.alias: new_slot.alias = attribute.name new_slot.name = mangled_slot_name self.schema.slots[new_slot.name] = new_slot cls.slots.append(mangled_slot_name) # Assign class slot ownership for cls in self.schema.classes.values(): if not isinstance(cls, ClassDefinition): name = cls["name"] if "name" in cls else "Unknown" self.raise_value_error( f'Class "{name} (type: {type(cls)})" definition is not a class definition', name, ) if isinstance(cls.slots, str): self.logger.warning(f"File: {self.schema.source_file} Class: {cls.name} Slots are not an array") cls.slots = [cls.slots] for slotname in cls.slots: if slotname in self.schema.slots: slot = self.schema.slots[cast(SlotDefinitionName, slotname)] slot.owner = cls.name if cls.name not in slot.domain_of: slot.domain_of.append(cls.name) else: self.raise_value_error(f'Class "{cls.name}" - unknown slot: "{slotname}"', slotname) # Process slots defined as slot usages self.process_slot_usage_definitions() # Massage initial set of slots for slot in self.schema.slots.values(): # Propagate domain to containing class if slot.domain and slot.domain in self.schema.classes: if slot.name not in self.schema.classes[slot.domain].slots: slot.owner = slot.name # self.schema.classes[slot.domain].slots.append(slot.name) elif slot.domain: self.raise_value_error( f"slot: {slot.name} - unrecognized domain ({slot.domain})", slot.domain, ) # Validate the slot range if ( slot.range is not None and slot.range not in self.schema.types and slot.range not in self.schema.classes and slot.range not in self.schema.enums ): self.raise_value_error(f"slot: {slot.name} - unrecognized range ({slot.range})", slot.range) # check constraints for usage of equals_string and equals_string_in self._check_equals_string(slot) # apply to --> mixins for cls in self.schema.classes.values(): for apply_to_cls in cls.apply_to: if apply_to_cls in self.schema.classes: self.schema.classes[apply_to_cls].mixins.append(cls.name) else: self.raise_value_error( f'Class "{cls.name}" unknown apply_to target: {apply_to_cls}', apply_to_cls, ) # Class URI's also count as (trivial) mappings if cls.class_uri is not None: cls.mappings.insert(0, cls.class_uri) if cls.class_uri is None or not self.useuris: from_schema = cls.from_schema if from_schema is None: from_schema = self.schema.id # if cls.from_schema is None: # raise Exception(f"Class has no from_schema: {cls}") suffixed_cls_schema = sfx(from_schema) cls.class_uri = self.namespaces.uri_or_curie_for( self.schema_defaults.get(cls.from_schema, suffixed_cls_schema), camelcase(cls.name), ) # Get the inverse ducks all in a row before we start filling other stuff in for slot in self.schema.slots.values(): if slot.inverse: inverse_slot = self.schema.slots.get(slot.inverse, None) if inverse_slot: if not inverse_slot.inverse: inverse_slot.inverse = slot.name elif inverse_slot.inverse != slot.name: self.raise_value_error( f"Slot {slot.name}.inverse ({slot.inverse}) does not match " f"slot {inverse_slot.name}.inverse ({inverse_slot.inverse})" ) else: self.raise_value_error(f"Slot {slot.name}.inverse ({slot.inverse}) is not defined") # Update slots with parental information merged_slots: List[SlotDefinitionName] = [] for slot in self.schema.slots.values(): if not slot.from_schema: slot.from_schema = self.schema.id self.merge_slot(slot, merged_slots) # Add default ranges if slot.range is None: # Inverses will be handled later on in the process if not slot.inverse: slot.range = self.schema.default_range # Update enums for enum in self.schema.enums.values(): if not enum.from_schema: enum.from_schema = self.schema.id # TODO: Need to add "is_a" to enums # self.merge_enum(enum, merged_enums) # Process the slot_usages for cls in self.schema.classes.values(): self.process_slot_usages(cls) if not cls.from_schema: cls.from_schema = self.schema.id # Merge class with its mixins and the like merged_classes: List[ClassDefinitionName] = [] for cls in self.schema.classes.values(): self.merge_class(cls, merged_classes) # Update types with parental information merged_types: List[TypeDefinitionName] = [] for typ in self.schema.types.values(): if not typ.base and not typ.typeof: self.raise_value_error( f'type "{typ.name}" must declare a type base or parent (typeof)', typ.name, ) if not typ.typeof and not typ.uri: self.raise_value_error(f'type "{typ.name}" does not declare a URI', typ.name) self.merge_type(typ, merged_types) if not typ.from_schema: typ.from_schema = self.schema.id # Update the subsets as needed for ss in self.schema.subsets.values(): if not ss.from_schema: ss.from_schema = self.schema.id # Massage initial set of slots for slot in self.schema.slots.values(): # Keys and identifiers must be present if bool(slot.key or slot.identifier): if slot.required is None: slot.required = True elif not slot.required: self.raise_value_error( f"slot: {slot.name} - key and identifier slots cannot be optional", slot.name, ) if slot.key and slot.identifier: self.raise_value_error( f"slot: {slot.name} - A slot cannot be both a key and identifier at the same time", slot.name, ) # Propagate domain to containing class if slot.domain and slot.domain in self.schema.classes: if slot.name not in self.schema.classes[slot.domain].slots and not slot.owner: slot.owner = slot.name # Slot domains to not appear # self.schema.classes[slot.domain].slots.append(slot.name) elif slot.domain: self.raise_value_error( f"slot: {slot.name} - unrecognized domain ({slot.domain})", slot.domain, ) # Keys and identifiers must be present if bool(slot.key or slot.identifier): if slot.required is None: slot.required = True elif not slot.required: self.raise_value_error( f"slot: {slot.name} - key and identifier slots cannot be optional", slot.name, ) # Validate the slot range if ( slot.range is not None and slot.range not in self.schema.types and slot.range not in self.schema.classes and slot.range not in self.schema.enums ): self.raise_value_error(f"slot: {slot.name} - unrecognized range ({slot.range})", slot.range) # check constraints for usage of equals_string and equals_string_in self._check_equals_string(slot) # Massage classes, propagating class slots entries domain back to the target slots for cls in self.schema.classes.values(): if not isinstance(cls, ClassDefinition): name = cls["name"] if "name" in cls else "Unknown" self.raise_value_error(f'Class "{name} (type: {type(cls)})" definition is not a class definition') if isinstance(cls.slots, str): self.logger.warning(f"File: {self.schema.source_file} Class: {cls.name} Slots are not an array") cls.slots = [cls.slots] for slotname in cls.slots: if slotname in self.schema.slots: slot = self.schema.slots[cast(SlotDefinitionName, slotname)] else: self.raise_value_error(f'Class "{cls.name}" - unknown slot: "{slotname}"', slotname) for slot in self.schema.slots.values(): if slot.from_schema is None: slot.from_schema = self.schema.id # Inline any class definitions that don't have identifiers. Note that keys ARE inlined if slot.range in self.schema.classes: range_class = self.schema.classes[cast(ClassDefinitionName, slot.range)] if slot.inlined_as_list or not any( [self.schema.slots[s].identifier or self.schema.slots[s].key for s in range_class.slots] ): slot.inlined = True if slot.slot_uri is not None: slot.mappings.insert(0, slot.slot_uri) # Assign missing predicates if slot.slot_uri is None or not self.useuris: slot.slot_uri = self.namespaces.uri_or_curie_for( self.schema_defaults.get(slot.from_schema, sfx(slot.from_schema)), self.slot_name_for(slot), ) if slot.subproperty_of and slot.subproperty_of not in self.schema.slots: self.raise_value_error( f'Slot: "{slot.name}" - subproperty_of: "{slot.subproperty_of}" ' f"does not reference a slot definition", slot.subproperty_of, ) # Evaluate any slot inverses def domain_range_alignment(fwd_slot: SlotDefinition, inverse_slot: SlotDefinition) -> bool: """Determine whether the range of fwd_slot is compatible with the domain of inverse_slot""" # TODO: Determine what to do about class and slot hierarchy if fwd_slot.range and fwd_slot.range not in self.schema.classes: raise ValueError( f"Slot '{fwd_slot.name}' range ({fwd_slot.range}) is not an class -- inverse is not possible" ) if fwd_slot.domain: if not inverse_slot.range: inverse_slot.range = fwd_slot.domain elif not domain_range_alignment(fwd_slot, inverse_slot): self.logger.warning(f"Slot: {slot.name} and inverse slot: {inverse_slot.name} are not compatible") return True # Get the inverse domains and ranges sorted for slot in self.schema.slots.values(): if slot.inverse: # Note that the inverse OF the inverse will be caught in this same iterator inverse_slot = self.schema.slots[slot.inverse] if not slot.range: if inverse_slot.domain: slot.range = inverse_slot.domain elif len(inverse_slot.domain_of): if len(inverse_slot.domain_of) > 1: dom_list = ", ".join(inverse_slot.domain_of) self.logger.warning( f"Slot {slot.name}.inverse ({inverse_slot.name}), " f"has multi domains ({dom_list}) Multi ranges not yet implemented" ) slot.range = inverse_slot.domain_of[0] else: raise ValueError( f"Unable to determine the range of slot `{slot.name}'. " f"Its inverse ({inverse_slot.name}) has no declared domain" ) elif not inverse_slot.domain and len(inverse_slot.domain_of) == 0: inverse_slot.domain = slot.range elif slot.range not in (inverse_slot.domain, inverse_slot.domain_of): self.logger.warning( f"Range of slot '{slot.name}' ({slot.range}) " f"does not line with the domain of its inverse ({inverse_slot.name})" ) # Check for duplicate class and type names def check_dups(s1: Set[ElementName], s2: Set[ElementName]) -> Tuple[List[ElementName], str]: if s1.isdisjoint(s2): return [], "" # Return an ordered list of d1/d1 tuples # For some curious reason, s1.intersection(s2) and s2.intersection(s1) BOTH yield s1 elements dups = sorted(s1.intersection(s2)) dup_locs = list() for dup in dups: dup_locs += [s1e for s1e in s1 if s1e == dup] dup_locs += [s2e for s2e in s2 if s2e == dup] return dup_locs, ", ".join(dups) classes = set(self.schema.classes.keys()) self.validate_item_names("class", classes) slots = set(self.schema.slots.keys()) self.validate_item_names("slot", slots) types = set(self.schema.types.keys()) self.validate_item_names("type", types) subsets = set(self.schema.subsets.keys()) self.validate_item_names("subset", subsets) enums = set(self.schema.enums.keys()) self.validate_item_names("enum", enums) # Check that the default range is valid default_range_needed = any(slot.range == self.schema.default_range for slot in self.schema.slots.values()) if ( default_range_needed and self.schema.default_range not in self.schema.types and self.schema.default_range not in self.schema.classes ): raise ValueError(f'Unknown default range: "{self.schema.default_range}"') # We are currently limited to one key per class for cls in self.schema.classes.values(): class_slots = [] for sn in cls.slots: slot = self.schema.slots[sn] if slot.key or slot.identifier: class_slots.append(sn) if len(class_slots) > 1: self.raise_value_error( f'Class "{cls.name}" - multiple keys/identifiers not allowed ({", ".join(class_slots)})', class_slots[1], ) # Check out all the namespaces self.check_prefixes() # Cannot have duplicate class or type keys dups, items = check_dups(types, classes) if items: self.raise_value_errors(f"Overlapping type and class names: {items}", dups) dups, items = check_dups(enums, classes) if items: self.raise_value_errors(f"Overlapping enum and class names: {items}", dups) dups, items = check_dups(types, enums) if items: self.raise_value_errors(f"Overlapping type and enum names: {items}", dups) dups, items = check_dups(slots, classes) if items: self.logger_warning(f"Overlapping slot and class names: {items}", dups) dups, items = check_dups(subsets, classes) if items: self.logger_warning(f"Overlapping subset and class names: {items}", dups) dups, items = check_dups(types, slots) if items: self.logger_warning(f"Overlapping type and slot names: {items}", dups) dups, items = check_dups(subsets, slots) if items: self.logger_warning(f"Overlapping subset and slot names: {items}", dups) dups, items = check_dups(subsets, types) if items: self.logger_warning(f"Overlapping subset and type names: {items}", dups) dups, items = check_dups(enums, slots) if items: self.logger_warning(f"Overlapping enum and slot names: {items}", dups) dups, items = check_dups(subsets, enums) if items: self.logger_warning(f"Overlapping subset and enum names: {items}", dups) # Check over the various enumeration constraints for enum in self.schema.enums.values(): if enum.code_set_version: if enum.code_set_tag: self.raise_value_errors( f'Enum: "{enum.name}" cannot have both version and tag', [enum.code_set_version, enum.code_set_tag], ) if not enum.code_set: self.raise_value_error( f'Enum: "{enum.name}" needs a code set to have a version', enum.name, ) if enum.code_set_tag: if not enum.code_set: self.raise_value_error(f'Enum: "{enum.name}" needs a code set to have a tag', enum.name) if enum.pv_formula: if not enum.code_set: self.raise_value_error( f'Enum: "{enum.name}" needs a code set to have a formula', enum.name, ) if enum.permissible_values: self.raise_value_error( f'Enum: "{enum.name}" can have a formula or permissible values but not both', enum.name, ) for slot in self.schema.slots.values(): if slot.range and slot.range in self.schema.enums: if slot.inlined or slot.inlined_as_list: self.raise_value_error( f'Slot: "{slot.name}" enumerations cannot be inlined', slot.range, ) # Make the source file relative if it is locally generated self.schema_location = self.schema.source_file if self.schema.source_file and "://" not in self.schema.source_file: self.schema.source_file = os.path.basename(self.schema.source_file) # Make sure there is only one tree_root tree_root = None for cls in self.schema.classes.values(): if cls.tree_root: if tree_root is not None: self.logger.warning(f"Duplicate tree_root: {cls.name} with {tree_root}") else: tree_root = cls.name self.synopsis = SchemaSynopsis(self.schema) errs = self.synopsis.errors() if errs: print("Warning: The following errors were encountered in the schema") for errline in errs: print("\t" + errline) print() for subset, referees in self.synopsis.subsetrefs.items(): if subset not in self.schema.subsets: self.raise_value_error(f"Subset: {subset} is not defined", subset) return self.schema
[docs] def validate_item_names(self, typ: str, names: List[str]) -> None: # TODO: add a more rigorous syntax check for item names for name in names: if ":" in name: raise self.raise_value_error(f'{typ}: "{name}" - ":" not allowed in identifier', name)
[docs] def merge_enum(self, enum: EnumDefinition, merged_enums: List[EnumDefinitionName]) -> None: """ Merge parent enumeration information into target enum :param enum: target enumeration :param merged_enums: list of enum names that have been merged. Used to do distal ancestor resolution """ if enum.name not in merged_enums: merged_enums.append(enum.name) if enum.is_a: if enum.is_a in self.schema.enums: self.merge_enum(self.schema.enums[enum.is_a], merged_enums) # merge_enums(self.schema, enum, self.schema.enums[enum.is_a], False) else: self.raise_value_error( f'Enum: "{enum.name}" - unknown is_a reference: {enum.is_a}', enum.is_a, )
[docs] def merge_slot(self, slot: SlotDefinition, merged_slots: List[SlotDefinitionName]) -> None: """ Merge parent slot information into target slot :param slot: target slot :param merged_slots: list of slot names that have been merged. Used to do a distal ancestor resolution """ if slot.name not in merged_slots: if slot.is_a: try: if slot.is_a in self.schema.slots: self.merge_slot(self.schema.slots[slot.is_a], merged_slots) merge_slots(slot, self.schema.slots[slot.is_a]) else: self.raise_value_error( f'Slot: "{slot.name}" - unknown is_a reference: {slot.is_a}', slot.is_a, ) except RecursionError: self.raise_value_error( f'Slot: "{slot.name}" - recursive is_a reference: {slot.is_a}', slot.is_a, ) for mixin in slot.mixins: if mixin in self.schema.slots: self.merge_slot(self.schema.slots[mixin], merged_slots) merge_slots(slot, self.schema.slots[mixin]) else: self.raise_value_error(f'Slot: "{slot.name}" - unknown mixin reference: {mixin}', mixin) merged_slots.append(slot.name)
[docs] def merge_class(self, cls: ClassDefinition, merged_classes: List[ClassDefinitionName]) -> None: """ Merge parent class information into target class :param cls: target class :param merged_classes: list of class names that have been merged. Used to do distal ancestor resolution """ if cls.name not in merged_classes: merged_classes.append(cls.name) if cls.is_a: if cls.is_a in self.schema.classes: self.merge_class(self.schema.classes[cls.is_a], merged_classes) merge_classes(self.schema, cls, self.schema.classes[cls.is_a], False) else: self.raise_value_error( f'Class: "{cls.name}" - unknown is_a reference: {cls.is_a}', cls.is_a, ) for mixin in cls.mixins: # Note that apply_to has been injected as a faux mixin, so it gets covered here if mixin in self.schema.classes: self.merge_class(self.schema.classes[mixin], merged_classes) merge_classes(self.schema, cls, self.schema.classes[mixin], True) else: self.raise_value_error(f'Class: "{cls.name}" - unknown mixin reference: {mixin}', mixin)
[docs] def process_slot_usage_definitions(self): """ Slot usages can be used to completely define slots. Iterate over the class hierarchy finding all slot definitions that are introduced strictly as usages and add them to the slots component """ visited: Set[ClassDefinitionName] = set() visited_usages: Set[SlotDefinitionName] = set() # Slots that are or will be mangled def located_aliased_parent_slot(owning_class: ClassDefinition, usage_slot: SlotDefinition) -> bool: """Determine whether we are overriding an attributes style slot in the parent class Preconditions: usage_slot is NOT in schema.slots """ usage_attribute_name = mangled_attribute_name(owning_class.name, usage_slot.name) if owning_class.is_a: parent_slot_name = mangled_attribute_name(owning_class.is_a, usage_slot.name) if parent_slot_name in self.schema.slots or parent_slot_name in visited_usages: usage_slot.is_a = parent_slot_name visited_usages.add(usage_attribute_name) return True for mixin in owning_class.mixins: mixin_slot_name = mangled_attribute_name(mixin, usage_slot.name) if mixin_slot_name in self.schema.slots or mixin_slot_name in visited_usages: usage_slot.is_a = mixin_slot_name visited_usages.add(usage_attribute_name) return True return False def visit(classname: ClassDefinitionName) -> None: cls = self.schema.classes.get(classname) if cls and cls.name not in visited: if cls.is_a: visit(cls.is_a) for mixin in cls.mixins: visit(mixin) for slot_usage in values(cls.slot_usage): if slot_usage.alias: self.raise_value_error( f'Class: "{cls.name}" - alias not permitted in slot_usage slot:' f" {slot_usage.alias}" ) if not located_aliased_parent_slot(cls, slot_usage): if slot_usage.name not in self.schema.slots: self.logger.info( f'class "{cls.name}" slot "{slot_usage.name}" ' f"does not reference an existing slot. New slot was created." ) # TODO: Consider tightening this up and only allowing usages on defined slots self.schema.slots[slot_usage.name] = slot_usage else: # TODO Make sure that the slot_usage.name is legal (occurs in an ancestor of the class pass visited.add(classname) for classname in self.schema.classes.keys(): visit(classname)
[docs] def process_slot_usages(self, cls: ClassDefinition) -> None: """ Connect any slot usage items :param cls: class to process :return: usage item """ for slotname, slot_usage in cls.slot_usage.items(): if slot_usage.alias: self.raise_value_error( f'Class: "{cls.name}" - alias not permitted in slot_usage slot:' f" {slot_usage.alias}" ) # Construct a new slot # If we've already assigned a parent, use it if slotname in self.schema.slots: base_slot = self.schema.slots[slotname] else: logging.error(f"slot_usage for undefined slot: {slotname}") base_slot = None parent_slot = self.schema.slots.get(slot_usage.is_a) # Follow the ancestry of the class to get the most proximal parent if not parent_slot: parent_slot = self.slot_definition_for(slotname, cls) if not parent_slot and slotname in self.schema.slots: parent_slot = self.schema.slots[slotname] if not parent_slot: # This test is here because it is really easy to break things in the slot merge utilities. It should # stay self.logger.error(f'class "{cls.name}" slot "{slotname}" -- error occurred. This should not happen') else: child_name = slot_usage_name(slotname, cls) slot_alias = parent_slot.alias if parent_slot.alias else slotname new_slot = SlotDefinition( name=child_name, alias=slot_alias, domain=cls.name, is_usage_slot=Bool(True), usage_slot_name=slotname, owner=cls.name, domain_of=[cls.name], imported_from=cls.imported_from, ) self.schema.slots[child_name] = new_slot merge_slots( new_slot, slot_usage, inheriting=False, skip=[ "name", "alias", "domain", "is_usage_slot", "usage_slot_name", "owner", "domain_of", ], ) # Copy the parent definition. If there is no parent definition, the slot is being defined # locally as a slot_usage if parent_slot is not None: new_slot.is_a = parent_slot.name merge_slots(new_slot, parent_slot) # This situation occurs when we are doing chained overrides. Kludgy, but it works... if parent_slot.name in cls.slots: if child_name in cls.slots: del cls.slots[cls.slots.index(child_name)] cls.slots[cls.slots.index(parent_slot.name)] = child_name elif child_name not in cls.slots: cls.slots.append(child_name) elif not new_slot.range: new_slot.range = self.schema.default_range # copy base slot metalsot values across, except where already # populated/overridden, OR where propagation to induced slots is # forbidden (inverses) if base_slot is not None: for metaslot_name in base_slot.__dict__.keys(): current_val = getattr(new_slot, metaslot_name) if not current_val and metaslot_name not in ["inverse"]: new_val = deepcopy(getattr(base_slot, metaslot_name)) if new_val: setattr(new_slot, metaslot_name, new_val)
[docs] def merge_type(self, typ: TypeDefinition, merged_types: List[TypeDefinitionName]) -> None: """ Merge parent type information into target type :param typ: target type :param merged_types: list of type names that have bee merged. """ if typ.name not in merged_types: if typ.typeof: if typ.typeof in self.schema.types: reftyp = self.schema.types[cast(TypeDefinitionName, typ.typeof)] self.merge_type(reftyp, merged_types) merge_slots(typ, reftyp, [SlotDefinitionName("imported_from")]) else: self.raise_value_error( f'Type: "{typ.name}" - unknown typeof reference: {typ.typeof}', typ.typeof, ) merged_types.append(typ.name)
[docs] def schema_errors(self) -> List[str]: return self.synopsis.errors() if self.synopsis else ["resolve() must be run before error check"]
[docs] def slot_definition_for(self, slotname: SlotDefinitionName, cls: ClassDefinition) -> Optional[SlotDefinition]: """Find the most proximal definition for slotname in the context of cls""" if cls.is_a: if cls.is_a not in self.schema.classes: self.raise_value_error(f"Unknown parent class: {cls.is_a}", cls.is_a) for sn in self.schema.classes[cls.is_a].slots: slot = self.schema.slots[sn] if (slot.usage_slot_name and slotname == slot.usage_slot_name) or ( not slot.usage_slot_name and slotname == slot.name ): return slot for mixin in cls.mixins: if mixin not in self.schema.classes: self.raise_value_error(f"Unknown mixin class: {mixin}", cls.is_a) for sn in self.schema.classes[mixin].slots: slot = self.schema.slots[sn] if slot.alias and slotname == slot.alias or slotname == slot.name: return slot if cls.is_a: defn = self.slot_definition_for(slotname, self.schema.classes[cls.is_a]) if defn: return defn for mixin in cls.mixins: defn = self.slot_definition_for(slotname, self.schema.classes[mixin]) if defn: return defn return None
[docs] def check_prefixes(self) -> None: """ Iterate over the entire schema checking all prefixes """ self.check_prefix(self.schema.default_prefix) for prefix in self.schema.emit_prefixes: self.check_prefix(prefix) for typ in self.schema.types.values(): self.check_prefix(typ.uri) for prefix in typ.mappings: self.check_prefix(prefix) for prefix in typ.id_prefixes: self.check_prefix(prefix) for slot in self.schema.slots.values(): self.check_prefix(slot.slot_uri) for prefix in slot.mappings: self.check_prefix(prefix) for prefix in slot.id_prefixes: self.check_prefix(prefix) for cls in self.schema.classes.values(): self.check_prefix(cls.class_uri) # Class URI's are inserted into mappings -- see line ~#184 for prefix in cls.mappings: if prefix != cls.class_uri: self.check_prefix(prefix) for prefix in cls.id_prefixes: self.check_prefix(prefix)
[docs] def check_prefix(self, prefix_or_curie_or_uri: str) -> None: prefix = self.namespaces.prefix_for(prefix_or_curie_or_uri, case_shift=False) if prefix: if prefix not in self.namespaces: self.logger.warning(f"{TypedNode.yaml_loc(prefix_or_curie_or_uri)}Unrecognized prefix: {prefix}") self.namespaces[prefix] = f"http://example.org/UNKNOWN/{prefix}/" else: case_adjusted_prefix = self.namespaces.prefix_for(prefix_or_curie_or_uri, case_shift=True) if case_adjusted_prefix != prefix: self.logger.warning( f"{TypedNode.yaml_loc(prefix_or_curie_or_uri)}" f"Prefix case mismatch - supplied: {prefix} " f"expected: {case_adjusted_prefix}" )
[docs] @staticmethod def slot_name_for(slot: SlotDefinition) -> str: return underscore(slot.alias if slot.alias else slot.name)
[docs] @staticmethod def raise_value_error(error: str, loc_str: Optional[Union[TypedNode, str]] = None) -> None: SchemaLoader.raise_value_errors(error, loc_str)
[docs] @staticmethod def raise_value_errors(error: str, loc_str: Optional[Union[str, TypedNode, Iterator[TypedNode]]]) -> None: if isinstance(loc_str, list): locs = "\n".join(TypedNode.yaml_loc(e, suffix="") for e in loc_str) raise ValueError(f"{locs} {error}") else: raise ValueError(f'{TypedNode.yaml_loc(loc_str, suffix="")} {error}')
[docs] def logger_warning( self, warning: str, loc_str: Optional[Union[str, TypedNode, Iterator[TypedNode]]], ) -> None: if isinstance(loc_str, list): locs = "\n\t".join(TypedNode.yaml_loc(e, suffix="") for e in loc_str) self.logger.warning(f"{warning}\n\t{locs}") else: self.logger.warning(f'{warning}\n\t{TypedNode.yaml_loc(loc_str, suffix="")}')
def _get_base_dir(self, stated_base: str) -> Optional[str]: if stated_base: return stated_base elif self.schema.source_file: if "://" in self.schema.source_file: parsed_url = urlparse(self.schema.source_file) self.schema.source_file = parsed_url.path.rsplit("/", 1)[-1] return parsed_url.path.split("/", 1)[0] else: rval = os.path.dirname(os.path.abspath(self.schema.source_file)) return rval else: return None def _check_equals_string(self, slot: SlotDefinition): if slot.equals_string or slot.equals_string_in: # Range "string" mandatory for "equals_string" and "equals_string_in" range = slot.range if not range: # range is not defined --> check default range range = self.schema.default_range if range != "string": self.raise_value_error( f"slot: {slot.name} - 'equals_string' and 'equals_string_in' requires range " f"'string' and not range '{range}'", slot.range, ) if slot.any_of: # It is not allowed to use any of and equals_string or equals_string_in in one slot definition, # as both are mapped to sh:in in SHACL self.raise_value_error( f"slot: {slot.name} - 'equals_string'/'equals_string_in' and 'any_of' are mutually exclusive", slot.name, )