import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import click
from jinja2 import Template
from linkml_runtime.linkml_model.meta import Prefix
from linkml_runtime.utils.formatutils import underscore
from linkml_runtime.utils.schemaview import SchemaView

from linkml._version import __version__
from linkml.utils.generator import Generator, shared_arguments

logger = logging.getLogger(__name__)

# Jinja2 template that renders every validation query for a schema.
#
# Each query is introduced by a "# @CHECK <name>" comment line. A SPARQL
# comment runs to the end of its line, so every "#" line must end with a line
# break before the query text starts; likewise "?object" and "WHERE" must be
# separate tokens. The line breaks below keep those boundaries intact.
#
# NOTE(review): the "permitted_*" check binds the graph as ?g, while the
# optional named-graph FILTER passed in as ``extra`` refers to ?graph --
# confirm whether that check is meant to honour named-graph filtering.
template = """
{% for pfxn, pfx in schema.prefixes.items() -%}
PREFIX {{pfxn}}: <{{pfx.prefix_reference}}>
{% endfor %}

{% for cn, c in schema.classes.items() if not c.mixin and not c.abstract %}
## --
## Checks for {{ cn }}
## --

# @CHECK permitted_{{cn}}
SELECT ?g ?s ?p WHERE {
  GRAPH ?g {
    ?s rdf:type {{ schema_view.get_uri(cn) }} ;
       ?p ?o .
    FILTER ( ?p NOT IN (
    {% for sn in schema_view.class_slots(cn) -%}
      {{ schema_view.get_uri(schema_view.get_slot(sn, attributes=True)) }},
    {% endfor -%}
      rdf:type ))
  }
  {{ extra }}
} {{ limit }}

{% for slot in schema_view.class_induced_slots(cn) -%}
{% if slot.required %}
# @CHECK required_{{cn}}_{{slot.name}}
SELECT ?check ?graph ?subject ?predicate WHERE {
  GRAPH ?graph {
    ?subject rdf:type {{ schema_view.get_uri(cn) }} .
    FILTER NOT EXISTS {
      ?subject {{ schema_view.get_uri(slot) }} ?o
    }
  }
  VALUES ?check { linkml:required }
  VALUES ?predicate { {{schema_view.get_uri(slot)}} }
  {{ extra }}
} {{ limit }}
{% endif %}

{% if slot.range in schema_view.all_classes() %}
# @CHECK object_range_{{cn}}_{{slot.name}}
SELECT ?check ?graph ?subject ?predicate ?object
WHERE {
  GRAPH ?graph {
    ?subject rdf:type {{ schema_view.get_uri(cn) }} ;
             ?predicate ?object .
    FILTER NOT EXISTS {
      ?object rdf:type ?otype .

      FILTER ( ?otype IN (
      {% for a in schema_view.class_descendants(slot.range) -%}
        {{ schema_view.get_uri(a) }} {{ ", " if not loop.last else "" }}
      {% endfor -%}
        ))
    }
  }
  VALUES ?check { linkml:range }
  VALUES ?predicate { {{ schema_view.get_uri(slot) }} }
  {{ extra }}
} {{ limit }}
{% endif %}
{%- endfor %}

## -- End of checks for {{ cn }}

{% endfor %}
"""

# Not referenced by the code visible in this module; kept unchanged.
x = """{% for sn in schema_view.class_slots(c.name) %} {{ schema.slots[sn].slot_uri }} {% endfor %}"""


