Source code for schema_automator.generalizers.json_instance_generalizer

import logging

import click
from typing import Union, Dict, List, Any, Mapping, Collection
from collections import defaultdict
import json

import tomlkit
import yaml
import gzip

from dataclasses import dataclass, field

from linkml_runtime import SchemaView
from linkml_runtime.linkml_model import SchemaDefinition, SlotDefinitionName

from schema_automator.generalizers.generalizer import Generalizer
from schema_automator.generalizers.csv_data_generalizer import CsvDataGeneralizer
from linkml_runtime.utils.formatutils import camelcase

from schema_automator.utils.schemautils import write_schema


@dataclass
class JsonDataGeneralizer(Generalizer):
    """
    A generalizer that abstracts a schema from JSON instance data
    """
    mappings: dict = None
    omit_null: bool = None
    inline_as_dict_slot_keys: Mapping[str, str] = None
    """Mapping between the name of a dict-inlined slot and the unique key for that entity"""
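
    # A hedged usage sketch (not part of the original module): setting
    # inline_as_dict_slot_keys={"persons": "id"} declares that the "persons"
    # slot holds a dict keyed by identifier, e.g.
    #   {"persons": {"P1": {"name": "Alice"}}}
    # so that each value is unrolled into a row carrying an "id" column,
    #   [{"name": "Alice", "id": "P1"}]
    # before type inference. The slot name "persons" and key "id" are
    # illustrative only.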
    def convert(self, input: Union[str, Dict], format: str = 'json',
                container_class_name='Container',
                **kwargs) -> SchemaDefinition:
        """
        Generalizes a schema from a JSON document

        :param input: path to the document, or an already-parsed dict
        :param format: json, yaml, or toml, optionally with a .gz suffix
        :param container_class_name: name of the class used as the tree root
        :param kwargs: passed through to CsvDataGeneralizer
        :return: the inferred schema
        """
        csv_engine = CsvDataGeneralizer(**kwargs)
        if isinstance(input, dict):
            obj = input
        else:
            if format.endswith('.gz'):
                format = format.replace('.gz', '')
                # open in text mode so the json/yaml/toml parsers see str, not bytes
                stream = gzip.open(input, 'rt')
            else:
                stream = open(input)
            with stream:
                if format == 'json':
                    obj = json.load(stream)
                elif format == 'yaml':
                    obj = yaml.safe_load(stream)
                elif format == 'toml':
                    obj_str = "".join(stream.readlines())
                    toml_obj = tomlkit.parse(obj_str)
                    json_str = json.dumps(toml_obj)
                    obj = json.loads(json_str)
                else:
                    raise Exception(f'bad format {format}')
        rows_by_table = defaultdict(list)
        self.rows_by_table = rows_by_table
        self._convert_obj(obj, table=container_class_name)
        schemas = []
        for cn, rows_dict in rows_by_table.items():
            schema = csv_engine.convert_dicts(rows_dict, cn, cn)
            schemas.append(schema)
        sv = SchemaView(schemas[0])
        for s in schemas[1:]:
            sv.merge_schema(s)
        schema = sv.schema
        schema.classes[container_class_name].tree_root = True
        self.add_additional_info(schema)
        return schema
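
    # A minimal usage sketch (illustrative, not part of the original module),
    # assuming the base Generalizer needs no required constructor arguments;
    # the example data are invented:
    #
    #   ie = JsonDataGeneralizer()
    #   schema = ie.convert(
    #       {"persons": [{"id": "P1", "name": "Alice", "age": 33}]},
    #       container_class_name="Container",
    #   )
    #
    # convert() accepts an in-memory dict directly; passing a file path with
    # format="yaml" or format="json.gz" parses the file first.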
    def _key_to_classname(self, k: str) -> str:
        return camelcase(k)

    def _convert_obj(self, obj, table='Container'):
        """
        Recursively transforms an object into flattened key-value lists

        :param obj: object to flatten
        :param table: name of the class/table the object belongs to
        :return: a scalar or list unchanged, or a $ref marker for a flattened dict
        """
        if isinstance(obj, dict):
            row = defaultdict(set)
            for k, v in obj.items():
                if v is None and self.omit_null:
                    continue
                if self.inline_as_dict_slot_keys and k in self.inline_as_dict_slot_keys:
                    key_name = self.inline_as_dict_slot_keys[SlotDefinitionName(k)]
                    self.identifier_slots.append(key_name)
                    v = self._inlined_dict_to_list(v, key_name)
                tbl_name = k
                if self.depluralize_class_names:
                    singular_noun = self.inflect_engine.singular_noun(tbl_name)
                    if singular_noun:
                        logging.info(f"Depluralized: {tbl_name} => {singular_noun}")
                        tbl_name = singular_noun
                tbl_name = camelcase(tbl_name)
                row[k] = self._convert_obj(v, table=tbl_name)
            self.rows_by_table[table].append(row)
            return f'$ref:{table}'
        elif isinstance(obj, list):
            return [self._convert_obj(v, table=table) for v in obj]
        else:
            return obj

    def _inlined_dict_to_list(self, inlined_dict: Dict[str, dict], key_name: str) -> list:
        rows = []
        for k, v in inlined_dict.items():
            if isinstance(v, dict):
                rows.append({**v, key_name: k})
            elif isinstance(v, list):
                raise ValueError(f"Cannot handle an inlined dict of form {inlined_dict} for key={k}")
            else:
                rows.append({key_name: k, f"{key_name}_value": v})
        return rows

    def _as_name(self, v):
        v = str(v)
        for sep in ['#', '/']:
            if sep in v:
                return v.split(sep)[-1]
        return v
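
# A hedged sketch of what _convert_obj accumulates (names invented, assuming
# depluralize_class_names is enabled on the base Generalizer): for
#   {"persons": [{"name": "Alice"}]}
# with container class "Container", the recursion yields
#   rows_by_table["Container"] == [{"persons": ["$ref:Person"]}]
#   rows_by_table["Person"]    == [{"name": "Alice"}]
# i.e. each nested object is replaced by a $ref marker naming the
# (depluralized, camelcased) class it was flattened into, and each class's
# rows are then handed to CsvDataGeneralizer.convert_dicts for inference.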
def parse_frontmatter_files(paths: List[str], text_slot='_text') -> Any:
    blocks = []
    for path in paths:
        with open(path) as stream:
            state = 0
            yamlstr = ""
            txt = ""
            for line in stream.readlines():
                if line.startswith('---'):
                    state += 1
                else:
                    if state == 1:
                        yamlstr += line
                    elif state == 2:
                        txt += line
                    elif state > 2:
                        raise Exception('Limited to one frontmatter block per file')
            obj = yaml.safe_load(yamlstr)
            obj[text_slot] = txt
            blocks.append(obj)
    return blocks


@click.command()
@click.argument('input')
@click.option('--container-class-name',
              default='Container',
              help="name of root class")
@click.option('--format', '-f',
              default='json',
              help="json, yaml, or toml, optionally with a .gz suffix")
@click.option('--enum-columns', '-E',
              multiple=True,
              help='column(s) that are forced to be enums')
@click.option('--enum-mask-columns',
              multiple=True,
              help='column(s) that are excluded from being enums')
@click.option('--max-enum-size',
              default=50,
              help='do not create an enum if it would have more than this many distinct members')
@click.option('--enum-threshold',
              default=0.1,
              help='if the number of distinct values divided by the number of rows is less than this, do not make an enum')
@click.option('--omit-null/--no-omit-null',
              default=False,
              help="if true, ignore null values")
def json2model(input, format, omit_null, **kwargs):
    """
    Infer a model from JSON instance data
    """
    ie = JsonDataGeneralizer(omit_null=omit_null)
    schema = ie.convert(input, format=format, **kwargs)
    write_schema(schema)


@click.command()
@click.argument('inputs', nargs=-1)
@click.option('--container-class-name',
              default='Container',
              help="name of root class")
@click.option('--enum-columns', '-E',
              multiple=True,
              help='column(s) that are forced to be enums')
@click.option('--enum-mask-columns',
              multiple=True,
              help='column(s) that are excluded from being enums')
@click.option('--max-enum-size',
              default=50,
              help='do not create an enum if it would have more than this many distinct members')
@click.option('--enum-threshold',
              default=0.1,
              help='if the number of distinct values divided by the number of rows is less than this, do not make an enum')
@click.option('--omit-null/--no-omit-null',
              default=False,
              help="if true, ignore null values")
def frontmatter2model(inputs, omit_null, **kwargs):
    """
    Infer a model from frontmatter files
    """
    ie = JsonDataGeneralizer(omit_null=omit_null)
    objs = parse_frontmatter_files(list(inputs))
    schema = ie.convert({'objects': objs}, **kwargs)
    write_schema(schema)


if __name__ == '__main__':
    json2model()
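
# A hedged usage sketch for the CLI entry points (file contents and paths are
# invented): given a file post.md containing
#
#   ---
#   title: Hello
#   tags: [a, b]
#   ---
#   Body text here.
#
# parse_frontmatter_files(["post.md"]) yields
#   [{"title": "Hello", "tags": ["a", "b"], "_text": "Body text here.\n"}]
# and frontmatter2model wraps such blocks as {"objects": [...]} before
# generalizing. json2model can then be run as, e.g.
#   python json_instance_generalizer.py data.json -f json
# (the exact invocation and where write_schema emits the schema depend on how
# the package is installed; this command line is an assumption).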