Source code for linkml.generators.sqltablegen

import enum
import logging
import os
from dataclasses import dataclass
from typing import Optional

import click
from linkml_runtime.dumpers import yaml_dumper
from linkml_runtime.linkml_model import SchemaDefinition, SlotDefinition
from linkml_runtime.utils.formatutils import camelcase, underscore
from linkml_runtime.utils.schemaview import SchemaView
from sqlalchemy import Column, ForeignKey, MetaData, Table, UniqueConstraint, create_mock_engine
from sqlalchemy.types import Boolean, Date, DateTime, Enum, Float, Integer, Text, Time

from linkml._version import __version__
from linkml.transformers.relmodel_transformer import ForeignKeyPolicy, RelationalModelTransformer
from linkml.utils.generator import Generator, shared_arguments
from linkml.utils.schemaloader import SchemaLoader


class SqlNamingPolicy(str, enum.Enum):
    """Policy for turning schema element names into SQL identifiers.

    The only ``Enum`` name imported by this module is SQLAlchemy's column type
    (``sqlalchemy.types.Enum``), so this class is based explicitly on the
    standard-library ``enum.Enum`` to make it a real enumeration.

    Mixing in ``str`` keeps members equal to their plain string values
    (``SqlNamingPolicy.underscore == "underscore"``), so callers that pass
    either a member or the bare string keep working.
    """

    preserve = "preserve"  # use names exactly as given
    underscore = "underscore"  # convert names with formatutils.underscore
    camelcase = "camelcase"  # convert names with formatutils.camelcase


# TODO: move this up
# Maps a LinkML metamodel type name (as it appears in a slot's ``range``) to
# the name of its base type. ``get_sql_range`` uses the result as the lookup
# key into RANGEMAP.
METAMODEL_TYPE_TO_BASE = dict(
    # scalars
    string="str",
    integer="int",
    boolean="Bool",
    float="float",
    double="double",
    decimal="Decimal",
    # temporal types
    time="XSDTime",
    date="XSDDate",
    datetime="XSDDateTime",
    # identifiers / references
    uriorcurie="URIorCURIE",
    uri="URI",
    ncname="NCName",
    objectidentifier="ElementIdentifier",
    nodeidentifier="NodeIdentifier",
)

# Maps a base type name (a value of METAMODEL_TYPE_TO_BASE, or the ``base`` of
# a schema-defined type) to the SQLAlchemy column type used for it.
# Consulted by ``get_sql_range``; a base name with no entry here (e.g.
# "ElementIdentifier", "NodeIdentifier") is logged as an error there and
# falls back to Text().
RANGEMAP = {
    "str": Text(),
    "string": Text(),
    "NCName": Text(),
    "URIorCURIE": Text(),
    "int": Integer(),
    # NOTE(review): Decimal is mapped to an integer column, which looks lossy
    # for fractional values -- confirm this is intended.
    "Decimal": Integer(),
    "double": Float(),
    "float": Float(),
    "Bool": Boolean(),
    "URI": Text(),
    "XSDTime": Time(),
    "XSDDateTime": DateTime(),
    "XSDDate": Date(),
}


