import logging
import os
from dataclasses import dataclass
from typing import Callable, Optional

import click
from jsonasobj2 import JsonObj, as_dict
from linkml_runtime.linkml_model.meta import ClassDefinition, ElementName
from linkml_runtime.utils.formatutils import underscore
from linkml_runtime.utils.schemaview import SchemaView
from linkml_runtime.utils.yamlutils import TypedNode, extended_float, extended_int, extended_str
from rdflib import BNode, Graph, Literal, URIRef
from rdflib.collection import Collection
from rdflib.namespace import RDF, SH, XSD

from linkml._version import __version__
from linkml.generators.shacl.shacl_data_type import ShaclDataType
from linkml.generators.shacl.shacl_ifabsent_processor import ShaclIfAbsentProcessor
from linkml.utils.generator import Generator, shared_arguments
logger = logging.getLogger(__name__)
@dataclass
class ShaclGenerator(Generator):
    """Generate SHACL shapes from a LinkML schema.

    Each class returned by the schema view becomes an ``sh:NodeShape``; each
    induced slot of that class becomes a blank ``sh:property`` node attached
    to the shape.
    """

    # Dataclass fields (annotated, so they are settable per instance)
    closed: bool = True
    """True means add 'sh:closed=true' to all shapes, except for shapes of mixin or abstract classes"""
    suffix: Optional[str] = None
    """parameterized suffix to be appended. No suffix per default."""
    include_annotations: bool = False
    """True means include all class / slot / type annotations in generated Node or Property shapes"""
    exclude_imports: bool = False
    """If True, elements from imported ontologies won't be included in the generator's output"""

    # Plain class attributes (un-annotated, so they are not dataclass fields)
    generatorname = os.path.basename(__file__)
    generatorversion = "0.0.1"
    valid_formats = ["ttl"]
    file_extension = "shacl.ttl"
    visit_all_class_slots = False
    uses_schemaloader = True

    def __post_init__(self) -> None:
        """Create the schema view, then run the base class post-initialisation."""
        # The view is built from ``self.schema`` *before* the base class
        # post-init runs; this ordering is kept exactly as it was.
        self.schemaview = SchemaView(self.schema)
        super().__post_init__()
        # NOTE(review): the returned header text is not used here; the call is
        # kept so behaviour is unchanged for subclasses that override it.
        self.generate_header()

    def generate_header(self) -> str:
        """Return comment lines naming the metamodel version and, if set, the schema version."""
        out = f"\n# metamodel_version: {self.schema.metamodel_version}"
        if self.schema.version:
            out += f"\n# version: {self.schema.version}"
        return out

    def serialize(self, **args) -> str:
        """Build the SHACL graph and serialize it.

        The formats ``"owl"`` and ``"ttl"`` are written as Turtle; any other
        value of ``self.format`` is handed to rdflib unchanged.

        :param args: accepted for interface compatibility; not used here
        :return: the serialized graph
        """
        g = self.as_graph()
        rdf_format = "turtle" if self.format in ("owl", "ttl") else self.format
        return g.serialize(format=rdf_format)

    def as_graph(self) -> Graph:
        """Translate the schema into an rdflib graph of SHACL shapes.

        :return: graph holding one node shape per class and one property
            shape per induced slot
        :raises ValueError: if a slot combines ``any_of`` with
            ``equals_string``/``equals_string_in``, or uses one of the latter
            with a range other than ``string``
        """
        sv = self.schemaview
        g = Graph()
        g.bind("sh", SH)

        ifabsent_processor = ShaclIfAbsentProcessor(sv)

        for pfx in self.schema.prefixes.values():
            g.bind(str(pfx.prefix_prefix), pfx.prefix_reference)

        for c in sv.all_classes(imports=not self.exclude_imports).values():
            class_uri = URIRef(sv.get_uri(c, expand=True))
            # The shape is named after the class, optionally with the suffix
            # appended; sh:targetClass always points at the bare class URI.
            shape_uri = class_uri if self.suffix is None else class_uri + self.suffix
            shape_pv = self._triple_adder(g, shape_uri)

            shape_pv(RDF.type, SH.NodeShape)
            shape_pv(SH.targetClass, class_uri)  # TODO

            # Mixin and abstract classes are never closed, even with closed=True.
            is_closed = bool(self.closed) and not (c.mixin or c.abstract)
            shape_pv(SH.closed, Literal(is_closed))
            if c.title is not None:
                shape_pv(SH.name, Literal(c.title))
            if c.description is not None:
                shape_pv(SH.description, Literal(c.description))

            shape_pv(SH.ignoredProperties, self._build_ignored_properties(g, c))

            if c.annotations and self.include_annotations:
                self._add_annotations(shape_pv, c)

            for order, s in enumerate(sv.class_induced_slots(c.name)):
                # fixed in linkml-runtime 1.1.3
                if s.name in sv.element_by_schema_map():
                    slot_uri = URIRef(sv.get_uri(s, expand=True))
                else:
                    default_prefix = sv.schema.default_prefix
                    slot_uri = URIRef(sv.expand_curie(f"{default_prefix}:{underscore(s.name)}"))
                pnode = BNode()
                shape_pv(SH.property, pnode)

                prop_pv = self._triple_adder(g, pnode)
                prop_pv_literal = self._triple_adder(g, pnode, as_literal=True)

                prop_pv(SH.path, slot_uri)
                prop_pv_literal(SH.order, order)
                prop_pv_literal(SH.name, s.title)
                prop_pv_literal(SH.description, s.description)
                self._add_cardinality(prop_pv_literal, s)
                prop_pv_literal(SH.minInclusive, s.minimum_value)
                prop_pv_literal(SH.maxInclusive, s.maximum_value)

                if s.any_of:
                    self._add_any_of(g, prop_pv, s)
                else:
                    prop_pv_literal(SH.hasValue, s.equals_number)
                    r = s.range
                    # Range "string" is mandatory for "equals_string" and "equals_string_in"
                    if (s.equals_string or s.equals_string_in) and r != "string":
                        raise ValueError(
                            f"slot: \"{slot_uri}\" - 'equals_string' and 'equals_string_in'"
                            f" require range 'string' and not '{r}'"
                        )

                    if self._add_range(g, prop_pv, r):
                        # Class ranges additionally get a node kind: IRI when
                        # the class has an identifier slot, else blank node or IRI.
                        if sv.get_identifier_slot(r) is not None:
                            prop_pv(SH.nodeKind, SH.IRI)
                        else:
                            prop_pv(SH.nodeKind, SH.BlankNodeOrIRI)
                    if s.pattern:
                        prop_pv(SH.pattern, Literal(s.pattern))
                    # equals_string and equals_string_in are both mapped to sh:in
                    if s.equals_string:
                        self._and_equals_string(g, prop_pv, [s.equals_string])
                    if s.equals_string_in:
                        self._and_equals_string(g, prop_pv, s.equals_string_in)

                if s.annotations and self.include_annotations:
                    self._add_annotations(prop_pv, s)

                default_value = ifabsent_processor.process_slot(s, c)
                # An rdflib Literal is falsy when its Python value is falsy
                # (0, False, ""), so a plain truthiness test would drop such
                # defaults. Literals are therefore accepted explicitly.
                if default_value or isinstance(default_value, Literal):
                    prop_pv(SH.defaultValue, default_value)

        return g

    @staticmethod
    def _triple_adder(g: Graph, subject, as_literal: bool = False) -> Callable:
        """Return ``add(p, v)`` which adds ``(subject, p, v)`` to ``g`` unless ``v`` is None.

        :param g: graph receiving the triples
        :param subject: subject node of every triple added
        :param as_literal: if True, wrap the value in an rdflib ``Literal`` first
        """

        def add(p, v) -> None:
            if v is not None:
                g.add((subject, p, Literal(v) if as_literal else v))

        return add

    @staticmethod
    def _add_cardinality(prop_pv_literal: Callable, s) -> None:
        """Emit sh:minCount / sh:maxCount for slot ``s``.

        Explicit cardinalities take precedence over ``required`` and
        ``multivalued``. They are compared against None rather than tested for
        truthiness so that an explicit ``0`` is honoured.
        """
        # minCount
        if s.minimum_cardinality is not None:
            prop_pv_literal(SH.minCount, s.minimum_cardinality)
        elif s.exact_cardinality is not None:
            prop_pv_literal(SH.minCount, s.exact_cardinality)
        elif s.required:
            prop_pv_literal(SH.minCount, 1)
        # maxCount
        if s.maximum_cardinality is not None:
            prop_pv_literal(SH.maxCount, s.maximum_cardinality)
        elif s.exact_cardinality is not None:
            prop_pv_literal(SH.maxCount, s.exact_cardinality)
        elif not s.multivalued:
            prop_pv_literal(SH.maxCount, 1)

    def _add_range(self, g: Graph, func: Callable, r: ElementName) -> bool:
        """Emit the constraint for range ``r`` through ``func``.

        The range is looked up as a class, then a type, then an enum; anything
        else is treated as a simple data type.

        :return: True if ``r`` is a class, otherwise False
        """
        sv = self.schemaview
        if r in sv.all_classes():
            self._add_class(func, r)
            return True
        if r in sv.all_types():
            self._add_type(func, r)
        elif r in sv.all_enums():
            self._add_enum(g, func, r)
        else:
            add_simple_data_type(func, r)
        return False

    def _add_any_of(self, g: Graph, prop_pv: Callable, s) -> None:
        """Map the ``any_of`` ranges of slot ``s`` to an ``sh:or`` list.

        :raises ValueError: if the slot also sets ``equals_string`` or
            ``equals_string_in``
        """
        # It is not allowed to use any_of and equals_string or equals_string_in
        # in one slot definition, as both are mapped to sh:in in SHACL
        if s.equals_string or s.equals_string_in:
            error = "'equals_string'/'equals_string_in' and 'any_of' are mutually exclusive"
            raise ValueError(f'{TypedNode.yaml_loc(str(s), suffix="")} {error}')

        or_node = BNode()
        prop_pv(SH["or"], or_node)
        range_list = []
        for any_expr in s.any_of:
            # One blank node per alternative, carrying that alternative's constraint.
            alternative = BNode()
            self._add_range(g, self._triple_adder(g, alternative), any_expr.range)
            range_list.append(alternative)
        Collection(g, or_node, range_list)

    def _add_class(self, func: Callable, r: ElementName) -> None:
        """Emit ``sh:class`` with the expanded URI of class ``r``."""
        range_ref = self.schemaview.get_uri(r, expand=True)
        func(SH["class"], URIRef(range_ref))

    def _add_enum(self, g: Graph, func: Callable, r: ElementName) -> None:
        """Emit ``sh:in`` listing the permissible values of enum ``r``.

        A permissible value with a ``meaning`` is listed as the expanded URI
        of that meaning; otherwise its name is listed as a literal.
        """
        sv = self.schemaview
        enum = sv.get_enum(r)
        pv_node = BNode()
        members = [
            URIRef(sv.expand_curie(pv.meaning)) if pv.meaning else Literal(pv_name)
            for pv_name, pv in enum.permissible_values.items()
        ]
        Collection(g, pv_node, members)
        func(SH["in"], pv_node)

    def _add_type(self, func: Callable, r: ElementName) -> None:
        """Emit node kind, datatype and pattern (plus optional annotations) for type ``r``.

        ``sh:nodeKind sh:Literal`` is always emitted. Everything else needs
        the type to have a URI; without one an error is logged.
        """
        func(SH.nodeKind, SH.Literal)
        sv = self.schemaview
        rt = sv.get_type(r)
        if not rt.uri:
            logger.error("No URI for type %s", rt.name)
            return
        func(SH.datatype, URIRef(sv.get_uri(rt, expand=True)))
        if rt.pattern:
            func(SH.pattern, Literal(rt.pattern))
        if rt.annotations and self.include_annotations:
            self._add_annotations(func, rt)

    def _and_equals_string(self, g: Graph, func: Callable, values: list) -> None:
        """Emit ``sh:in`` with an RDF list holding ``values`` as literals."""
        pv_node = BNode()
        Collection(g, pv_node, [Literal(v) for v in values])
        func(SH["in"], pv_node)

    def _add_annotations(self, func: Callable, item) -> None:
        """Emit one predicate/object pair per annotation of ``item``.

        :param func: callable ``func(predicate, object)`` that records the pair
        :param item: a class, slot or type carrying ``annotations``
        """
        # TODO: migrate some of this logic to SchemaView
        sv = self.schemaview
        annotations = item.annotations
        # item could be a class, slot or type
        # annotation type could be dict (on types) or JsonObj (on slots)
        if type(annotations) is JsonObj:
            annotations = as_dict(annotations)
        for a in annotations.values():
            tag, value = a["tag"], a["value"]
            # If ':' is in the tag, treat it as a CURIE, otherwise string Literal
            if ":" in tag:
                n_predicate = URIRef(sv.expand_curie(tag))
            else:
                n_predicate = Literal(tag, datatype=XSD.string)
            # If the value is a string and ':' is in the value, treat it as a CURIE,
            # otherwise treat as Literal with derived XSD datatype
            if type(value) is extended_str and ":" in value:
                n_object = URIRef(sv.expand_curie(value))
            else:
                n_object = Literal(value, datatype=self._getXSDtype(value))
            func(n_predicate, n_object)

    def _getXSDtype(self, value):
        """Return the XSD datatype for the exact type of ``value``, or None if unmapped."""
        value_type = type(value)
        if value_type is bool:
            return XSD.boolean
        if value_type is extended_str:
            return XSD.string
        if value_type is extended_int:
            return XSD.integer
        if value_type is extended_float:
            # TODO: distinguish between xsd:decimal and xsd:double?
            return XSD.decimal
        return None

    def _build_ignored_properties(self, g: Graph, c: ClassDefinition) -> BNode:
        """Build the RDF list used as ``sh:ignoredProperties`` for class ``c``.

        The list holds ``rdf:type`` plus every slot URI of the ``is_a``
        descendants of ``c`` that is not a slot of ``c`` itself. Entries are
        sorted so the generated list is stable across runs.

        :return: head node of the RDF list added to ``g``
        """
        sv = self.schemaview

        def slot_uris(class_name: str) -> set:
            return {URIRef(sv.get_uri(prop, expand=True)) for prop in sv.class_slots(class_name)}

        def collect_child_properties(class_name: str, output: set) -> None:
            for child_name in sv.class_children(class_name, imports=True, mixins=False, is_a=True):
                output.update(slot_uris(child_name))
                collect_child_properties(child_name, output)

        child_properties = set()
        collect_child_properties(c.name, child_properties)

        ignored_properties = child_properties - slot_uris(c.name)
        ignored_properties.add(RDF.type)

        list_node = BNode()
        Collection(g, list_node, sorted(ignored_properties))
        return list_node
