"""
Base class for all generators
Developer Notes
---------------
Subclasses of this class implement specific generators. Each generator is in one of two styles:
1. schemaloader-based, using a Visitor pattern (older)
2. schemaview-based, no visitor pattern (newer)
New generators should always use the latter approach
See: https://github.com/linkml/linkml/issues/923
"""
import abc
import logging
import os
import re
import sys
from dataclasses import dataclass, field
from functools import lru_cache
from pathlib import Path
from typing import Callable, ClassVar, Dict, List, Mapping, Optional, Set, TextIO, Type, Union, cast
import click
from click import Argument, Command, Option
from linkml_runtime import SchemaView
from linkml_runtime.linkml_model.meta import (
ClassDefinition,
ClassDefinitionName,
Definition,
Element,
ElementName,
EnumDefinition,
EnumDefinitionName,
PrefixPrefixPrefix,
SchemaDefinition,
SlotDefinition,
SlotDefinitionName,
SubsetDefinition,
SubsetDefinitionName,
TypeDefinition,
TypeDefinitionName,
)
from linkml_runtime.utils.formatutils import camelcase, underscore
from linkml_runtime.utils.namespaces import Namespaces
from linkml import LOCAL_METAMODEL_YAML_FILE
from linkml.utils.cli_utils import DEFAULT_LOG_LEVEL_INT, log_level_option
from linkml.utils.mergeutils import alias_root
from linkml.utils.schemaloader import SchemaLoader
from linkml.utils.typereferences import References
@lru_cache
def _resolved_metamodel(mergeimports):
    """Load and resolve the local LinkML metamodel.

    The result is memoised by ``lru_cache``, so each distinct ``mergeimports``
    value is loaded and resolved once and the same loader is returned afterwards.

    :param mergeimports: forwarded to ``SchemaLoader`` as its ``mergeimports`` argument
    :return: the ``SchemaLoader`` for the metamodel, after ``resolve()`` has been called
    :raises AssertionError: if ``LOCAL_METAMODEL_YAML_FILE`` does not exist on disk
    """
    metamodel_file = LOCAL_METAMODEL_YAML_FILE
    if not os.path.exists(metamodel_file):
        raise AssertionError(f"{LOCAL_METAMODEL_YAML_FILE} not found")

    # The directory holding the metamodel doubles as the "linkml" import root.
    base_dir = str(Path(str(metamodel_file)).parent)
    logging.debug(f"BASE={base_dir}")

    loader_kwargs = {
        "importmap": {"linkml": base_dir},
        "base_dir": base_dir,
        "mergeimports": mergeimports,
    }
    loader = SchemaLoader(metamodel_file, **loader_kwargs)
    loader.resolve()
    return loader
