Source code for schema_automator.importers.frictionless_import_engine
import logging
from typing import Union, Dict, Tuple, List, Any, Optional
from dataclasses import dataclass
from linkml.utils.schema_builder import SchemaBuilder
from linkml_runtime.linkml_model.meta import SchemaDefinition, SlotDefinition, EnumDefinition, \
PermissibleValue, UniqueKey, ClassDefinition
from linkml_runtime.loaders import json_loader
from linkml_runtime.utils.formatutils import camelcase
from schema_automator.importers.import_engine import ImportEngine
import schema_automator.metamodels.frictionless as fl
# Maps a Frictionless field type name to the name of the LinkML type
# that is used as the range of the generated slot.
TYPE_MAPPING = {
    "string": "string",
    "datetime": "datetime",
    "boolean": "boolean",
    "integer": "integer",
    "number": "decimal",
}
def _desc(elt: Union[fl.Field, fl.Resource]) -> Optional[str]:
    """
    Returns the first item of an element's description.

    :param elt: a Frictionless field or resource
    :return: the item at index 0 of ``elt.description``, or None when the
        description is empty or unset
    """
    descriptions = elt.description
    return descriptions[0] if descriptions else None
def _add_unique_keys(cls: ClassDefinition, name: str, slot_names: List[str]):
    """
    Registers a unique key on a class definition.

    :param cls: class definition that receives the unique key
    :param name: name of the unique key; also its key in ``cls.unique_keys``
    :param slot_names: names of the slots that together form the key
    """
    cls.unique_keys[name] = UniqueKey(name, unique_key_slots=slot_names)
