Source code for linkml.generators.sparqlgen

import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional

import click
from jinja2 import Template
from linkml_runtime.linkml_model.meta import Prefix
from linkml_runtime.utils.formatutils import underscore
from linkml_runtime.utils.schemaview import SchemaView

from linkml._version import __version__
from linkml.utils.generator import Generator, shared_arguments

logger = logging.getLogger(__name__)

template = """
{% for pfxn, pfx in schema.prefixes.items() -%}
PREFIX {{pfxn}}: <{{pfx.prefix_reference}}>
{% endfor %}

{% for cn, c in schema.classes.items() if not c.mixin and not c.abstract %}

## --
## Checks for {{ cn }}
## --

# @CHECK permitted_{{cn}}
SELECT ?g ?s ?p WHERE {
 GRAPH ?g {
  ?s rdf:type {{ schema_view.get_uri(cn) }} ;
     ?p ?o .
  FILTER ( ?p NOT IN (
   {% for sn in schema_view.class_slots(cn) -%}
   {{ schema_view.get_uri(schema_view.get_slot(sn, attributes=True)) }},
   {% endfor -%}
   rdf:type ))
 }
 {{ extra }}
} {{ limit }}


{% for slot in schema_view.class_induced_slots(cn) -%}

{% if slot.required %}
# @CHECK required_{{cn}}_{{slot.name}}
SELECT
  ?check
  ?graph
  ?subject
  ?predicate WHERE {
 GRAPH ?graph {
  ?subject rdf:type {{ schema_view.get_uri(cn) }} .
  FILTER NOT EXISTS { ?subject  {{ schema_view.get_uri(slot) }} ?o  }
 }
 VALUES ?check { linkml:required }
 VALUES ?predicate { {{schema_view.get_uri(slot)}} }
 {{ extra }}
}  {{ limit }}
{% endif %}

{% if slot.range in schema_view.all_classes() %}
# @CHECK object_range_{{cn}}_{{slot.name}}
SELECT
  ?check
  ?graph
  ?subject
  ?predicate
  ?object
WHERE {
 GRAPH ?graph {
  ?subject rdf:type {{ schema_view.get_uri(cn) }} ;
     ?predicate ?object .
  FILTER NOT EXISTS {
    ?object rdf:type ?otype .
    FILTER ( ?otype IN (
      {% for a in schema_view.class_descendants(slot.range) -%}
      {{ schema_view.get_uri(a) }}
      {{ ", " if not loop.last else "" }}
      {% endfor -%} ))
  }
 }
 VALUES ?check { linkml:range }
 VALUES ?predicate { {{ schema_view.get_uri(slot) }}  }
 {{ extra }}
}  {{ limit }}
{% endif %}

{%- endfor %}


## -- End of checks for {{ cn }}
{% endfor %}
"""

x = """
{% for sn in schema_view.class_slots(c.name) %}
     {{ schema.slots[sn].slot_uri }}
   {% endfor %}
"""


def materialize_schema(schemaview: SchemaView):
    schema = schemaview.schema
    if "rdf" not in schema.prefixes:
        schema.prefixes["rdf"] = Prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
    for scn in schemaview.imports_closure():
        for pfxn, pfx in schemaview.schema_map[scn].prefixes.items():
            if pfxn not in schema.prefixes:
                schema.prefixes[pfxn] = pfx
    for cn, c in schemaview.all_classes().items():
        for a in list(c.attributes.values()):
            schema.slots[a.name] = a
            c.slots.append(a.name)
            del c.attributes[a.name]
    schemaview.set_modified()
    for cn, c in schemaview.all_classes().items():
        for s in schemaview.class_induced_slots(cn):
            if s.name not in c.slots:
                c.slots.append(s.name)
            c.slot_usage[s.name] = s
            s.slot_uri = schemaview.get_uri(s)
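

# A minimal sketch (the toy schema and helper name below are assumptions, not part of
# this module) showing how materialize_schema() prepares a SchemaView before the Jinja2
# template above is rendered: attributes are promoted to top-level slots so that
# class_slots() and get_uri() resolve during rendering.
_TOY_SCHEMA_YAML = """\
id: https://example.org/toy
name: toy
prefixes:
  ex: https://example.org/toy/
  linkml: https://w3id.org/linkml/
default_prefix: ex
classes:
  Person:
    attributes:
      name:
        required: true
"""


def _example_render_checks() -> str:
    # Hypothetical helper: render the module-level template for the toy schema and
    # return the raw (unsplit) SPARQL text containing the "# @CHECK ..." markers.
    sv = SchemaView(_TOY_SCHEMA_YAML)
    materialize_schema(sv)
    return Template(template).render(schema_view=sv, schema=sv.schema, limit="", extra="")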


@dataclass
class SparqlGenerator(Generator):
    """
    Generates SPARQL queries that can be used for delayed validation
    """

    # ClassVars
    generatorname = os.path.basename(__file__)
    valid_formats = ["sparql"]
    visit_all_class_slots = False
    uses_schemaloader = False

    # ObjectVars
    named_graphs: Optional[List[str]] = None
    limit: Optional[int] = None
    sparql: Optional[str] = None

    def __post_init__(self):
        self.schemaview = SchemaView(self.schema)
        materialize_schema(self.schemaview)
        super().__post_init__()
        self.queries = self.generate_sparql(named_graphs=self.named_graphs, limit=self.limit)

    def generate_sparql(self, named_graphs=None, limit: int = None):
        template_obj = Template(template)
        extra = ""
        if named_graphs is not None:
            extra += f'FILTER( ?graph in ( {",".join(named_graphs)} ))'
        logger.info(f"Named Graphs = {named_graphs} // extra={extra}")
        if limit is not None and isinstance(limit, int):
            limit = f"LIMIT {limit}"
        else:
            limit = ""
        sparql = template_obj.render(schema_view=self.schemaview, schema=self.schema, limit=limit, extra=extra)
        self.sparql = sparql
        queries = self.split_sparql(sparql)
        return queries
    def serialize(self, directory=None) -> str:
        if directory is not None:
            Path(directory).mkdir(parents=True, exist_ok=True)
            for qn, q in self.queries.items():
                qpath = os.path.join(directory, f"{qn}.rq")
                with open(qpath, "w", encoding="UTF-8") as stream:
                    stream.write(q)
        return self.sparql
    @staticmethod
    def split_sparql(sparql: str) -> Dict[str, str]:
        lines = sparql.split("\n")
        prolog = ""
        queries = defaultdict(str)
        q = None
        for line in lines:
            if line.startswith("# @"):
                q = underscore(line.replace("# @", ""))
                queries[q] = prolog + "\n"
            elif q is None:
                if line.lower().startswith("prefix"):
                    prolog += line + "\n"
            else:
                queries[q] += line + "\n"
        return queries
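

# A small, illustrative example (the query text below is hypothetical) of what
# split_sparql() does: every line starting with "# @" opens a new named query, PREFIX
# lines seen before the first marker become a shared prolog, and all other lines are
# appended to the current query.  For this input the result has a single entry keyed
# "CHECK_required_Person_name".
def _example_split() -> Dict[str, str]:
    text = (
        "PREFIX ex: <https://example.org/>\n"
        "# @CHECK required_Person_name\n"
        "SELECT ?s WHERE { ?s ex:name ?o }\n"
    )
    return SparqlGenerator.split_sparql(text)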


@shared_arguments(SparqlGenerator)
@click.command(name="sparql")
@click.option("--dir", "-d", help="Directory in which queries will be deposited")
@click.version_option(__version__, "-V", "--version")
def cli(yamlfile, dir, **kwargs):
    """Generate SPARQL queries for validation

    This will generate a directory of queries that can be used for QC over a
    triplestore that is conformant to the same LinkML schema.

    Each query in the directory will be of the form
    CHECK_<ConstraintType>_<SchemaElement>.rq

    Example:

        gen-sparql -d ./sparql/ personinfo.yaml
    """
    SparqlGenerator(yamlfile, **kwargs).serialize(directory=dir)


if __name__ == "__main__":
    cli()
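

# Programmatic usage sketch (file and directory names here are hypothetical): the same
# generation can be driven from Python, e.g.
#
#   gen = SparqlGenerator("personinfo.yaml", limit=100)
#   gen.serialize(directory="sparql")   # writes one CHECK_*.rq file per generated query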