@dataclass
class SQLTableGenerator(Generator):
    """
    A :class:`~linkml.utils.generator.Generator` for creating SQL DDL

    The basic algorithm for mapping a linkml schema S is as follows:

     - Each schema S corresponds to one database schema D (see SQLSchema)
     - Each Class C in S is mapped to a table T (see SQLTable)
     - Each slot S in each C is mapped to a column Col (see SQLColumn)

    if the direct_mapping attribute is set to true, then no further transformations
    are applied. Note that this means:

     - inline objects are modeled as Text strings
     - multivalued fields are modeled as single Text strings

    this direct mapping is useful for simple spreadsheet/denormalized representations
    of complex data. however, for other applications, additional transformations
    should occur. these are:

    MULTIVALUED SLOTS

    The relational model does not have direct representation of lists. These are
    normalized as follows.

    If the range of the slot is a class, and there are no other slots whose range is
    this class, and the slot is for a class that has a singular primary key, then a
    backref is added.

    E.g. if we have User 0..* Address,
    then add a field User_id to Address.

    When SQLAlchemy bindings are created, a backref mapping is added

    If the range of the slot is an enum or type, then a new linktable is created,
    and a backref added

    E.g. if a class User has a multivalued slot alias whose range is a string,
    then create a table user_aliases, with two columns (1) alias [a string] and
    (2) a backref to user

    Each mapped slot C.S has a range R

    ranges that are types (literals):

    * If R is a type, and the slot is NOT multivalued, do a direct type conversion
    * If R is a type, and the slot is multivalued:

      * do not include the mapped column
      * create a new table T_S, with 2 columns: S, and a backref to T

    ranges that are classes:

    * Ref = map_class_to_table(R)
    * if R is a class, and the slot is NOT multivalued, and Ref has a singular
      primary key:

      * Col.type = ForeignKey(Ref.PK)

    * if R is a class, and the slot is NOT multivalued, and Ref has NO singular
      primary key:

      * add a foreign key C.pk to Ref
      * add a backref C.S => Ref, C.pk
      * remove Col from T

    * If R is a class, and the slot IS multivalued: see MULTIVALUED SLOTS above
    """

    # ClassVars
    # (left un-annotated on purpose: annotating them would turn them into dataclass fields)
    generatorname = os.path.basename(__file__)
    generatorversion = "0.1.1"
    valid_formats = ["sql"]
    file_extension = "sql"
    uses_schemaloader = False

    # ObjectVars
    use_inherits: bool = False  # postgresql supports inheritance
    dialect: str = "sqlite"
    inject_primary_keys: bool = True
    use_foreign_keys: bool = True
    rename_foreign_keys: bool = False
    direct_mapping: bool = False
    relative_slot_num: bool = False

    def serialize(self, **kwargs) -> str:
        """Return the SQL DDL for the schema; delegates to :meth:`generate_ddl`."""
        return self.generate_ddl(**kwargs)

    def generate_ddl(self, naming_policy: Optional[SqlNamingPolicy] = None, **kwargs) -> str:
        """
        Generate ``CREATE TABLE`` DDL for the schema.

        The schema is first run through :class:`RelationalModelTransformer`; every
        class of the transformed schema that has attributes becomes a SQLAlchemy
        ``Table``, and the tables are compiled to DDL text for ``self.dialect``
        through a mock engine (no database connection is made).

        :param naming_policy: how class/slot names are turned into SQL names;
            ``None`` leaves names unchanged
        :param kwargs: ``tgt_schema_name`` and ``top_class`` are forwarded to the
            relational transformer; other keys are ignored here
        :return: the DDL as a single string. For the sqlite dialect, ``--`` comment
            lines describing every class and slot come first, followed by the
            statements
        :raises ValueError: if ``naming_policy`` is not a known policy, or if a
            referenced class has no primary key (see :meth:`get_foreign_key`)
        """
        # Collected output fragments; joined once at the end.
        ddl_parts = []

        def dump(sql, *multiparams, **params):
            # Executor for the mock engine: receives each DDL construct instead of
            # running it, so we render it for the target dialect and keep the text.
            ddl_parts.append(f"{str(sql.compile(dialect=engine.dialect)).rstrip()};")

        engine = create_mock_engine(f"{self.dialect}://./MyDb", strategy="mock", executor=dump)
        schema_metadata = MetaData()
        sqltr = RelationalModelTransformer(SchemaView(self.schema))
        if not self.use_foreign_keys:
            sqltr.foreign_key_policy = ForeignKeyPolicy.NO_FOREIGN_KEYS
        tr_result = sqltr.transform(
            tgt_schema_name=kwargs.get("tgt_schema_name", None), top_class=kwargs.get("top_class", None)
        )
        schema = tr_result.schema

        def sql_name(n: str) -> str:
            # Apply the requested naming policy to a schema element name.
            if naming_policy is None or naming_policy == SqlNamingPolicy.preserve:
                return n
            if naming_policy == SqlNamingPolicy.underscore:
                return underscore(n)
            if naming_policy == SqlNamingPolicy.camelcase:
                return camelcase(n)
            raise ValueError(f"Unknown: {naming_policy}")

        def strip_newlines(txt: Optional[str]) -> str:
            # Descriptions are emitted inside single-line "--" comments.
            if txt is None:
                return ""
            return txt.replace("\n", "")

        # Currently SQLite dialect in SQLA does not generate comments; see
        # https://github.com/sqlalchemy/sqlalchemy/issues/1546#issuecomment-1067389172
        # As a workaround we add these as "--" comments via direct string manipulation
        include_comments = self.dialect == "sqlite"
        sv = SchemaView(schema)
        for cn, c in schema.classes.items():
            if include_comments:
                ddl_parts.append(f'-- # Class: "{cn}" Description: "{strip_newlines(c.description)}"\n')
            pk_slot = sv.get_identifier_slot(cn)
            if not c.attributes:
                # Classes without attributes do not get a table.
                continue
            cols = []
            for sn, s in c.attributes.items():
                # An identifier slot, when present, decides the primary key;
                # otherwise fall back to the "primary_key" annotation.
                # TODO: use unique key when there is no identifier slot
                is_pk = "primary_key" in s.annotations
                if pk_slot:
                    is_pk = sn == pk_slot.name
                args = []
                if s.range in schema.classes and self.use_foreign_keys:
                    fk = sql_name(self.get_foreign_key(s.range, sv))
                    args = [ForeignKey(fk)]
                field_type = self.get_sql_range(s, schema)
                col = Column(
                    sql_name(sn),
                    field_type,
                    *args,
                    primary_key=is_pk,
                    nullable=not s.required,
                )
                if include_comments:
                    ddl_parts.append(f"-- * Slot: {sn} Description: {strip_newlines(s.description)}\n")
                if s.description:
                    col.comment = s.description
                cols.append(col)
            for uc in c.unique_keys.values():
                # A unique key is only emitted when every one of its slots is an
                # attribute of this class; otherwise the whole constraint is skipped.
                # TODO: resolve renamed slots via their "original_slot" annotation
                if any(key_slot not in c.attributes for key_slot in uc.unique_key_slots):
                    continue
                cols.append(UniqueConstraint(*[sql_name(key_slot) for key_slot in uc.unique_key_slots]))
            # Pass None (not the string "None") when the class has no description,
            # so dialects that emit table comments do not get a bogus comment.
            table_comment = str(c.description) if c.description is not None else None
            Table(sql_name(cn), schema_metadata, *cols, comment=table_comment)
        # Triggers `dump` once per DDL statement.
        schema_metadata.create_all(engine)
        return "".join(ddl_parts)

    def get_sql_range(self, slot: SlotDefinition, schema: Optional[SchemaDefinition] = None):
        """
        returns a SQL Alchemy column type

        Resolution order for the slot's range:

        1. a class: the type of that class's identifier slot (resolved
           recursively), or ``Text()`` when it has none
        2. an enum with permissible values: a SQLAlchemy ``Enum`` of those values
        3. a metamodel type or a schema-defined type: looked up through
           ``METAMODEL_TYPE_TO_BASE`` / the type's ``base`` and then ``RANGEMAP``
        4. anything else (including no range): ``Text()``; unknown ranges and
           unknown base types are additionally logged as errors

        :param slot: the slot whose range is to be mapped
        :param schema: schema used to resolve the range; when omitted, the
            generator's own schema is resolved with :class:`SchemaLoader`
        :return: a SQLAlchemy type instance
        """
        range_name = slot.range
        # if no SchemaDefinition is explicitly provided as an argument
        # then simply use the schema that is provided to the SQLTableGenerator() object
        if not schema:
            schema = SchemaLoader(data=self.schema).resolve()

        if range_name in schema.classes:
            # FK type should be the same as the identifier of the foreign key
            fk = SchemaView(schema).get_identifier_slot(range_name)
            if fk:
                return self.get_sql_range(fk, schema)
            return Text()

        if range_name in schema.enums:
            e = schema.enums[range_name]
            # NOTE(review): an enum with an empty (but not None) set of permissible
            # values still yields an Enum with no values -- confirm this is intended.
            if e.permissible_values is not None:
                vs = [str(v) for v in e.permissible_values]
                return Enum(*vs, name=e.name)

        if range_name in METAMODEL_TYPE_TO_BASE:
            range_base = METAMODEL_TYPE_TO_BASE[range_name]
        elif range_name in schema.types:
            range_base = schema.types[range_name].base
        elif range_name is None:
            return Text()
        else:
            logging.error(f"Unknown range: {range_name} for {slot.name} = {slot.range}")
            return Text()

        if range_base in RANGEMAP:
            return RANGEMAP[range_base]
        logging.error(f"UNKNOWN range base: {range_base} for {slot.name} = {slot.range}")
        return Text()

    @staticmethod
    def get_foreign_key(cn: str, sv: SchemaView) -> str:
        """
        Return the ``<class>.<column>`` target for a foreign key to class ``cn``.

        The column is the class's identifier slot or, failing that, the first
        induced slot marked as ``key``. The slot's alias is used as column name
        when it has one.

        :param cn: name of the referenced class
        :param sv: schema view used to look up the class's slots
        :return: string of the form ``"{cn}.{pk_name}"``
        :raises ValueError: if the class has neither an identifier nor a key slot
        """
        pk = sv.get_identifier_slot(cn)
        # TODO: move this to SV
        if pk is None:
            for sn in sv.class_slots(cn):
                s = sv.induced_slot(sn, cn)
                if s.key:
                    pk = s
                    break
        if pk is None:
            raise ValueError(f"No PK for {cn}")
        pk_name = pk.alias if pk.alias else pk.name
        return f"{cn}.{pk_name}"