@dataclass
class Generator(metaclass=abc.ABCMeta):
    """
    Base class for generators

    For usage `Generator Docs <https://linkml.io/linkml/generators/>`_
    """

    schema: Union[str, TextIO, SchemaDefinition, "Generator", Path]
    """metamodel compliant schema. Can be URI, file name, actual schema, another generator, an
    open file or a pre-parsed schema"""

    # ClassVars
    generatorname: ClassVar[Optional[str]] = None
    """ Name of the generator. Override with os.path.basename(__file__)"""

    generatorversion: ClassVar[Optional[str]] = None  # Generator version identifier
    """Version of the generator. Consider deprecating and instead use overall linkml version"""

    uses_schemaloader: ClassVar[bool] = True
    """Old-style generator that uses the SchemaLoader and visitor pattern"""

    # uses_schemaview: ClassVar[bool] = True
    # """New-style generator that uses SchemaView"""

    requires_metamodel: ClassVar[bool] = True
    """Generator queries an instance of the metamodel"""

    valid_formats: ClassVar[List[str]] = []
    """Allowed formats - first format is default"""

    visit_all_class_slots: ClassVar[bool] = False
    """Visitor ClassVar: False means only visit own slots, True means visit all slots"""

    visits_are_sorted: ClassVar[bool] = True
    """Visitor ClassVar: True means visit basic types in alphabetical order, false in entry"""

    sort_class_slots: ClassVar[bool] = False
    """Visitor ClassVar: True means visit class slots in alphabetical order"""

    # Object-level Vars
    schemaview: Optional[SchemaView] = None
    """A wrapper onto the schema object"""

    format: Optional[str] = None
    """expected output format"""

    file_extension: ClassVar[Optional[str]] = None

    metadata: bool = True
    """True means include date, generator, etc. information in source header if appropriate"""

    useuris: Optional[bool] = None
    """True means declared class slot uri's are used. False means use model uris"""

    log_level: Optional[int] = DEFAULT_LOG_LEVEL_INT
    """Logging level, 0 is minimum"""

    mergeimports: Optional[bool] = True
    """True means merge non-linkml sources into importing package. False means separate packages"""

    source_file_date: Optional[str] = None
    """Modification date of input source file"""

    source_file_size: Optional[int] = None
    """Size of the source file in bytes"""

    logger: Optional[logging.Logger] = None
    """Logger to use for logging messages"""

    verbose: Optional[bool] = None
    """Verbosity"""

    output: Optional[str] = None
    """Path to output file. Note all generators may not implement this
    uniformly, see https://github.com/linkml/linkml/issues/923"""

    namespaces: Optional[Namespaces] = None
    """All prefix expansions used"""

    directory_output: bool = False
    """True means output is to a directory, False is to stdout"""

    base_dir: Optional[str] = None  # Base directory of schema
    """Working directory or base URL of sources.
    Setting this is necessary for correct retrieval of relative imports."""

    metamodel_name_map: Optional[Dict[str, str]] = None
    """Allows mapping of names of metamodel elements such as slot, etc"""

    importmap: Optional[Union[str, Mapping[str, str]]] = None
    """File name of import mapping file -- maps import name/uri to target"""

    emit_prefixes: Set[str] = field(default_factory=set)
    """Prefixes to emit, for RDF-based generators"""

    metamodel: Optional[SchemaLoader] = None
    """A SchemaLoader instance that points to the LinkML metamodel (meta.yaml)"""

    stacktrace: bool = False
    """True means print stack trace, false just error message"""

    include: Optional[Union[str, Path, SchemaDefinition]] = None
    """If set, include extra schema outside of the imports mechanism"""

    def __post_init__(self) -> None:
        """Finish construction: configure logging, validate the format and load the schema.

        The schema is loaded through ``SchemaLoader`` when ``uses_schemaloader`` is true and
        through ``SchemaView`` otherwise; namespaces are initialised afterwards.

        :raises ValueError: if ``format`` is not one of ``valid_formats``
        """
        if not self.logger:
            self.logger = logging.getLogger()
        if self.log_level is not None:
            self.logger.setLevel(self.log_level)
        if self.format is None:
            # first entry of valid_formats is the default
            self.format = self.valid_formats[0]
        if self.format not in self.valid_formats:
            # report the rejected value (self.format), not the ``format`` builtin
            raise ValueError(f"Unrecognized format: {self.format}; known={self.valid_formats}")
        # legacy: all generators should use "mergeimports"
        # self.merge_imports = self.mergeimports
        if not self.metadata:
            self.source_file_date = None
            self.source_file_size = None
        if self.requires_metamodel:
            self.metamodel = _resolved_metamodel(self.mergeimports)
        schema = self.schema
        if isinstance(schema, Path):
            schema = str(schema)
        # TODO: remove aliasing
        self.emit_metadata = self.metadata
        if self.uses_schemaloader:
            self._initialize_using_schemaloader(schema)
        else:
            logging.info(f"Using SchemaView with im={self.importmap} // base_dir={self.base_dir}")
            self.schemaview = SchemaView(schema, importmap=self.importmap, base_dir=self.base_dir)
            if self.include:
                if isinstance(self.include, (str, Path)):
                    # a path-like include is loaded into a SchemaDefinition before merging
                    self.include = SchemaView(self.include, importmap=self.importmap, base_dir=self.base_dir).schema
                self.schemaview.merge_schema(self.include)
            self.schema = self.schemaview.schema
        self._init_namespaces()

    def _initialize_using_schemaloader(self, schema: Union[str, TextIO, SchemaDefinition, "Generator"]):
        """Populate this generator's state from another generator or from a fresh SchemaLoader.

        :param schema: another ``Generator`` (its already-loaded state is copied, including its
            logger) or anything accepted by ``SchemaLoader``
        """
        # currently generators are very liberal in what they accept, including
        # other generators.
        # See https://github.com/linkml/linkml/issues/923 for discussion on how
        # to simplify the overall framework
        if isinstance(schema, Generator):
            source = schema
            self.logger = source.logger
        else:
            if isinstance(schema, SchemaDefinition):
                # schemaloader based methods require schemas to have been created via SchemaLoader,
                # which prepopulates some fields (e.g. definition_url). If the schema has not been processed through the
                # loader, then roundtrip
                schema = schema._as_dict
            source = SchemaLoader(
                schema,
                self.base_dir,
                useuris=self.useuris,
                importmap=self.importmap,
                logger=self.logger,
                mergeimports=self.mergeimports,
                emit_metadata=self.metadata,
                source_file_date=self.source_file_date,
                source_file_size=self.source_file_size,
            )
            source.resolve()

        # Both a Generator and a resolved SchemaLoader expose the same set of attributes
        self.schema = source.schema
        self.synopsis = source.synopsis
        self.loaded = source.loaded
        self.namespaces = source.namespaces
        self.base_dir = source.base_dir
        self.importmap = source.importmap
        self.source_file_date = source.source_file_date
        # Earlier code assigned only this misspelled attribute; it is kept as an alias
        # so that any code reading it keeps working.
        self.source_file_data = source.source_file_date
        self.source_file_size = source.source_file_size
        self.schema_location = source.schema_location
        self.schema_defaults = source.schema_defaults

    def _init_namespaces(self):
        """Build ``self.namespaces`` from the schema prefixes if it has not been set yet."""
        if self.namespaces is None:
            self.namespaces = Namespaces()
            for prefix in self.schema.prefixes.values():
                self.namespaces[prefix.prefix_prefix] = prefix.prefix_reference

    def serialize(self, **kwargs) -> str:
        """
        Generate output in the required format

        Visits, in order: the schema, subsets, types, enums, slots, then each class (with its
        slots), and finally the end of the schema. Every non-None visitor result is appended
        to the output.

        :param kwargs: Generator specific parameters
        :return: Generated output
        """
        chunks: List[str] = []

        def emit(text: Optional[str]) -> None:
            # visitors return None when they have nothing to contribute
            if text is not None:
                chunks.append(text)

        # the default is to use the Visitor Pattern; each individual generator may
        # choose to override methods {visit,end}_{element}.
        # See https://github.com/linkml/linkml/issues/923
        emit(self.visit_schema(**kwargs))

        for _, subset in (
            sorted(self.schema.subsets.items(), key=lambda s: s[0].lower())
            if self.visits_are_sorted
            else self.schema.subsets.items()
        ):
            emit(self.visit_subset(subset))

        for _, typ in (
            sorted(self.schema.types.items(), key=lambda s: s[0].lower())
            if self.visits_are_sorted
            else self.schema.types.items()
        ):
            emit(self.visit_type(typ))

        for enum in (
            sorted(self.schema.enums.values(), key=lambda e: e.name.lower())
            if self.visits_are_sorted
            else self.schema.enums.values()
        ):
            emit(self.visit_enum(enum))

        for _, slot in (
            sorted(self.schema.slots.items(), key=lambda c: c[0].lower())
            if self.visits_are_sorted
            else self.schema.slots.items()
        ):
            emit(self.visit_slot(self.aliased_slot_name(slot), slot))

        for cls in (
            sorted(self.schema.classes.values(), key=lambda c: c.name.lower())
            if self.visits_are_sorted
            else self.schema.classes.values()
        ):
            cls_out = self.visit_class(cls)
            # a falsy result means "skip this class": no slot visits and no end_class
            if cls_out:
                # visit_class may return True (visit only) or a string (visit and emit)
                if isinstance(cls_out, str):
                    chunks.append(cls_out)
                for slot in self.all_slots(cls) if self.visit_all_class_slots else self.own_slots(cls):
                    emit(self.visit_class_slot(cls, self.aliased_slot_name(slot), slot))
                emit(self.end_class(cls))

        emit(self.end_schema(**kwargs))
        return "".join(chunks)

    def visit_schema(self, **kwargs) -> Optional[str]:
        """Visited once at the beginning of generation

        @param kwargs: Arguments passed through from CLI -- implementation dependent
        """
        ...

    def end_schema(self, **kwargs) -> Optional[str]:
        """Visited once at the end of generation

        @param kwargs: Arguments passed through from CLI -- implementation dependent
        """
        ...

    def visit_class(self, cls: ClassDefinition) -> Optional[Union[str, bool]]:
        """Visited once per schema class

        @param cls: class being visited
        @return: Visit slots and end class. False means skip and go on
        """
        return True

    def end_class(self, cls: ClassDefinition) -> Optional[str]:
        """Visited after visit_class_slots (if visit_class returned true)

        @param cls: class being visited
        """
        ...

    def visit_class_slot(self, cls: ClassDefinition, aliased_slot_name: str, slot: SlotDefinition) -> Optional[str]:
        """Visited for each slot in a class. If class level visit_all_slots is true, this is visited once
        for any class that is inherited (class itself, is_a, mixin, apply_to). Otherwise, just the own slots.

        @param cls: containing class
        @param aliased_slot_name: Aliased slot name. May not be unique across all class slots
        @param slot: being visited
        """
        ...

    def visit_slot(self, aliased_slot_name: str, slot: SlotDefinition) -> Optional[str]:
        """Visited once for every slot definition in the schema.

        @param aliased_slot_name: Aliased name of the slot. May not be unique
        @param slot: visited slot
        """
        ...

    def visit_type(self, typ: TypeDefinition) -> Optional[str]:
        """Visited once for every type definition in the schema

        @param typ: Type definition
        """
        ...

    def visit_subset(self, subset: SubsetDefinition) -> Optional[str]:
        """Visited once for every subset definition in the schema

        @param subset: Subset definition
        """
        ...

    def visit_enum(self, enum: EnumDefinition) -> Optional[str]:
        """Visited once for every enum definition in the schema

        @param enum: Enum definition
        """
        ...

    # =============================
    # Helper methods
    # =============================
    def own_slots(self, cls: Union[ClassDefinitionName, ClassDefinition]) -> List[SlotDefinition]:
        """Return the list of slots owned by the class definition. An "own slot" is any ``cls`` slot that does not
        appear in the class is_a parent. Own_slots include:

        * any slot whose domain is cls
        * slot_usage entries
        * slots from mixins entries
        * slots from apply_to entries

        @param cls: class name or class definition name
        @return: list of owned slots. List is sorted if sort_class_slots is true, otherwise in class order
        """
        if not isinstance(cls, ClassDefinition):
            cls = self.schema.classes[cls]
        parent = self.schema.classes[cls.is_a] if cls.is_a else None
        seen = set()
        rval = []
        for sname in cls.slots:
            # de-duplicate on the alias root so a slot and its aliased variant count once
            sname_base = alias_root(self.schema, sname)
            if sname_base not in seen and (not parent or sname not in parent.slots):
                rval.append(self.schema.slots[sname])
                seen.add(sname_base)
        return sorted(rval, key=lambda s: s.name) if self.sort_class_slots else rval

    def own_slot_names(self, cls: Union[ClassDefinitionName, ClassDefinition]) -> List[SlotDefinitionName]:
        """Return the names of the slots returned by :meth:`own_slots` for ``cls``, in the same order."""
        return [slot.name for slot in self.own_slots(cls)]

    def all_slots(
        self,
        cls: Union[ClassDefinitionName, ClassDefinition],
        *,
        cls_slots_first: bool = False,
        seen: Optional[Set[SlotDefinitionName]] = None,
    ) -> List[SlotDefinition]:
        """Return all slots that are part of the class definition. This includes all is_a, mixin and apply_to slots
        but does NOT include slot_usage targets. If class B has a slot_usage entry for slot "s", only the slot
        definition for the redefined slot will be included, not its base. Slots are added in the order they appear
        in classes, with recursive is_a's being added first followed by mixins and finally apply_tos

        @param cls: class definition or class definition name
        @param cls_slots_first: True means return own slots at the top of the list
        @param seen: Set of slot (alias root) names already recorded. Used for internal recursion
        @return: ordered list of slots in the class with slot usages removed
        """
        if not isinstance(cls, ClassDefinition):
            cls = self.schema.classes[cls]
        if seen is None:
            seen = set()
        rval = []
        if cls_slots_first:
            for slot in self.own_slots(cls):
                sname_base = alias_root(self.schema, slot.name)
                if sname_base not in seen:
                    rval.append(slot)
                    seen.add(sname_base)
            # own slots first, then walk up the is_a chain sharing the same ``seen`` set
            parent = self.schema.classes[cls.is_a] if cls.is_a else None
            return rval + (self.all_slots(parent, cls_slots_first=cls_slots_first, seen=seen) if parent else [])
        for sname in cls.slots:
            sname_base = alias_root(self.schema, sname)
            if sname_base not in seen:
                rval.append(self.schema.slots[sname])
                seen.add(sname_base)
        return sorted(rval, key=lambda s: s.name) if self.sort_class_slots else rval

    def parent(
        self, element: Union[ClassDefinition, SlotDefinition]
    ) -> Optional[Union[ClassDefinition, SlotDefinition]]:
        """Return the parent of element, if any"""
        if element.is_a is None:
            return None
        if isinstance(element, ClassDefinition):
            return self.schema.classes[element.is_a]
        return self.schema.slots[element.is_a]

    def ancestors(self, element: Union[ClassDefinition, SlotDefinition]) -> List[ElementName]:
        """Return an ordered list of ancestor names for the supplied slot or class

        The list starts with the element's own name and follows the is_a chain upwards.

        @param element: Slot or class definition
        @return: Ordered list of ancestor names
        """
        return [element.name] + ([] if element.is_a is None else self.ancestors(self.parent(element)))

    def neighborhood(self, elements: Union[str, ElementName, List[ElementName]]) -> References:
        """Return a list of all slots, classes and types that touch any element in elements, including the element
        itself

        @param elements: Element names to do proximity with
        @return: All slots and classes that touch element
        """
        if isinstance(elements, (str, ElementName)):
            elements = [elements]
        touches = References()
        for element in elements:
            if element in self.schema.classes:
                touches.classrefs.add(cast(ClassDefinitionName, element))
                cls = self.schema.classes[cast(ClassDefinitionName, element)]
                if cls.is_a:
                    touches.classrefs.add(cls.is_a)
                # Mixins include apply_to's
                touches.classrefs.update(set(cls.mixins))
                for slotname in cls.slots:
                    slot = self.schema.slots[slotname]
                    if slot.range in self.schema.classes:
                        touches.classrefs.add(cast(ClassDefinitionName, slot.range))
                    elif slot.range in self.schema.types:
                        touches.typerefs.add(cast(TypeDefinitionName, slot.range))
                # direct subclasses of the element
                for cv in self.schema.classes.values():
                    if cv.is_a == element:
                        touches.classrefs.add(cv.name)
                if element in self.synopsis.rangerefs:
                    for slotname in self.synopsis.rangerefs[element]:
                        touches.slotrefs.add(slotname)
                        if self.schema.slots[slotname].domain:
                            touches.classrefs.add(self.schema.slots[slotname].domain)
                if cls.in_subset:
                    touches.subsetrefs.update(cls.in_subset)
            if element in self.schema.slots:
                touches.slotrefs.add(cast(SlotDefinitionName, element))
                slot = self.schema.slots[cast(SlotDefinitionName, element)]
                touches.slotrefs.update(set(slot.mixins))
                if slot.is_a:
                    touches.slotrefs.add(slot.is_a)
                if element in self.synopsis.inverses:
                    touches.slotrefs.update(self.synopsis.inverses[cast(SlotDefinitionName, element)])
                if slot.domain:
                    touches.classrefs.add(slot.domain)
                if slot.range in self.schema.classes:
                    touches.classrefs.add(cast(ClassDefinitionName, slot.range))
                elif slot.range in self.schema.types:
                    touches.typerefs.add(cast(TypeDefinitionName, slot.range))
                if slot.in_subset:
                    touches.subsetrefs.update(slot.in_subset)
                # direct sub-slots of the element
                for sv in self.schema.slots.values():
                    if sv.is_a == element:
                        touches.slotrefs.add(sv.name)
            if element in self.schema.types:
                touches.typerefs.add(cast(TypeDefinitionName, element))
                typ = self.schema.types[cast(TypeDefinitionName, element)]
                if element in self.synopsis.rangerefs:
                    touches.slotrefs.update(self.synopsis.rangerefs[element])
                if typ.typeof:
                    touches.typerefs.add(cast(TypeDefinitionName, typ.typeof))
                if typ.in_subset:
                    touches.subsetrefs.update(typ.in_subset)
                # direct sub-types of the element are types, so they belong in typerefs
                for tv in self.schema.types.values():
                    if tv.typeof == element:
                        touches.typerefs.add(tv.name)
            if element in self.schema.subsets:
                touches.subsetrefs.add(cast(SubsetDefinitionName, element))
                if element in self.synopsis.subsetrefs:
                    touches.update(self.synopsis.subsetrefs[cast(SubsetDefinitionName, element)])
            if not bool(touches):
                self.logger.warning(f"neighborhood({element}) - {element} is undefined")
        return touches

    def range_type_path(self, typ: TypeDefinition) -> List[str]:
        """
        Return a formatted list of range types from the base up

        :param typ: type definition whose name is to be formatted
        :return: List of possible types with base at the leftmost
        """
        formatted_typ_name = self.class_or_type_name(typ.name)
        if typ.typeof:
            return self.range_type_path(self.schema.types[cast(TypeDefinitionName, typ.typeof)]) + [formatted_typ_name]
        if typ.repr:
            return [typ.repr, formatted_typ_name]
        return [formatted_typ_name]

    def class_identifier(
        self, def_or_name: Union[str, ClassDefinition, TypeDefinition]
    ) -> Optional[SlotDefinitionName]:
        """
        Return the class identifier if any

        :param def_or_name: class name or definition
        :return: name of class key (or identifier) if one exists
        """
        if isinstance(def_or_name, ClassDefinition):
            cls = def_or_name
        elif def_or_name in self.schema.classes:
            cls = self.schema.classes[cast(ClassDefinitionName, def_or_name)]
        else:
            return None
        # the first slot flagged as identifier or key wins
        for slotname in cls.slots:
            slot = self.schema.slots[slotname]
            if slot.identifier or slot.key:
                return slotname
        return None

    @staticmethod
    def enum_identifier_path(enum_or_enumname: Union[str, EnumDefinition]) -> List[str]:
        """Return an enum_identifier path: ``["str", <camelcased enum name>]``"""
        enum_name = enum_or_enumname.name if isinstance(enum_or_enumname, EnumDefinition) else enum_or_enumname
        return ["str", camelcase(enum_name)]

    def class_identifier_path(self, cls_or_clsname: Union[str, ClassDefinition], force_non_key: bool) -> List[str]:
        """
        Return the path closure to a class identifier if the class has a key and force_non_key is false otherwise
        return a dictionary closure.

        :param cls_or_clsname: class definition
        :param force_non_key: True means inlined even if the class has a key
        :return: path
        """
        cls = (
            cls_or_clsname
            if isinstance(cls_or_clsname, ClassDefinition)
            else self.schema.classes[ClassDefinitionName(cls_or_clsname)]
        )

        # Determine whether the class has a key
        identifier_slot = None
        if not force_non_key:
            identifier_slot = self.class_identifier(cls)

        # No key or inlined, its closure is a dictionary
        if identifier_slot is None:
            return ["dict", self.class_or_type_name(cls.name)]

        # We're dealing with a reference
        pathname = camelcase(cls.name + " " + self.aliased_slot_name(identifier_slot))
        if cls.is_a:
            parent_identifier_slot = self.class_identifier(cls.is_a)
            if parent_identifier_slot:
                return self.class_identifier_path(cls.is_a, False) + [pathname]
        return self.slot_range_path(identifier_slot) + [pathname]

    def slot_range_path(self, slot_or_name: Union[str, SlotDefinition]) -> List[str]:
        """
        Return an ordered list of slot ranges from distal to proximal

        :param slot_or_name: slot whose range is being typed
        :return: ordered list of types from base type forward
        """
        slot = (
            slot_or_name
            if isinstance(slot_or_name, SlotDefinition)
            else self.schema.slots[cast(SlotDefinitionName, slot_or_name)]
        )
        if slot.range in self.schema.types:
            # Type
            return self.range_type_path(self.schema.types[cast(TypeDefinitionName, slot.range)])
        if slot.range in self.schema.enums:
            return self.enum_identifier_path(slot.range)
        # Class
        return self.class_identifier_path(slot.range, bool(slot.inlined))

    def aliased_slot_name(self, slot: Union[SlotDefinitionName, SlotDefinition]) -> SlotDefinitionName:
        """Return the overloaded slot name -- the alias if one exists otherwise the actual name

        @param slot: either a slot name or a definition
        @return: overloaded name
        """
        if isinstance(slot, str):
            slot = self.schema.slots[cast(SlotDefinitionName, slot)]
        return slot.alias if slot.alias else slot.name

    def class_or_type_for(self, name: str) -> Optional[Element]:
        """
        Return the corresponding class, type or enum for name, or None if there is none
        """
        if name in self.schema.classes:
            return self.schema.classes[ClassDefinitionName(name)]
        if name in self.schema.types:
            return self.schema.types[TypeDefinitionName(name)]
        if name in self.schema.enums:
            return self.schema.enums[EnumDefinitionName(name)]
        return None

    def class_or_type_name(self, name: str) -> str:
        """
        Return the camelcase representation of clsname if it is a valid class or type. Prepend "Unknown"
        if the name isn't valid
        """
        if name in self.schema.classes:
            return camelcase(name)
        if name in self.schema.types:
            typ = self.schema.types[cast(TypeDefinitionName, name)]
            # a root type (no typeof) is represented by its ``base``
            return camelcase(name) if typ.typeof else typ.base
        return "Unknown_" + camelcase(name)

    def slot_for(self, name: str) -> Optional[Element]:
        """Return the slot definition called ``name``, or None if the schema has no such slot."""
        return self.schema.slots.get(name)

    def slot_name(self, name: str) -> str:
        """
        Return the underscored version of the aliased slot name if name is a slot. Prepend ``unknown_`` if the name
        isn't valid.
        """
        slot = self.slot_for(name)
        return underscore(self.aliased_slot_name(slot) if slot else ("unknown " + name))

    def subset_for(self, name: str) -> Optional[Element]:
        """Return the subset definition called ``name``, or None if the schema has no such subset."""
        return self.schema.subsets.get(name)

    def subset_name(self, name: str) -> str:
        """Return the camelcased subset name, prefixed with ``Unknown_`` if ``name`` is not a subset."""
        subset = self.subset_for(name)
        return ("" if subset else "Unknown_") + camelcase(name)

    def obj_for(self, el_or_elname: str, is_range_name: bool = False) -> Optional[Element]:
        """Return the schema element that ``el_or_elname`` refers to, if any.

        :param el_or_elname: name to look up
        :param is_range_name: True restricts the lookup to classes, types and the schema default
            range; False checks slots first, then subsets, then classes/types/enums
        :return: matching element or None
        """
        if is_range_name:
            return (
                self.class_or_type_for(el_or_elname)
                if el_or_elname in self.schema.classes
                or el_or_elname in self.schema.types
                or el_or_elname == self.schema.default_range
                else None
            )
        if el_or_elname in self.schema.slots:
            return self.slot_for(cast(SlotDefinitionName, el_or_elname))
        if el_or_elname in self.schema.subsets:
            return self.subset_for(el_or_elname)
        return self.class_or_type_for(el_or_elname)

    def default_prefix(self) -> Optional[str]:
        """Return the default prefix for the schema

        @return: URI or NCNAME of default prefix"""
        if "://" in self.schema.default_prefix:
            return self.schema.default_prefix
        # Basic loader tests for valid default prefix
        return self.schema.prefixes[PrefixPrefixPrefix(self.schema.default_prefix)].prefix_reference

    # TODO: add lru cache once we get identity into the classes
    def domain_slots(self, cls: ClassDefinition) -> List[SlotDefinition]:
        """Return all slots in the class definition that are owned by the class

        Note: as a side effect, is_a ancestors of the class's mixins are appended to ``cls.mixins``.
        """
        domain_slots = []
        for slot_name in cls.slots:
            slot = self.schema.slots[slot_name]

            # add any mixin ancestors here so that slots will be distributed to descendents correctly via mixin
            # hierarchy.
            mixin_ancestors = []
            if cls.mixins:
                for mixin in cls.mixins:
                    for ancestor in self.schemaview.class_ancestors(mixin, mixins=False):
                        if ancestor not in mixin_ancestors:
                            mixin_ancestors.append(ancestor)

            for mixin_ancestor in mixin_ancestors:
                if mixin_ancestor not in cls.mixins:
                    cls.mixins.append(mixin_ancestor)

            # Check if the class is in the domain of the slot or if any of its mixins are in the domain
            if cls.name in slot.domain_of or (set(cls.mixins).intersection(slot.domain_of)):
                domain_slots.append(slot)

        return domain_slots

    def add_mappings(self, defn: Definition) -> None:
        """
        Process any mappings in defn, adding all of the mappings prefixes to the namespace map

        A URI mapping with no matching namespace is logged and skipped; the remaining mappings
        are still processed.

        :param defn: Class or Slot Definition
        :raises ValueError: if a mapping is not of the form ``prefix:localname``
        """
        self.add_id_prefixes(defn)
        # concatenation builds a fresh list, so the appends below do not touch defn
        mappings = (
            defn.mappings
            + defn.related_mappings
            + defn.close_mappings
            + defn.narrow_mappings
            + defn.broad_mappings
            + defn.exact_mappings
        )
        # see https://github.com/linkml/linkml/pull/283
        # An unset URI has no prefix to register and would fail the string checks below.
        if isinstance(defn, ClassDefinition) and defn.class_uri is not None:
            mappings.append(defn.class_uri)
        if isinstance(defn, SlotDefinition) and defn.slot_uri is not None:
            mappings.append(defn.slot_uri)
        for mapping in mappings:
            if "://" in str(mapping):
                mcurie = self.namespaces.curie_for(mapping)
                if mcurie is None:
                    self.logger.warning(f"No namespace defined for URI: {mapping}")
                    continue  # Absolute path - no prefix/name
                mapping = mcurie
            if ":" not in mapping or len(mapping.split(":")) != 2:
                raise ValueError(f"Definition {defn.name} - unrecognized mapping: {mapping}")
            ns = mapping.split(":")[0]
            logging.debug(f"Adding {ns} from {mapping} // {defn}")
            if ns:
                self.add_prefix(ns)

    def add_id_prefixes(self, element: Element) -> None:
        """Register every entry of ``element.id_prefixes`` via :meth:`add_prefix`."""
        for id_prefix in element.id_prefixes:
            self.add_prefix(id_prefix)

    def add_prefix(self, ncname: str) -> None:
        """Add a prefix to the list of prefixes to emit

        A prefix missing from ``self.namespaces`` is logged and bound to a placeholder
        ``http://example.org/UNKNOWN/<ncname>/`` URI.

        @param ncname: name to add
        """
        if ncname not in self.namespaces:
            self.logger.warning(f"Unrecognized prefix: {ncname}")
            self.namespaces[ncname] = f"http://example.org/UNKNOWN/{ncname}/"
        self.emit_prefixes.add(ncname)

    @staticmethod
    def is_class_unconstrained(cls: ClassDefinition):
        """
        Determine if the class is mapped to typing.Any, i.e the unconstrained class

        :param cls: class definition
        :return: true if the class is unconstrained
        """
        return cls.class_uri == "linkml:Any"
