Source code for schema_automator.generalizers.rdf_data_generalizer

import click
from collections import defaultdict
import os
from csv import DictWriter
from linkml_runtime import SchemaView
from linkml_runtime.linkml_model import SchemaDefinition

from rdflib import Graph, URIRef
from rdflib.namespace import RDF

from dataclasses import dataclass

from schema_automator.generalizers.generalizer import Generalizer
from schema_automator.generalizers.csv_data_generalizer import CsvDataGeneralizer
from schema_automator.utils.schemautils import write_schema


[docs] @dataclass class RdfDataGeneralizer(Generalizer): """ A generalizer that generalizes from source RDF turtle data """ mappings: dict = None
[docs] def convert(self, file: str, dir: str, **kwargs) -> SchemaDefinition: """ Generalizes from an RDF file :param file: :param dir: :param kwargs: :return: """ csv_engine = CsvDataGeneralizer() g = Graph() g.parse(file, **kwargs) self.mappings = {} paths = self.graph_to_tables(g, dir) schemas = [] for c, tsvfile in paths.items(): schemas.append(csv_engine.convert(tsvfile, class_name=c)) sv = SchemaView(schemas[0]) for s in schemas[1:]: sv.merge_schema(s) schema = sv.schema mappings = self.mappings for cn, c in schema.classes.items(): if cn in mappings: c.class_uri = mappings[cn] for sn, s in schema.slots.items(): if sn in mappings: s.slot_uri = mappings[sn] for en, e in schema.enums.items(): for pvn, pvo in e.permissible_values.items(): if pvn in mappings: pvo.meaning = mappings[pvn] return schema
def graph_to_tables(self, g: Graph, dir: str): mappings = self.mappings rows_by_table = defaultdict(list) for s,_,t_uriref in g.triples((None, RDF.type, None)): t = self._as_name(t_uriref) mappings[t] = str(t_uriref) #print(f'I={s} type={t}') row = defaultdict(set) row['id'] = {s} for _,p_uriref,o_uriref in g.triples((s, None, None)): p = self._as_name(p_uriref) o = self._as_name(o_uriref) mappings[p] = str(p_uriref) if isinstance(o_uriref, URIRef): mappings[o] = str(o_uriref) row[p].add(o) rows_by_table[t].append(row) #print(f' ROW={row}') paths = {} for t, rows in rows_by_table.items(): path = f'{os.path.join(dir, t)}.tsv' paths[t] = path fields = [] for row in rows: for k in row.keys(): if k not in fields: fields.append(k) v = list(row[k]) v = '|'.join(v) row[k] = v print(f'Writing {len(rows)} to {path}') with open(path, 'w') as stream: w = DictWriter(stream, delimiter='\t', fieldnames=fields) w.writeheader() for row in rows: w.writerow(row) return paths def _as_name(self, v): v = str(v) for sep in ['#', '/']: if sep in v: return v.split(sep)[-1] return v
@click.command() @click.argument('rdffile') @click.option('--dir', '-d', required=True) def rdf2model(rdffile, dir, **args): """ Infer a model from RDF instance data """ sie = RdfDataGeneralizer() if not os.path.exists(dir): os.makedirs(dir) schema = sie.convert(rdffile, dir=dir, format='ttl') write_schema(schema) if __name__ == '__main__': rdf2model()