@shared_arguments(SQLTableGenerator)
@click.command(name="sqltables")
@click.option(
    "--dialect",
    default="sqlite",
    show_default=True,
    help="SQL-Alchemy dialect, e.g. sqlite, mysql+odbc",
)
@click.option("--sqla-file", help="Path to sqlalchemy generated python")
@click.option(
    "--relmodel-output",
    help="Path to intermediate LinkML YAML of transformed relational model",
)
@click.option("--python-import", help="Python import header for generated sql-alchemy code")
@click.option(
    "--use-foreign-keys/--no-use-foreign-keys",
    default=True,
    show_default=True,
    help="Emit FK declarations",
)
@click.version_option(__version__, "-V", "--version")
def cli(
    yamlfile,
    relmodel_output,
    sqla_file: str = None,
    python_import: str = None,
    dialect=None,
    use_foreign_keys=True,
    **args,
):
    """Generate SQL DDL representation"""
    # Optionally write out the intermediate relational model as LinkML YAML.
    if relmodel_output:
        transformer = RelationalModelTransformer(SchemaView(yamlfile))
        if not use_foreign_keys:
            transformer.foreign_key_policy = ForeignKeyPolicy.NO_FOREIGN_KEYS
        yaml_dumper.dump(transformer.transform("foo").schema, to_file=relmodel_output)

    # Build the generator; an explicit --dialect overrides the generator default.
    generator = SQLTableGenerator(yamlfile, use_foreign_keys=use_foreign_keys, **args)
    if dialect:
        generator.dialect = dialect

    # The DDL goes to stdout.
    ddl = generator.generate_ddl()
    print(ddl)

    # SQLAlchemy code generation is not available from this command; this is
    # only reported after the DDL has been printed.
    if sqla_file is not None:
        raise NotImplementedError("SQLAGen not implemented")


if __name__ == "__main__":
    cli()