def shared_arguments(g: Type[Generator]) -> Callable[[Command], Command]:
    """Build a decorator that attaches the CLI parameters common to all generators.

    The decorator appends to the command's ``params``, in order: the ``yamlfile`` argument,
    ``--format``, ``--metadata``, ``--useuris``, ``--importmap``, whatever
    ``log_level_option`` adds, ``--verbose``, ``--mergeimports`` and ``--stacktrace``.

    :param g: generator class; its ``valid_formats`` supply the ``--format`` choices and default
    :return: decorator that mutates and returns the click command it is given
    """

    def verbosity_callback(ctx, param, verbose):
        # -v selects INFO, -vv (or more) selects DEBUG; otherwise logging is left alone
        if verbose >= 2:
            chosen_level = logging.DEBUG
        elif verbose == 1:
            chosen_level = logging.INFO
        else:
            return
        logging.basicConfig(level=chosen_level, force=True)

    def stacktrace_callback(ctx, param, stacktrace):
        if stacktrace:
            return
        # hide the traceback so only the error message is shown
        sys.tracebacklimit = 0

    def decorator(f: Command) -> Command:
        def register(parameter) -> None:
            # always look up f.params at call time so the current list is used
            f.params.append(parameter)

        register(Argument(("yamlfile",), type=click.Path(exists=True, dir_okay=False)))

        format_option = Option(
            ("--format", "-f"),
            type=click.Choice(g.valid_formats),
            default=g.valid_formats[0],
            show_default=True,
            help="Output format",
        )
        register(format_option)

        metadata_option = Option(
            ("--metadata/--no-metadata",),
            default=True,
            show_default=True,
            help="Include metadata in output",
        )
        register(metadata_option)

        uris_option = Option(
            ("--useuris/--metauris",),
            default=True,
            show_default=True,
            help="Use class and slot URIs over model uris",
        )
        register(uris_option)

        register(Option(("--importmap", "-im"), type=click.File(), help="Import mapping file"))

        # must stay between --importmap and --verbose to keep parameter order
        log_level_option(f)

        verbose_option = Option(
            ("--verbose", "-v"),
            count=True,
            help="Verbosity. Takes precedence over --log_level.",
            callback=verbosity_callback,
        )
        register(verbose_option)

        merge_option = Option(
            ("--mergeimports/--no-mergeimports",),
            default=True,
            help="Merge imports into source file (default=mergeimports)",
        )
        register(merge_option)

        stacktrace_option = Option(
            ("--stacktrace/--no-stacktrace",),
            default=False,
            show_default=True,
            help="Print a stack trace when an error occurs",
            callback=stacktrace_callback,
        )
        register(stacktrace_option)

        return f

    return decorator