Source code for linkml.generators.sparqlgen

import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path

import click
from jinja2 import Template

from linkml._version import __version__
from linkml.utils.generator import Generator, shared_arguments
from linkml_runtime.linkml_model.meta import Prefix
from linkml_runtime.utils.formatutils import underscore
from linkml_runtime.utils.schemaview import SchemaView

logger = logging.getLogger(__name__)

template = """
{% for pfxn, pfx in schema.prefixes.items() -%}
PREFIX {{pfxn}}: <{{pfx.prefix_reference}}>
{% endfor %}

{% for cn, c in schema.classes.items() if not c.mixin and not c.abstract %}

## --
## Checks for {{ cn }}
## --

# @CHECK permitted_{{cn}}
SELECT ?g ?s ?p WHERE {
 GRAPH ?g {
  ?s rdf:type {{ schema_view.get_uri(cn) }} ;
     ?p ?o .
  FILTER ( ?p NOT IN (
   {% for sn in schema_view.class_slots(cn) -%}
   {{ schema_view.get_uri(schema_view.get_slot(sn, attributes=True)) }},
   {% endfor -%}
   rdf:type ))
 }
 {{ extra }}
} {{ limit }}


{% for slot in schema_view.class_induced_slots(cn) -%}

{% if slot.required %}
# @CHECK required_{{cn}}_{{slot.name}}
SELECT
  ?check
  ?graph
  ?subject
  ?predicate WHERE {
 GRAPH ?graph {
  ?subject rdf:type {{ schema_view.get_uri(cn) }} .
  FILTER NOT EXISTS { ?subject  {{ schema_view.get_uri(slot) }} ?o  }
 }
 VALUES ?check { linkml:required }
 VALUES ?predicate { {{schema_view.get_uri(slot)}} }
 {{ extra }}
}  {{ limit }}
{% endif %}

{% if slot.range in schema_view.all_classes() %}
# @CHECK object_range_{{cn}}_{{slot.name}}
SELECT
  ?check
  ?graph
  ?subject
  ?predicate
  ?object
WHERE {
 GRAPH ?graph {
  ?subject rdf:type {{ schema_view.get_uri(cn) }} ;
     ?predicate ?object .
  FILTER NOT EXISTS {
    ?object rdf:type ?otype .
    FILTER ( ?otype IN (
      {% for a in schema_view.class_descendants(slot.range) -%}
      {{ schema_view.get_uri(a) }}
      {{ ", " if not loop.last else "" }}
      {% endfor -%} ))
  }
 }
 VALUES ?check { linkml:range }
 VALUES ?predicate { {{ schema_view.get_uri(slot) }}  }
 {{ extra }}
}  {{ limit }}
{% endif %}

{%- endfor %}


## -- End of checks for {{ cn }}
{% endfor %}
"""

x = """
{% for sn in schema_view.class_slots(c.name) %}
     {{ schema.slots[sn].slot_uri }}
   {% endfor %}
"""


def materialize_schema(schemaview: SchemaView):
    schema = schemaview.schema
    if "rdf" not in schema.prefixes:
        schema.prefixes["rdf"] = Prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
    for scn in schemaview.imports_closure():
        for pfxn, pfx in schemaview.schema_map[scn].prefixes.items():
            if pfxn not in schema:
                schema.prefixes[pfxn] = pfx
    for cn, c in schemaview.all_classes().items():
        for a in list(c.attributes.values()):
            schema.slots[a.name] = a
            c.slots.append(a.name)
            del c.attributes[a.name]
    schemaview.set_modified()
    for cn, c in schemaview.all_classes().items():
        for s in schemaview.class_induced_slots(cn):
            if s.name not in c.slots:
                c.slots.append(s.name)
            c.slot_usage[s.name] = s
            s.slot_uri = schemaview.get_uri(s)


[docs] @dataclass class SparqlGenerator(Generator): """ Generates SPARQL queries that can be used for delayed validation """ # ClassVars generatorname = os.path.basename(__file__) valid_formats = ["sparql"] visit_all_class_slots = False uses_schemaloader = False # ObjectVars named_graphs: list[str] | None = None limit: int | None = None sparql: str | None = None def __post_init__(self): self.schemaview = SchemaView(self.schema) materialize_schema(self.schemaview) super().__post_init__() self.queries = self.generate_sparql(named_graphs=self.named_graphs, limit=self.limit) def generate_sparql(self, named_graphs=None, limit: int = None): template_obj = Template(template) extra = "" if named_graphs is not None: extra += f"FILTER( ?graph in ( {','.join(named_graphs)} ))" logger.info(f"Named Graphs = {named_graphs} // extra={extra}") if limit is not None and isinstance(limit, int): limit = f"LIMIT {limit}" else: limit = "" sparql = template_obj.render(schema_view=self.schemaview, schema=self.schema, limit=limit, extra=extra) self.sparql = sparql queries = self.split_sparql(sparql) return queries
[docs] def serialize(self, directory=None) -> str: if directory is not None: Path(directory).mkdir(parents=True, exist_ok=True) for qn, q in self.queries.items(): qpath = os.path.join(directory, f"{qn}.rq") with open(qpath, "w", encoding="UTF-8") as stream: stream.write(q) return self.sparql
@staticmethod def split_sparql(sparql: str) -> dict[str, str]: lines = sparql.split("\n") prolog = "" queries = defaultdict(str) q = None for line in lines: if line.startswith("# @"): q = underscore(line.replace("# @", "")) queries[q] = prolog + "\n" elif q is None: if line.lower().startswith("prefix"): prolog += line + "\n" else: queries[q] += line + "\n" return queries
@shared_arguments(SparqlGenerator) @click.command(name="sparql") @click.option("--dir", "-d", help="Directory in which queries will be deposited") @click.version_option(__version__, "-V", "--version") def cli(yamlfile, dir, **kwargs): """Generate SPARQL queries for validation This will generate a directory of queries that can be used for QC over a triplestore that is conformant to the same LinkML schema. Each query in the directory will be of the form CHECK_<ConstraintType>_<SchemaElement>.rq Example: gen-sparql -d ./sparql/ personinfo.yaml """ SparqlGenerator(yamlfile, **kwargs).serialize(directory=dir) if __name__ == "__main__": cli()