[docs]
@dataclass
class FrictionlessImportEngine(ImportEngine):
    """
    An ImportEngine that imports Frictionless data packages with schema information

    See:

    - `Frictionless specs <https://specs.frictionlessdata.io/>`_
    - `Patterns <https://specs.frictionlessdata.io/patterns/>`_
    """

    def convert(self, file: str, id: Optional[str] = None, name: Optional[str] = None, **kwargs) -> SchemaDefinition:
        """
        Converts a Frictionless data package into a Schema

        Each resource of the package becomes a class, and each field of the
        resource's table schema becomes an attribute of that class.

        :param file: JSON source of the data package, handed to ``json_loader.load``
        :param id: id for the schema; when not given, the id set by SchemaBuilder is left as is
        :param name: name for the schema; when not given, the package name is used
        :param kwargs: accepted but not used by this method
        :return: the generated schema
        :raises KeyError: if a field type has no entry in ``TYPE_MAPPING``, or if a
            primary/foreign key names a field that is not among the resource's fields
        :raises NotImplementedError: if two fields produce enums with the same name
            but different permissible values
        """
        package: fl.Package = json_loader.load(file, target_class=fl.Package)
        sb = SchemaBuilder()
        schema = sb.schema
        if id:
            schema.id = id
        if not name:
            name = package.name
        if name:
            schema.name = name
        schema.description = package.title
        for resource in package.resources:
            sb.add_class(resource.name)
            cls = schema.classes[resource.name]
            cls.description = _desc(resource)
            cls.title = resource.title
            tbl = resource.schema
            for field in tbl.fields:
                self._import_field(sb, cls, field)
            # Keys are applied only after every field has been added, because
            # they look attributes up by name.
            self._apply_primary_key(cls, tbl)
            self._apply_foreign_keys(cls, tbl)
        sb.add_defaults()
        if name:
            schema.default_prefix = name
        for c in schema.classes.values():
            c.from_schema = 'http://example.org/'
        return sb.schema

    def _import_field(self, sb: SchemaBuilder, cls_def: ClassDefinition, field: fl.Field) -> None:
        """Adds one Frictionless field to ``cls_def`` as an attribute."""
        slot = SlotDefinition(field.name, description=_desc(field))
        cls_def.attributes[slot.name] = slot
        constraints = field.constraints
        if constraints:
            slot.required = constraints.required
            slot.pattern = constraints.pattern
            if constraints.unique is True:
                _add_unique_keys(cls_def, f"{slot.name}_unique_key", [slot.name])
        if field.enum:
            # An enumerated field gets a dedicated enum as its range
            slot.range = self.add_enum(sb, field).name
        elif field.type:
            if field.type == fl.TypeEnum(fl.TypeEnum.array):
                # Arrays only mark the slot as multivalued; no range is assigned here
                slot.multivalued = True
            else:
                # Raises KeyError for types that have no entry in TYPE_MAPPING
                slot.range = TYPE_MAPPING[str(field.type)]

    @staticmethod
    def _apply_primary_key(cls_def: ClassDefinition, tbl) -> None:
        """Marks a single-field primary key as identifier, or adds a compound one as a unique key."""
        pks = tbl.primaryKey
        if not pks:
            return
        if len(pks) > 1:
            # Pass the flat list of field names; wrapping it in another list
            # would make the whole list a single (invalid) slot name.
            _add_unique_keys(cls_def, f"{cls_def.name}_primary_key", list(pks))
        else:
            cls_def.attributes[pks[0]].identifier = True

    @staticmethod
    def _apply_foreign_keys(cls_def: ClassDefinition, tbl) -> None:
        """Sets the range of each single-field foreign key to the referenced resource."""
        for fk in tbl.foreignKeys or []:
            fk_fields = fk.fields
            if isinstance(fk_fields, list):
                if len(fk_fields) > 1:
                    logging.warning(f"Cannot handle compound FKs: {cls_def.name}.[{fk_fields}]")
                    continue
                # An empty list carries no field to link, so it is skipped below
                fk_field = fk_fields[0] if fk_fields else None
            else:
                fk_field = fk_fields
            if fk_field:
                # assume fk.fields is the PK
                cls_def.attributes[fk_field].range = fk.reference.resource

    @staticmethod
    def _permissible_value_meanings(enum_def: EnumDefinition) -> Dict[str, Any]:
        """Returns a text -> meaning mapping used to compare the values of two enums."""
        return {text: pv.meaning for text, pv in enum_def.permissible_values.items()}

    def add_enum(self, sb: SchemaBuilder, field: fl.Field) -> EnumDefinition:
        """
        Builds an enum from the permitted values of a field and adds it to the schema.

        The enum is named after the field: the camelcase form of ``<field name>_enum``.
        A value that splits into exactly two parts on ``:`` is stored under the part
        after the colon, with the full value as its meaning, and the part before the
        colon is registered as a prefix expanding to ``<schema id>/<prefix>/``.

        :param sb: builder holding the schema that receives the enum and prefixes
        :param field: field whose ``enum`` values become the permissible values
        :return: the enum registered in the schema; when an enum of the same name and
            with the same values is already present, that existing enum is returned
        :raises NotImplementedError: if an enum of the same name but with different
            values is already present in the schema
        """
        name = camelcase(f"{field.name}_enum")
        e = EnumDefinition(name)
        for code in field.enum:
            pv = PermissibleValue(code)
            # TODO: this behavior may be specific to C2M2, make this configurable
            if ":" in code:
                toks = code.split(":")
                if len(toks) == 2:
                    prefix, short = toks
                    pv = PermissibleValue(short, meaning=code)
                    sb.add_prefix(prefix, f"{sb.schema.id}/{prefix}/", replace_if_present=True)
            e.permissible_values[pv.text] = pv
        if e.name in sb.schema.enums:
            existing = sb.schema.enums[e.name]
            # Identical values need no merging, so the enum already in the schema is reused
            if self._permissible_value_meanings(existing) == self._permissible_value_meanings(e):
                return existing
            raise NotImplementedError("Cannot yet merge enums")
        sb.schema.enums[e.name] = e
        return e