Source code for linkml.generators.yarrrmlgen

from __future__ import annotations

import os
from typing import Any, TextIO

import click
import yaml

from linkml._version import __version__
from linkml.utils.generator import Generator, shared_arguments
from linkml_runtime.linkml_model.meta import ClassDefinition, SchemaDefinition
from linkml_runtime.utils.schemaview import SchemaView

DEFAULT_SOURCE_JSON = "data.json~jsonpath"
DEFAULT_ITERATOR = "$.items[*]"


class YarrrmlGenerator(Generator):
    """Generate a YARRRML mapping document from a LinkML schema.

    One mapping is emitted per class in the schema. Each mapping has a
    ``sources`` entry, a subject template (``s``) and a predicate/object
    list (``po``) built from the class's induced slots.

    Extra keyword arguments recognised by the constructor (and removed
    before the remaining ones are forwarded to ``Generator``):

    * ``source`` -- YARRRML source shorthand such as ``data.json~jsonpath``
      or ``data.csv~csv``. A missing ``~`` suffix is inferred from the file
      extension. Defaults to ``DEFAULT_SOURCE_JSON``.
    * ``iterator_template`` -- iterator used for JSON sources; every
      ``{Class}`` placeholder is replaced by the class name. Defaults to
      ``DEFAULT_ITERATOR``.
    """

    generatorname = os.path.basename(__file__)
    generatorversion = "0.3.0"
    valid_formats = ["yml", "yaml"]
    visit_all_class_slots = False

    def __init__(self, schema: str | TextIO | SchemaDefinition, format: str = "yml", **kwargs):
        # These two options are specific to this generator, so they are
        # stripped before the remaining kwargs reach the base class.
        raw_src = kwargs.pop("source", None)
        it = kwargs.pop("iterator_template", None)
        super().__init__(schema, **kwargs)
        self.schemaview = SchemaView(schema)
        self.schema: SchemaDefinition = self.schemaview.schema
        self.format = format
        self.source: str = self._infer_source_suffix(raw_src) if raw_src else DEFAULT_SOURCE_JSON
        self.iterator_template: str = it or DEFAULT_ITERATOR

    def _infer_source_suffix(self, path: str) -> str:
        """Return ``path`` with a YARRRML reference-formulation suffix.

        A path that already contains ``~`` is returned untouched. Otherwise
        ``.json`` gets ``~jsonpath`` and ``.csv``/``.tsv`` get ``~csv``
        (extension match is case-insensitive). Any other path is returned
        unchanged.
        """
        lowered = (path or "").lower()
        if "~" in lowered:
            return path
        if lowered.endswith(".json"):
            return f"{path}~jsonpath"
        if lowered.endswith((".csv", ".tsv")):
            return f"{path}~csv"
        return path

    def serialize(self, **args) -> str:
        """Return the mapping document as a YAML string.

        Keyword arguments are accepted for interface compatibility and are
        not used here.
        """
        return yaml.safe_dump(
            self.as_dict(),
            sort_keys=False,
            allow_unicode=True,
            default_flow_style=False,
            indent=2,
            width=120,
        )

    def as_dict(self) -> dict[str, Any]:
        """Build the mapping document as a plain dictionary.

        :return: ``{"prefixes": {...}, "mappings": {...}}`` with one mapping
            per schema class.
        :raises ValueError: for a JSON source when a class is the range of
            inlined slots in more than one place, or when an inlined range
            class has neither an identifier nor a key slot.
        """
        sv = self.schemaview

        # Resolve prefixes before building any mapping. This call also stores
        # a default prefix on the schema when none is set, and the subject and
        # predicate templates below read that value. Doing it afterwards would
        # make the templates fall back to "ex" even when "ex" is not declared,
        # and would make a second call return different output.
        prefixes = self._prefixes_with_defaults()

        inline_owners = self._collect_inline_owners()
        mappings: dict[str, Any] = {}
        for cls in sv.all_classes().values():
            mappings[str(cls.name)] = {
                "sources": self._sources_for_class(cls, inline_owners),
                "s": self._subject_template_for_class(cls),
                "po": self._po_list_for_class(cls),
            }
        return {"prefixes": prefixes, "mappings": mappings}

    # helpers

    @staticmethod
    def _variable_for_slot(decl: Any, s: Any) -> str:
        """Return the data field name for a slot.

        Preference order: alias of the schema-level slot ``decl``, alias of
        the induced slot ``s``, then the name of ``s``.
        """
        alias = decl.alias if decl and decl.alias else s.alias
        return alias or s.name

    def _collect_inline_owners(self) -> dict[str, list[tuple[str, str]]]:
        """Map each class used as an inlined slot range to its owners.

        Values are ``(owner class name, slot variable)`` pairs, one per
        inlined slot whose range is the keyed class.
        """
        sv = self.schemaview
        inline_owners: dict[str, list[tuple[str, str]]] = {}
        for owner in sv.all_classes().values():
            for s in sv.class_induced_slots(owner.name):
                if not s.range:
                    continue
                range_cls = sv.get_class(s.range)
                if range_cls is None:
                    # Range is not a class; nothing to inline.
                    continue
                decl = sv.get_slot(s.name)
                inlined = getattr(decl or s, "inlined", None)
                if inlined is None:
                    inlined = False
                if inlined:
                    var = self._variable_for_slot(decl, s)
                    inline_owners.setdefault(range_cls.name, []).append((owner.name, var))
        return inline_owners

    def _sources_for_class(
        self, cls: ClassDefinition, inline_owners: dict[str, list[tuple[str, str]]]
    ) -> list[list[str]]:
        """Return the ``sources`` entry for the mapping of ``cls``.

        Non-JSON sources carry no iterator. For JSON sources an inlined class
        iterates beneath its owner's iterator; any other class uses its own
        iterator.

        :raises ValueError: if ``cls`` is inlined in more than one place.
        """
        if not self._is_json_source():
            return [[self.source]]
        if cls.name not in inline_owners:
            return [[self.source, self._iterator_for_class(cls)]]

        owners = inline_owners[cls.name]
        if len(owners) > 1:
            raise ValueError(
                f"Inline class '{cls.name}' is used in multiple owners: "
                f"{[o[0] for o in owners]}. This is not supported."
            )
        owner_name, slot_var = owners[0]
        owner_cls = self.schemaview.get_class(owner_name)
        owner_iterator = self._iterator_for_class(owner_cls)
        return [[self.source, f"{owner_iterator}.{slot_var}"]]

    def _is_json_source(self) -> bool:
        """Return True when the source shorthand contains ``~jsonpath``."""
        return "~jsonpath" in (self.source or "")

    def _prefixes_with_defaults(self) -> dict[str, str]:
        """Return the prefix map for the document, with defaults added.

        Starts from the schema's prefixes, always adds ``rdf``, and adds
        ``ex`` when no prefix other than ``rdf``/``linkml`` is present.

        Side effect: when the schema has no default prefix, one is stored on
        the schema -- ``ex`` if available, otherwise the first prefix that is
        neither ``rdf`` nor ``linkml``.
        """
        px: dict[str, str] = {}
        if self.schema.prefixes:
            for p in self.schema.prefixes.values():
                if p.prefix_prefix and p.prefix_reference:
                    px[str(p.prefix_prefix)] = str(p.prefix_reference)
        px.setdefault("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")

        has_user_prefix = any(k not in ("rdf", "linkml") for k in px)
        if not has_user_prefix:
            px.setdefault("ex", "https://example.org/default#")

        if not self.schema.default_prefix:
            if "ex" in px:
                self.schema.default_prefix = "ex"
            else:
                for k in px:
                    if k not in ("rdf", "linkml"):
                        self.schema.default_prefix = k
                        break
        return px

    def _iterator_for_class(self, c: ClassDefinition) -> str:
        """Return the iterator template with ``{Class}`` replaced by the class name."""
        return self.iterator_template.replace("{Class}", c.name)

    def _subject_template_for_class(self, c: ClassDefinition) -> str:
        """Return the subject template for class ``c``.

        Uses the identifier slot if there is one, then the key slot, and
        otherwise a ``<prefix>:<ClassName>/$(subject_id)`` template. The
        prefix is the schema's default prefix, or ``ex`` when unset.
        """
        sv = self.schemaview
        default_prefix = sv.schema.default_prefix or "ex"
        id_slot = sv.get_identifier_slot(c.name)
        if id_slot:
            return f"{default_prefix}:$({id_slot.name})"
        key_slot = sv.get_key_slot(c.name)
        if key_slot:
            return f"{default_prefix}:$({key_slot.name})"
        return f"{default_prefix}:{c.name}/$(subject_id)"

    def _po_list_for_class(self, c: ClassDefinition) -> list[dict[str, Any]]:
        """Return the predicate/object list for class ``c``.

        The first entry is always ``rdf:type``. Each induced slot then adds
        one entry:

        * non-class range -- a plain ``$(var)`` reference;
        * class range, not inlined -- an IRI reference (``$(var[*])`` in a
          list when multivalued);
        * class range, inlined -- a link to the range class's mapping with
          an ``equal`` condition on the range's identifier or key.

        :raises ValueError: if an inlined range class has neither an
            identifier nor a key slot.
        """
        sv = self.schemaview
        default_prefix = sv.schema.default_prefix or "ex"
        po: list[dict[str, Any]] = []

        class_uri = sv.get_uri(c, expand=False)
        class_term = str(class_uri) if class_uri else f"{default_prefix}:{c.name}"
        po.append({"p": "rdf:type", "o": class_term})

        for s in sv.class_induced_slots(c.name):
            decl = sv.get_slot(s.name)

            # Predicate: explicit slot_uri (schema-level slot first, then the
            # induced one), else the URI computed by the schema view, else a
            # CURIE in the default prefix.
            slot_uri = None
            if decl is not None and getattr(decl, "slot_uri", None):
                slot_uri = decl.slot_uri
            elif getattr(s, "slot_uri", None):
                slot_uri = s.slot_uri
            if slot_uri:
                pred = str(slot_uri)
            else:
                pred_uri = sv.get_uri(decl or s, expand=False)
                pred = str(pred_uri) if pred_uri is not None else f"{default_prefix}:{s.name}"

            var = self._variable_for_slot(decl, s)

            is_obj = sv.get_class(s.range) is not None if s.range else False
            if not is_obj:
                po.append({"p": pred, "o": f"$({var})"})
                continue

            inlined = getattr(decl or s, "inlined", None)
            multivalued = getattr(decl or s, "multivalued", False)
            if inlined is None:
                inlined = False
            if inlined is False:
                # Referenced (not inlined) objects are emitted as IRIs.
                if multivalued:
                    po.append({"p": pred, "o": [{"value": f"$({var}[*])", "type": "iri"}]})
                else:
                    po.append({"p": pred, "o": {"value": f"$({var})", "type": "iri"}})
                continue

            # Inlined object: link to the range class's mapping by matching
            # the nested identifier against the identifier in that mapping.
            range_name = s.range
            range_id = sv.get_identifier_slot(range_name) or sv.get_key_slot(range_name)
            if not range_id:
                raise ValueError(
                    f"Inline class '{range_name}' must define an identifier or key to support join-based linking."
                )
            left = f"$({var}.{range_id.name})"
            right = f"$({range_id.name})"
            po.append(
                {
                    "p": pred,
                    "o": {
                        "mapping": str(range_name),
                        "condition": {
                            "function": "equal",
                            "parameters": [
                                ["str1", left, "s"],
                                ["str2", right, "o"],
                            ],
                        },
                    },
                }
            )
        return po
@shared_arguments(YarrrmlGenerator)
@click.command(name="yarrrml")
@click.option(
    "--source",
    help="YARRRML source shorthand, e.g., data.json~jsonpath or data.csv~csv (TSV works too)",
)
@click.option(
    "--iterator-template",
    help='JSONPath iterator template; supports {Class}, default: "$.items[*]"',
)
@click.version_option(__version__, "-V", "--version")
def cli(yamlfile, source, iterator_template, **args):
    """Generate YARRRML mappings from a LinkML schema."""
    # Forward only the options the user actually supplied, so the
    # generator's own defaults apply otherwise.
    supplied = {"source": source, "iterator_template": iterator_template}
    args.update({option: value for option, value in supplied.items() if value})

    generator = YarrrmlGenerator(yamlfile, **args)
    print(generator.serialize(**args))


if __name__ == "__main__":
    cli()