Source code for schema_automator.importers.owl_import_engine

import click
import logging
import yaml
from typing import Union, Dict, Tuple, List, Any
from collections import defaultdict

from linkml_runtime.linkml_model import SchemaDefinition
from funowl.converters.functional_converter import to_python
from funowl import *

from dataclasses import dataclass
from schema_automator.importers.import_engine import ImportEngine
from schema_automator.utils.schemautils import write_schema


@dataclass
class OwlImportEngine(ImportEngine):
    """
    An ImportEngine that takes schema-style OWL and converts it to a LinkML schema

    The ontology is read with funowl (functional syntax) and its axioms are
    translated into classes, slots and slot usages of a LinkML schema.
    """
    # Maps each generated short name back to the IRI/CURIE it was derived from;
    # reset at the start of every `convert` call.
    mappings: dict = None
    # When true, annotation assertions with no dedicated mapping are kept as comments.
    # NOTE: deliberately left un-annotated, so it is a plain class attribute and
    # not a dataclass field / constructor argument.
    include_unmapped_annotations = False

    def convert(
        self,
        file: str,
        name: str = None,
        model_uri: str = None,
        identifier: str = None,
        **kwargs,
    ) -> SchemaDefinition:
        """
        Converts an OWL schema-style ontology

        :param file: ontology source handed to funowl's ``to_python``
            (must be in OWL functional syntax)
        :param name: schema name; defaults to the last segment of the ontology IRI
        :param model_uri: URI bound to the schema's default prefix; defaults to
            ``https://w3id.org/{name}/``
        :param identifier: if given, a slot of this name is declared as an
            identifier and added to every class that has neither ``is_a`` nor ``mixins``
        :param kwargs: accepted for interface compatibility; not used here
        :return: the generated schema
        :raises Exception: if the parsed ontology contains no axioms
        """
        self.mappings = {}
        doc = to_python(file)
        ontology = doc.ontology
        if not ontology.axioms:
            raise Exception(f'Empty ontology in {file} (note: ontologies must be in functional syntax)')
        self.prefixes = doc.prefixDeclarations
        # Resolve the name first: the default model URI is derived from it.
        if name is None:
            name = self.iri_to_name(ontology.iri)
        if model_uri is None:
            model_uri = f'https://w3id.org/{name}/'
        classes = {}
        slots = {}
        enums = {}
        types = {}
        schema_dict = {
            'id': f'{ontology.iri}',
            'name': name,
            'description': name,
            'imports': ['linkml:types'],
            'prefixes': {
                'linkml': 'https://w3id.org/linkml/',
                name: model_uri,
            },
            'default_prefix': name,
            'types': types,
            'classes': classes,
            'slots': slots,
            'enums': enums,
        }
        # The helper methods (class_info, slot_info, ...) mutate this dict in place.
        self.schema = schema_dict
        isamap = defaultdict(set)
        slot_isamap = defaultdict(set)
        slot_usage_map = defaultdict(dict)
        # Never populated at present, so every slot without an explicit
        # 'multivalued' entry ends up multivalued.
        single_valued_slots = set()

        def set_slot_usage(cls_name, p, k, v):
            # Record key k = v in the slot_usage of slot p on class cls_name.
            slot_usage_map[cls_name].setdefault(p, {})[k] = v

        def set_cardinality(cls_name, p, min_card, max_card):
            # Translate OWL min/max cardinalities into required / multivalued.
            if max_card is not None:
                if max_card == 1:
                    set_slot_usage(cls_name, p, 'multivalued', False)
                elif max_card > 1:
                    set_slot_usage(cls_name, p, 'multivalued', True)
            if min_card is not None:
                if min_card == 1:
                    set_slot_usage(cls_name, p, 'required', True)
                elif min_card == 0:
                    set_slot_usage(cls_name, p, 'required', False)
                else:
                    set_slot_usage(cls_name, p, 'multivalued', True)

        # Pass 1: structural axioms (hierarchies, restrictions, domains, ranges, declarations)
        for a in ontology.axioms:
            logging.debug(f'Axiom: {a}')
            if isinstance(a, SubClassOf):
                if isinstance(a.subClassExpression, Class):
                    child = self.iri_to_name(a.subClassExpression)
                    x = a.superClassExpression
                    if isinstance(x, Class):
                        isamap[child].add(self.iri_to_name(x))
                    elif isinstance(x, DataExactCardinality):
                        p = self.iri_to_name(x.dataPropertyExpression)
                        set_cardinality(child, p, x.card, x.card)
                    elif isinstance(x, ObjectExactCardinality):
                        p = self.iri_to_name(x.objectPropertyExpression)
                        set_cardinality(child, p, x.card, x.card)
                    elif isinstance(x, ObjectMinCardinality):
                        p = self.iri_to_name(x.objectPropertyExpression)
                        set_cardinality(child, p, x.min_, None)
                    elif isinstance(x, DataMinCardinality):
                        p = self.iri_to_name(x.dataPropertyExpression)
                        set_cardinality(child, p, x.min_, None)
                    elif isinstance(x, ObjectMaxCardinality):
                        p = self.iri_to_name(x.objectPropertyExpression)
                        set_cardinality(child, p, None, x.max_)
                    elif isinstance(x, DataMaxCardinality):
                        p = self.iri_to_name(x.dataPropertyExpression)
                        set_cardinality(child, p, None, x.max_)
                    elif isinstance(x, ObjectAllValuesFrom):
                        p = self.iri_to_name(x.objectPropertyExpression)
                        if isinstance(x.classExpression, Class):
                            set_slot_usage(child, p, 'range', self.iri_to_name(x.classExpression))
                        else:
                            logging.error(f'Cannot yet handle anonymous ranges: {x.classExpression}')
                    elif isinstance(x, ObjectSomeValuesFrom):
                        # An existential restriction is treated as "at least one value".
                        p = self.iri_to_name(x.objectPropertyExpression)
                        set_cardinality(child, p, 1, None)
                    elif isinstance(x, DataSomeValuesFrom):
                        if len(x.dataPropertyExpressions) == 1:
                            p = self.iri_to_name(x.dataPropertyExpressions[0])
                            set_cardinality(child, p, 1, None)
                        else:
                            logging.error(f'Cannot handle multiple data property expressions: {x}')
                    elif isinstance(x, DataAllValuesFrom):
                        if len(x.dataPropertyExpressions) == 1:
                            p = self.iri_to_name(x.dataPropertyExpressions[0])
                            r = x.dataRange
                            if isinstance(r, DataOneOf):
                                logging.error(f'TODO: enum for {r}')
                            elif isinstance(r, Datatype):
                                # NOTE(review): this stores the Datatype object itself,
                                # not a type name - confirm this is what downstream expects.
                                set_slot_usage(child, p, 'range', r)
                            else:
                                logging.error(f'Cannot handle range of {r}')
                        else:
                            logging.error(f'Cannot handle multiple data property expressions: {x}')
                    elif isinstance(x, DataHasValue):
                        p = self.iri_to_name(x.dataPropertyExpression)
                        lit = x.literal.v
                        if isinstance(lit, TypedLiteral):
                            lit = lit.literal
                        set_slot_usage(child, p, 'equals_string', str(lit))
                    else:
                        logging.error(f"cannot handle anon parent classes for {a}")
                else:
                    logging.error(f"cannot handle anon child classes for {a}")
            # https://github.com/hsolbrig/funowl/issues/19
            if isinstance(a, SubObjectPropertyOf):
                sub = a.subObjectPropertyExpression.v
                if isinstance(sub, ObjectPropertyExpression) and isinstance(sub.v, ObjectProperty):
                    child = self.iri_to_name(sub.v)
                    sup = a.superObjectPropertyExpression.v
                    if isinstance(sup, ObjectPropertyExpression) and isinstance(sup.v, ObjectProperty):
                        parent = self.iri_to_name(sup.v)
                        slot_isamap[child].add(parent)
                    else:
                        logging.error(f"cannot handle anon object parent properties for {a}")
                else:
                    logging.error(f"cannot handle anon object child properties for {a}")
            if isinstance(a, SubDataPropertyOf):
                sub = a.subDataPropertyExpression.v
                if isinstance(sub, DataProperty):
                    child = self.iri_to_name(sub)
                    sup = a.superDataPropertyExpression.v
                    if isinstance(sup, DataProperty):
                        parent = self.iri_to_name(sup)
                        slot_isamap[child].add(parent)
                    else:
                        logging.error(f"cannot handle anon data parent properties for {a}")
                else:
                    # Was `logging(...)`, which raises TypeError (module is not callable).
                    logging.error(f"cannot handle anon data child properties for {a}")
            if isinstance(a, SubAnnotationPropertyOf):
                child = self.iri_to_name(a.sub)
                parent = self.iri_to_name(a.super)
                slot_isamap[child].add(parent)
            # domains become slot declarations
            if isinstance(a, (ObjectPropertyDomain, DataPropertyDomain)):
                if isinstance(a, ObjectPropertyDomain):
                    p = a.objectPropertyExpression.v
                else:
                    p = a.dataPropertyExpression.v
                sn = self.iri_to_name(p)
                dc = a.classExpression
                if isinstance(dc, Class):
                    self.class_info(self.iri_to_name(dc), 'slots', sn, True)
                if isinstance(dc, ObjectUnionOf):
                    # A union domain attaches the slot to every named member class.
                    for member in dc.classExpressions:
                        if isinstance(member, Class):
                            self.class_info(self.iri_to_name(member), 'slots', sn, True)
            if isinstance(a, ObjectPropertyRange):
                p = a.objectPropertyExpression.v
                sn = self.iri_to_name(p)
                rc = a.classExpression
                if isinstance(rc, Class):
                    self.slot_info(sn, 'range', self.iri_to_name(rc))
            if isinstance(a, DataPropertyRange):
                p = a.dataPropertyExpression.v
                # Still called for its side effect of recording the name mapping.
                sn = self.iri_to_name(p)
                rc = a.dataRange
                if isinstance(rc, Datatype):
                    # Datatype ranges are not yet translated.
                    logging.error('TODO')
            if isinstance(a, AnnotationPropertyRange):
                self.slot_info(self.iri_to_name(a.property), 'range', self.iri_to_name(a.range))
            if isinstance(a, Declaration):
                e = a.v
                uri_as_curie = str(e.v)
                if uri_as_curie.startswith(':'):
                    # Empty-prefix CURIE: qualify it with the schema's default prefix.
                    uri_as_curie = f'{name}{uri_as_curie}'
                # NOTE(review): exact type comparison kept from the original; isinstance
                # might match more entities if these funowl types subclass each other.
                if type(e) == Class:
                    cn = self.iri_to_name(e.v)
                    self.class_info(cn, 'class_uri', uri_as_curie)
                if type(e) in [ObjectProperty, DataProperty, AnnotationProperty]:
                    cn = self.iri_to_name(e.v)
                    self.slot_info(cn, 'slot_uri', uri_as_curie)

        # One parent (arbitrary, since sets are unordered) becomes is_a; the rest become mixins.
        for hierarchy, record in ((isamap, self.class_info), (slot_isamap, self.slot_info)):
            for c, parent_set in hierarchy.items():
                parents = list(parent_set)
                record(c, 'is_a', parents.pop())
                for p in parents:
                    record(c, 'mixins', p, True)

        # Pass 2: annotation assertions, applied once all classes and slots are known
        for a in ontology.axioms:
            if not isinstance(a, AnnotationAssertion):
                continue
            p = a.property
            strp = str(p)
            sub = a.subject.v
            val = a.value.v
            if not isinstance(sub, IRI):
                # Only annotations on named (IRI) subjects are handled.
                continue
            sub = self.iri_to_name(sub)
            if isinstance(val, Literal):
                val = str(val.v)
            elif isinstance(val, IRI):
                val = val.v
            else:
                val = str(val)
            if sub in classes:
                t = 'classes'
            elif sub in slots:
                t = 'slots'
            else:
                # Skip unknown subjects; previously `t` was left unbound or stale here.
                logging.error(f'{sub} is not known')
                continue
            if strp == 'rdfs:comment':
                self.element_info(t, sub, 'comments', val, multivalued=True)
            elif strp == ':definition':
                self.element_info(t, sub, 'description', val, multivalued=False)
            elif strp == 'schema:rangeIncludes':
                range_cn = self.iri_to_name(val)
                # Log the range class (the original referenced an unassigned domain_cn).
                logging.error(f'UNTESTED RANGE: schema.org {sub} {val} // {range_cn}')
                self.add_range(sub, range_cn)
            elif strp == 'schema:domainIncludes':
                domain_cn = self.iri_to_name(val)
                logging.error(f'UNTESTED: schema.org {sub} {val} // {domain_cn}')
                self.class_info(domain_cn, 'slots', sub, True)
            elif self.include_unmapped_annotations:
                self.element_info(t, sub, 'comments', f'{p} = {val}', multivalued=True)

        for cn, usage in slot_usage_map.items():
            # A class may occur only in restrictions and never have been registered.
            classes.setdefault(cn, {})['slot_usage'] = usage
        for sn, s in slots.items():
            if 'multivalued' not in s:
                s['multivalued'] = sn not in single_valued_slots
            rg = s.get('range')
            if isinstance(rg, list):
                # Ranges collected via add_range: collapse to a scalar or expand to any_of.
                if not rg:
                    del s['range']
                elif len(rg) == 1:
                    s['range'] = rg[0]
                else:
                    del s['range']
                    s['any_of'] = [{'range': x} for x in rg]
        if identifier is not None:
            slots[identifier] = {'identifier': True, 'range': 'uriorcurie'}
            for c in classes.values():
                # Only root classes get the identifier slot.
                if not c.get('is_a', None) and not c.get('mixins', []):
                    c.setdefault('slots', []).append(identifier)
        return SchemaDefinition(**schema_dict)

    def class_info(self, *args, **kwargs):
        """Record a value on a class; see :meth:`element_info` for the arguments."""
        self.element_info('classes', *args, **kwargs)

    def slot_info(self, *args, **kwargs):
        """Record a value on a slot; see :meth:`element_info` for the arguments."""
        self.element_info('slots', *args, **kwargs)

    def add_range(self, sn, range_cn):
        """
        Append a candidate range to a slot, creating the slot entry if needed

        :param sn: slot name
        :param range_cn: name of the range class to add
        """
        slot = self.schema['slots'].setdefault(sn, {})
        existing = slot.get('range')
        if existing is None:
            slot['range'] = []
        elif not isinstance(existing, list):
            # A scalar range set earlier via slot_info: promote it so append works.
            slot['range'] = [existing]
        slot['range'].append(range_cn)

    def element_info(self, type: str, cn: str, sn: str, v: Any, multivalued = False):
        """
        Set or append a value on an element of the schema under construction

        :param type: top-level schema section, e.g. ``'classes'`` or ``'slots'``
        :param cn: name of the element within that section (created if absent)
        :param sn: key to set on the element
        :param v: value to store
        :param multivalued: if true, append ``v`` to a list under ``sn``;
            otherwise assign it, logging an error when a different value is replaced
        """
        c = self.schema[type].setdefault(cn, {})
        if multivalued:
            c.setdefault(sn, []).append(v)
        else:
            if sn in c and v != c[sn]:
                logging.error(f'Overwriting {sn} for {c} to {v}')
            c[sn] = v

    def iri_to_name(self, v):
        """
        Shorten an IRI/CURIE to a local name and remember the original

        :param v: IRI, CURIE or other object convertible with ``str``
        :return: the local name
        """
        n = self._as_name(v)
        if n != v:
            self.mappings[n] = v
        return n

    def _as_name(self, v):
        """Return the text after the last ``#``, ``/`` or ``:`` (first separator found, in that order)."""
        v = str(v)
        for sep in ['#', '/', ':']:
            if sep in v:
                return v.split(sep)[-1]
        return v
@click.command()
@click.argument('owlfile')
@click.option('--name', '-n', help="Schema name")
@click.option('--identifier', '-I', help="Slot to use as identifier")
@click.option('--model-uri', help="Model URI prefix")
@click.option('--output', '-o', help="Path to saved yaml schema")
def owl2model(owlfile, output, **args):
    """
    Infer a model from OWL Ontology

    Note: input must be in functional syntax
    """
    # The remaining CLI options (name, identifier, model_uri) arrive in `args`
    # and are forwarded unchanged to the import engine.
    engine = OwlImportEngine()
    converted = engine.convert(owlfile, **args)
    write_schema(converted, output)


if __name__ == '__main__':
    owl2model()