Source code for linkml.generators.shexgen

"""Generate ShEx definition of a model"""

import os
import urllib.parse as urlparse
from dataclasses import dataclass, field
from typing import Optional, Union

import click
from jsonasobj import as_json as as_json_1
from rdflib import OWL, RDF, XSD, Graph, Namespace
from ShExJSG import ShExC
from ShExJSG.SchemaWithContext import Schema
from ShExJSG.ShExJ import IRIREF, EachOf, NodeConstraint, Shape, ShapeOr, TripleConstraint

from linkml import METAMODEL_NAMESPACE, METAMODEL_NAMESPACE_NAME
from linkml._version import __version__
from linkml.generators.common.subproperty import get_subproperty_values
from linkml.utils.generator import Generator, shared_arguments
from linkml_runtime.linkml_model.meta import (
    ClassDefinition,
    ElementName,
    EnumDefinition,
    SlotDefinition,
    SlotDefinitionName,
    TypeDefinition,
)
from linkml_runtime.linkml_model.types import SHEX
from linkml_runtime.utils.formatutils import camelcase, sfx
from linkml_runtime.utils.metamodelcore import URIorCURIE


[docs] @dataclass class ShExGenerator(Generator): # ClassVars generatorname = os.path.basename(__file__) generatorversion = "0.0.2" valid_formats = ["shex", "json", "rdf"] file_extension = "shex.rdf" visit_all_class_slots = False uses_schemaloader = True # ObjectVars shex: Schema = field(default_factory=lambda: Schema()) # ShEx Schema being generated shapes: list = field(default_factory=lambda: []) shape: Optional[Shape] = None # Current shape being defined list_shapes: list[IRIREF] = field(default_factory=lambda: []) # Shapes that have been defined as lists expand_subproperty_of: bool = True """If True, expand subproperty_of to NodeConstraint value lists with slot descendants""" def __post_init__(self): super().__post_init__() if METAMODEL_NAMESPACE_NAME not in self.namespaces: self.namespaces[METAMODEL_NAMESPACE_NAME] = METAMODEL_NAMESPACE self.meta = Namespace( self.namespaces.join(self.namespaces[METAMODEL_NAMESPACE_NAME], "") ) # URI for the metamodel self.base = Namespace(self.namespaces.join(self.namespaces._base, "")) # Base URI for what is being modeled def generate_header(self) -> str: out = f"# metamodel_version: {self.schema.metamodel_version}\n" if self.schema.version: out += f"# version: {self.schema.version}\n" return out def visit_schema(self, **_): # Adjust the schema context to include the base model URI context = self.shex["@context"] self.shex["@context"] = [context, {"@base": self.namespaces._base}] # Emit all of the type definitions for typ in self.schema.types.values(): model_uri = self._class_or_type_uri(typ) if typ.uri: typ_type_uri = self.namespaces.uri_for(typ.uri) if typ_type_uri in (XSD.anyURI, SHEX.iri): self.shapes.append(NodeConstraint(id=model_uri, nodeKind="iri")) elif typ_type_uri == SHEX.nonLiteral: self.shapes.append(NodeConstraint(id=model_uri, nodeKind="nonliteral")) else: self.shapes.append(NodeConstraint(id=model_uri, datatype=self.namespaces.uri_for(typ.uri))) else: typeof_uri = self._class_or_type_uri(typ.typeof) self.shapes.append(Shape(id=model_uri, expression=typeof_uri)) if self.format != "json": return self.generate_header() def visit_class(self, cls: ClassDefinition) -> bool: self.shape = Shape() # Start with all the parent classes, mixins and appytos struct_ref_list = [cls.is_a] if cls.is_a else [] struct_ref_list += cls.mixins if cls.name in self.synopsis.applytorefs: for applier in self.synopsis.applytorefs[cls.name].classrefs: struct_ref_list.append(applier) for sr in struct_ref_list: self._add_constraint(self._class_or_type_uri(sr, "_tes")) self._add_constraint(self._type_arc(self.schema.classes[sr].class_uri, opt=True)) return True def end_class(self, cls: ClassDefinition) -> None: # On entry self.shape contains all of the triple expressions that define the body of the shape # Finish off the shape definition itself # If there is nothing yet, we're at the very root of things. Add in a final catch-all for any additional # type arcs. NOTE: Here is where you can sink other things as well if you want to ignore categories of things if self.shape.expression is None: self._add_constraint(TripleConstraint(predicate=RDF.type, min=0, max=-1)) self.shape.expression.id = self._class_or_type_uri(cls, "_tes") self.shape.expression = EachOf( expressions=[ self.shape.expression, self._type_arc(cls.class_uri, not bool(self.class_identifier(cls))), ] ) self.shape.closed = not (cls.abstract or cls.mixin) # If this class has subtypes, define the class as the union of its subtypes and itself (if not abstract) if cls.name in self.synopsis.isarefs: childrenExprs = [] for child_classname in sorted(list(self.synopsis.isarefs[cls.name].classrefs)): childrenExprs.append(self._class_or_type_uri(child_classname)) if not (cls.mixin or cls.abstract) or len(childrenExprs) == 1: childrenExprs.insert(0, self.shape) self.shapes.append(ShapeOr(id=self._class_or_type_uri(cls), shapeExprs=childrenExprs)) else: self.shapes.append(ShapeOr(id=self._class_or_type_uri(cls), shapeExprs=childrenExprs)) self.shape.id = self._class_or_type_uri(cls, "_struct") self.shapes.append(self.shape) else: self.shape.id = self._class_or_type_uri(cls) self.shapes.append(self.shape) def visit_class_slot( self, cls: ClassDefinition, aliased_slot_name: SlotDefinitionName, slot: SlotDefinition, ) -> None: if not (slot.identifier or slot.abstract or slot.mixin): constraint = TripleConstraint() self._add_constraint(constraint) constraint.predicate = self.namespaces.uri_for(slot.slot_uri) constraint.min = int(bool(slot.required)) constraint.max = 1 if not slot.multivalued else -1 if slot.range in self.schema.enums: # Handle permissible values from enums enum = self.schema.enums[slot.range] values = [] for value in enum.permissible_values.values(): if value.meaning: values.append(self.namespaces.uri_for(value.meaning)) else: value_uri = f"{self._class_or_type_uri(enum.name)}#{urlparse.quote(value.text)}" values.append(value_uri) if values: node_constraint = NodeConstraint( # id=self._class_or_type_uri(slot.range), values=values, ) constraint.valueExpr = node_constraint elif self.expand_subproperty_of and slot.subproperty_of: # Handle subproperty_of constraint - restrict to slot descendants values = self._get_subproperty_values(slot) if values: constraint.valueExpr = NodeConstraint(values=values) else: constraint.valueExpr = self._class_or_type_uri(slot.range) else: constraint.valueExpr = self._class_or_type_uri(slot.range) def end_schema(self, output: Optional[str] = None, **_) -> str: self.shex.shapes = self.shapes if self.shapes else [Shape()] shex = as_json_1(self.shex) if self.format == "rdf": g = Graph() g.parse(data=shex, format="json-ld", version="1.1") g.bind("owl", OWL) shex = g.serialize(format="turtle") elif self.format == "shex": g = Graph() self.namespaces.load_graph(g) shex = str(ShExC(self.shex, base=sfx(self.namespaces._base), namespaces=g)) if output: with open(output, "w", encoding="UTF-8") as outf: outf.write(shex) return shex def _class_or_type_uri( self, item: Union[TypeDefinition, ClassDefinition, ElementName], suffix: Optional[str] = "", ) -> URIorCURIE: # TODO: enums - figure this out if isinstance(item, (TypeDefinition, ClassDefinition, EnumDefinition)): cls_or_type = item else: cls_or_type = self.class_or_type_for(item) return self.namespaces.uri_for( self.namespaces.uri_or_curie_for( self.schema_defaults[cls_or_type.from_schema], camelcase(cls_or_type.name) + suffix, ) ) def _slot_uri(self, name: str, suffix: Optional[str] = "") -> URIorCURIE: slot = self.schema.slots[name] return self.namespaces.uri_for( self.namespaces.uri_or_curie_for(self.schema_defaults[slot.from_schema], camelcase(name) + suffix) ) def _add_constraint(self, constraint) -> None: # No constraints if not self.shape.expression: self.shape.expression = constraint # One constraint elif not isinstance(self.shape.expression, EachOf): self.shape.expression = EachOf(expressions=[self.shape.expression, constraint]) # Two or more constraints else: self.shape.expression.expressions.append(constraint) def _type_arc(self, target: URIorCURIE, opt: bool = False) -> TripleConstraint: return TripleConstraint( predicate=RDF.type, valueExpr=NodeConstraint(values=[IRIREF(self.namespaces.uri_for(target))]), min=0 if opt else 1, ) def _get_subproperty_values(self, slot: SlotDefinition) -> list: """ Get all valid values from slot hierarchy for subproperty_of constraint. Following metamodel semantics: "any ontological child (related to X via an is_a relationship), is a valid value for the slot" Values are formatted as URIs for ShEx compatibility. :param slot: SlotDefinition with subproperty_of set :return: List of URI strings for NodeConstraint values """ from linkml_runtime.utils.schemaview import SchemaView sv = SchemaView(self.schema) # ShEx always uses full URIs return get_subproperty_values(sv, slot, expand_uri=True)
@shared_arguments(ShExGenerator) @click.command(name="shex") @click.option("-o", "--output", help="Output file name") @click.option( "--expand-subproperty-of/--no-expand-subproperty-of", default=True, show_default=True, help="If --expand-subproperty-of (default), slots with subproperty_of will generate NodeConstraint " "values containing all slot descendants. Use --no-expand-subproperty-of to disable this behavior.", ) @click.version_option(__version__, "-V", "--version") def cli(yamlfile, **args): """Generate a ShEx Schema for a LinkML model""" print(ShExGenerator(yamlfile, **args).serialize(**args)) if __name__ == "__main__": cli()