Source code for linkml.generators.shexgen

"""Generate ShEx definition of a model

"""

import os
import urllib.parse as urlparse
from dataclasses import dataclass, field
from typing import List, Optional, Union

import click
from jsonasobj import as_json as as_json_1
from linkml_runtime.linkml_model.meta import (
    ClassDefinition,
    ElementName,
    EnumDefinition,
    SlotDefinition,
    SlotDefinitionName,
    TypeDefinition,
)
from linkml_runtime.linkml_model.types import SHEX
from linkml_runtime.utils.formatutils import camelcase, sfx
from linkml_runtime.utils.metamodelcore import URIorCURIE
from rdflib import OWL, RDF, XSD, Graph, Namespace
from ShExJSG import ShExC
from ShExJSG.SchemaWithContext import Schema
from ShExJSG.ShExJ import IRIREF, EachOf, NodeConstraint, Shape, ShapeOr, TripleConstraint

from linkml import METAMODEL_NAMESPACE, METAMODEL_NAMESPACE_NAME
from linkml._version import __version__
from linkml.utils.generator import Generator, shared_arguments


[docs]@dataclass class ShExGenerator(Generator): # ClassVars generatorname = os.path.basename(__file__) generatorversion = "0.0.2" valid_formats = ["shex", "json", "rdf"] file_extension = "shex.rdf" visit_all_class_slots = False uses_schemaloader = True # ObjectVars shex: Schema = field(default_factory=lambda: Schema()) # ShEx Schema being generated shapes: List = field(default_factory=lambda: []) shape: Optional[Shape] = None # Current shape being defined list_shapes: List[IRIREF] = field(default_factory=lambda: []) # Shapes that have been defined as lists def __post_init__(self): super().__post_init__() if METAMODEL_NAMESPACE_NAME not in self.namespaces: self.namespaces[METAMODEL_NAMESPACE_NAME] = METAMODEL_NAMESPACE self.meta = Namespace( self.namespaces.join(self.namespaces[METAMODEL_NAMESPACE_NAME], "") ) # URI for the metamodel self.base = Namespace(self.namespaces.join(self.namespaces._base, "")) # Base URI for what is being modeled def generate_header(self) -> str: out = f"# metamodel_version: {self.schema.metamodel_version}\n" if self.schema.version: out += f"# version: {self.schema.version}\n" return out def visit_schema(self, **_): # Adjust the schema context to include the base model URI context = self.shex["@context"] self.shex["@context"] = [context, {"@base": self.namespaces._base}] # Emit all of the type definitions for typ in self.schema.types.values(): model_uri = self._class_or_type_uri(typ) if typ.uri: typ_type_uri = self.namespaces.uri_for(typ.uri) if typ_type_uri in (XSD.anyURI, SHEX.iri): self.shapes.append(NodeConstraint(id=model_uri, nodeKind="iri")) elif typ_type_uri == SHEX.nonLiteral: self.shapes.append(NodeConstraint(id=model_uri, nodeKind="nonliteral")) else: self.shapes.append(NodeConstraint(id=model_uri, datatype=self.namespaces.uri_for(typ.uri))) else: typeof_uri = self._class_or_type_uri(typ.typeof) self.shapes.append(Shape(id=model_uri, expression=typeof_uri)) if self.format != "json": return self.generate_header() def visit_class(self, cls: ClassDefinition) -> bool: self.shape = Shape() # Start with all the parent classes, mixins and appytos struct_ref_list = [cls.is_a] if cls.is_a else [] struct_ref_list += cls.mixins if cls.name in self.synopsis.applytorefs: for applier in self.synopsis.applytorefs[cls.name].classrefs: struct_ref_list.append(applier) for sr in struct_ref_list: self._add_constraint(self._class_or_type_uri(sr, "_tes")) self._add_constraint(self._type_arc(self.schema.classes[sr].class_uri, opt=True)) return True def end_class(self, cls: ClassDefinition) -> None: # On entry self.shape contains all of the triple expressions that define the body of the shape # Finish off the shape definition itself # If there is nothing yet, we're at the very root of things. Add in a final catch-all for any additional # type arcs. NOTE: Here is where you can sink other things as well if you want to ignore categories of things if self.shape.expression is None: self._add_constraint(TripleConstraint(predicate=RDF.type, min=0, max=-1)) self.shape.expression.id = self._class_or_type_uri(cls, "_tes") self.shape.expression = EachOf( expressions=[ self.shape.expression, self._type_arc(cls.class_uri, not bool(self.class_identifier(cls))), ] ) self.shape.closed = not (cls.abstract or cls.mixin) # If this class has subtypes, define the class as the union of its subtypes and itself (if not abstract) if cls.name in self.synopsis.isarefs: childrenExprs = [] for child_classname in sorted(list(self.synopsis.isarefs[cls.name].classrefs)): childrenExprs.append(self._class_or_type_uri(child_classname)) if not (cls.mixin or cls.abstract) or len(childrenExprs) == 1: childrenExprs.insert(0, self.shape) self.shapes.append(ShapeOr(id=self._class_or_type_uri(cls), shapeExprs=childrenExprs)) else: self.shapes.append(ShapeOr(id=self._class_or_type_uri(cls), shapeExprs=childrenExprs)) self.shape.id = self._class_or_type_uri(cls, "_struct") self.shapes.append(self.shape) else: self.shape.id = self._class_or_type_uri(cls) self.shapes.append(self.shape) def visit_class_slot( self, cls: ClassDefinition, aliased_slot_name: SlotDefinitionName, slot: SlotDefinition, ) -> None: if not (slot.identifier or slot.abstract or slot.mixin): constraint = TripleConstraint() self._add_constraint(constraint) constraint.predicate = self.namespaces.uri_for(slot.slot_uri) constraint.min = int(bool(slot.required)) constraint.max = 1 if not slot.multivalued else -1 if slot.range in self.schema.enums: # Handle permissible values from enums enum = self.schema.enums[slot.range] values = [] for value in enum.permissible_values.values(): if value.meaning: values.append(self.namespaces.uri_for(value.meaning)) else: value_uri = f"{self._class_or_type_uri(enum.name)}#{urlparse.quote(value.text)}" values.append(value_uri) if values: node_constraint = NodeConstraint( # id=self._class_or_type_uri(slot.range), values=values, ) constraint.valueExpr = node_constraint else: constraint.valueExpr = self._class_or_type_uri(slot.range) def end_schema(self, output: Optional[str] = None, **_) -> str: self.shex.shapes = self.shapes if self.shapes else [Shape()] shex = as_json_1(self.shex) if self.format == "rdf": g = Graph() g.parse(data=shex, format="json-ld", version="1.1") g.bind("owl", OWL) shex = g.serialize(format="turtle") elif self.format == "shex": g = Graph() self.namespaces.load_graph(g) shex = str(ShExC(self.shex, base=sfx(self.namespaces._base), namespaces=g)) if output: with open(output, "w", encoding="UTF-8") as outf: outf.write(shex) return shex def _class_or_type_uri( self, item: Union[TypeDefinition, ClassDefinition, ElementName], suffix: Optional[str] = "", ) -> URIorCURIE: # TODO: enums - figure this out if isinstance(item, (TypeDefinition, ClassDefinition, EnumDefinition)): cls_or_type = item else: cls_or_type = self.class_or_type_for(item) return self.namespaces.uri_for( self.namespaces.uri_or_curie_for( self.schema_defaults[cls_or_type.from_schema], camelcase(cls_or_type.name) + suffix, ) ) def _slot_uri(self, name: str, suffix: Optional[str] = "") -> URIorCURIE: slot = self.schema.slots[name] return self.namespaces.uri_for( self.namespaces.uri_or_curie_for(self.schema_defaults[slot.from_schema], camelcase(name) + suffix) ) def _add_constraint(self, constraint) -> None: # No constraints if not self.shape.expression: self.shape.expression = constraint # One constraint elif not isinstance(self.shape.expression, EachOf): self.shape.expression = EachOf(expressions=[self.shape.expression, constraint]) # Two or more constraints else: self.shape.expression.expressions.append(constraint) def _type_arc(self, target: URIorCURIE, opt: bool = False) -> TripleConstraint: return TripleConstraint( predicate=RDF.type, valueExpr=NodeConstraint(values=[IRIREF(self.namespaces.uri_for(target))]), min=0 if opt else 1, )
@shared_arguments(ShExGenerator) @click.command() @click.option("-o", "--output", help="Output file name") @click.version_option(__version__, "-V", "--version") def cli(yamlfile, **args): """Generate a ShEx Schema for a LinkML model""" print(ShExGenerator(yamlfile, **args).serialize(**args)) if __name__ == "__main__": cli()