def materialize_schema(schemaview: SchemaView):
    """Rewrite the schema held by *schemaview* in place so the template can use it directly.

    The steps, in order:

    1. Register the ``rdf`` prefix if the schema does not declare it.
    2. Copy into the schema every prefix declared by a schema in the imports
       closure, without replacing a prefix the schema already has.
    3. Turn each class attribute into a schema-level slot: the attribute is
       stored in ``schema.slots``, its name is appended to the class's
       ``slots`` and it is removed from the class's ``attributes``.
    4. Call ``schemaview.set_modified()`` before the view is queried again.
    5. For every induced slot of every class that the class does not yet
       list, append it to ``slots``, record it in ``slot_usage`` and set its
       ``slot_uri`` from ``schemaview.get_uri``.

    :param schemaview: view over the schema to modify; mutated in place
    :return: None
    """
    schema = schemaview.schema

    if "rdf" not in schema.prefixes:
        schema.prefixes["rdf"] = Prefix("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")

    for imported_name in schemaview.imports_closure():
        imported_prefixes = schemaview.schema_map[imported_name].prefixes
        for pfxn, pfx in imported_prefixes.items():
            # Membership must be tested against the prefix map; testing the
            # schema object itself does not look at declared prefixes.
            if pfxn not in schema.prefixes:
                schema.prefixes[pfxn] = pfx

    for cls in schemaview.all_classes().values():
        # Iterate over a snapshot because entries are deleted inside the loop.
        for attribute in list(cls.attributes.values()):
            schema.slots[attribute.name] = attribute
            cls.slots.append(attribute.name)
            del cls.attributes[attribute.name]

    schemaview.set_modified()

    for class_name, cls in schemaview.all_classes().items():
        for slot in schemaview.class_induced_slots(class_name):
            if slot.name not in cls.slots:
                cls.slots.append(slot.name)
                cls.slot_usage[slot.name] = slot
                slot.slot_uri = schemaview.get_uri(slot)
@dataclass
class SparqlGenerator(Generator):
    """
    Generates SPARQL queries that can be used for delayed validation
    """

    # ClassVars
    generatorname = os.path.basename(__file__)
    valid_formats = ["sparql"]
    visit_all_class_slots = False
    uses_schemaloader = False

    # ObjectVars
    # Values rendered verbatim into a ``FILTER( ?graph in ( ... ))`` clause.
    named_graphs: Optional[list[str]] = None
    # When set to an integer, each query gets a trailing ``LIMIT <n>``.
    limit: Optional[int] = None
    # Full rendered template text, set by generate_sparql().
    sparql: Optional[str] = None

    def __post_init__(self):
        """Build the schema view, materialize the schema and render the queries.

        The view is created and materialized before the parent initializer
        runs; the rendered queries are stored on ``self.queries``.
        """
        self.schemaview = SchemaView(self.schema)
        materialize_schema(self.schemaview)
        super().__post_init__()
        self.queries = self.generate_sparql(named_graphs=self.named_graphs, limit=self.limit)

    def generate_sparql(self, named_graphs: Optional[list[str]] = None, limit: Optional[int] = None):
        """Render the validation template and split it into individual queries.

        The rendered text is also stored on ``self.sparql``.

        :param named_graphs: if not None, the values are comma-joined into a
            ``FILTER( ?graph in ( ... ))`` clause injected into the queries
        :param limit: if an integer, ``LIMIT <limit>`` is appended to each
            query; any other value (including None) adds no limit
        :return: whatever ``self.split_sparql`` returns for the rendered text
        """
        template_obj = Template(template)

        extra = ""
        if named_graphs is not None:
            extra += f"FILTER( ?graph in ( {','.join(named_graphs)} ))"
        logger.info("Named Graphs = %s // extra=%s", named_graphs, extra)

        # bool is a subclass of int; exclude it so True/False never renders
        # as "LIMIT True" / "LIMIT False".
        if isinstance(limit, int) and not isinstance(limit, bool):
            limit_clause = f"LIMIT {limit}"
        else:
            limit_clause = ""

        sparql = template_obj.render(
            schema_view=self.schemaview,
            schema=self.schema,
            limit=limit_clause,
            extra=extra,
        )
        self.sparql = sparql
        # NOTE(review): split_sparql is not defined in the part of the class
        # shown here; presumably it is provided elsewhere -- confirm.
        return self.split_sparql(sparql)
@shared_arguments(SparqlGenerator)
@click.command(name="sparql")
@click.option("--dir", "-d", help="Directory in which queries will be deposited")
@click.version_option(__version__, "-V", "--version")
def cli(yamlfile, dir, **kwargs):
    """Generate SPARQL queries for validation

    This will generate a directory of queries that can be used for QC over a triplestore
    that is conformant to the same LinkML schema. Each query in the directory will be of the form

       CHECK_<ConstraintType>_<SchemaElement>.rq

    Example:

        gen-sparql -d ./sparql/ personinfo.yaml

    """
    # ``dir`` is the name click binds the --dir/-d option to, so it is kept
    # even though it shadows the builtin.
    generator = SparqlGenerator(yamlfile, **kwargs)
    generator.serialize(directory=dir)


if __name__ == "__main__":
    cli()