def add_simple_data_type(func: Callable, r: ElementName) -> None:
    """Emit ``sh:datatype`` for every ``ShaclDataType`` member whose ``linkml_type`` equals ``r``.

    :param func: callable ``func(predicate, object)`` that records the pair
    :param r: name of the range to look up
    """
    matching = (candidate for candidate in ShaclDataType if candidate.linkml_type == r)
    for candidate in matching:
        func(SH.datatype, candidate.uri_ref)
@shared_arguments(ShaclGenerator)
@click.command(name="shacl")
@click.option(
    "--closed/--non-closed",
    help="Use '--closed' to generate closed SHACL shapes. Use '--non-closed' to generate open SHACL shapes.",
    show_default=True,
    default=True,
)
@click.option(
    "-s",
    "--suffix",
    help="Use --suffix to append given string to SHACL class name (e. g. --suffix Shape: Person becomes PersonShape).",
    show_default=True,
    default=None,
)
@click.option(
    "--include-annotations/--exclude-annotations",
    help="Use --include-annotations to include annotations of slots, types, and classes in the generated SHACL shapes.",
    show_default=True,
    default=False,
)
@click.option(
    "--exclude-imports/--include-imports",
    help="Use --exclude-imports to exclude imported elements from the generated SHACL shapes. This is useful when "
    "extending a substantial ontology to avoid large output files.",
    show_default=True,
    default=False,
)
@click.version_option(__version__, "-V", "--version")
def cli(yamlfile, **args):
    """Generate SHACL turtle from a LinkML model"""
    # Every parsed option is forwarded unchanged as a generator keyword argument.
    shacl_text = ShaclGenerator(yamlfile, **args).serialize()
    print(shacl_text)
# Run the click command when the module is executed as a script.
if __name__ == "__main__":
    cli()