# Source code for linkml.generators.sqltablegen

from __future__ import annotations

import enum
import logging
import os
import re
from dataclasses import dataclass
from typing import Any

import click
from linkml_runtime.dumpers import yaml_dumper
from linkml_runtime.linkml_model import SchemaDefinition, SlotDefinition
from linkml_runtime.utils.formatutils import camelcase, underscore
from linkml_runtime.utils.schemaview import SchemaView
from sqlalchemy import Column, ForeignKey, MetaData, Table, UniqueConstraint, create_mock_engine
from sqlalchemy.dialects.oracle import VARCHAR2
from sqlalchemy.types import Boolean, Date, DateTime, Enum, Float, Integer, Text, Time

from linkml._version import __version__
from linkml.transformers.relmodel_transformer import ForeignKeyPolicy, RelationalModelTransformer
from linkml.utils.generator import Generator, shared_arguments

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class SqlNamingPolicy(str, enum.Enum):
    """Naming policy applied to table and column names in the generated DDL.

    The bare name ``Enum`` in this module is ``sqlalchemy.types.Enum`` (a
    column type), so this class is based explicitly on the standard-library
    ``enum.Enum``.  Mixing in ``str`` keeps each member equal to its plain
    string value, so callers that pass ``"underscore"`` instead of
    ``SqlNamingPolicy.underscore`` keep working.
    """

    # Leave names exactly as they appear in the schema.
    preserve = "preserve"
    # Convert names with ``underscore()``.
    underscore = "underscore"
    # Convert names with ``camelcase()``.
    camelcase = "camelcase"


# TODO: move this up
# Maps a LinkML metamodel type name to the base-type name that
# SQLTableGenerator.get_sql_range then looks up in RANGEMAP.
# Base names without a RANGEMAP entry (here "ElementIdentifier" and
# "NodeIdentifier") make get_sql_range log an error and fall back to Text().
METAMODEL_TYPE_TO_BASE = {
    "string": "str",
    "integer": "int",
    "boolean": "Bool",
    "float": "float",
    "double": "double",
    "decimal": "Decimal",
    "time": "XSDTime",
    "date": "XSDDate",
    "datetime": "XSDDateTime",
    "uriorcurie": "URIorCURIE",
    "uri": "URI",
    "ncname": "NCName",
    "objectidentifier": "ElementIdentifier",
    "nodeidentifier": "NodeIdentifier",
}

# Maps a base-type name to the SQLAlchemy type instance used for the column.
# NOTE(review): "Decimal" is mapped to Integer() -- presumably fractional
# values cannot be stored in such a column; confirm this is intended.
RANGEMAP = {
    "str": Text(),
    "string": Text(),
    "String": Text(),
    "NCName": Text(),
    "URIorCURIE": Text(),
    "int": Integer(),
    "Decimal": Integer(),
    "double": Float(),
    "float": Float(),
    "Bool": Boolean(),
    "URI": Text(),
    "XSDTime": Time(),
    "XSDDateTime": DateTime(),
    "XSDDate": Date(),
}


# Matches a range spelled VARCHAR or VARCHAR2, optionally followed by a
# parenthesised length; group(2) captures the digits of that length.
VARCHAR_REGEX = re.compile(r"VARCHAR2?(\((\d+)\))?")
# Used for the Oracle dialect both as the default VARCHAR2 length and as the
# threshold above which a declared VARCHAR length is emitted as Text() instead.
# NOTE(review): Oracle's standard VARCHAR2 limit is believed to be 4000 bytes;
# confirm that 4096 is the intended value.
ORACLE_MAX_VARCHAR_LENGTH = 4096


@dataclass
class SQLTableGenerator(Generator):
    """
    A :class:`~linkml.utils.generator.Generator` for creating SQL DDL.

    The basic algorithm for mapping a linkml schema S is as follows:

    - Each schema S corresponds to one database schema D (see SQLSchema)
    - Each Class C in S is mapped to a table T (see sqlalchemy.Table)
    - Each slot S in each C is mapped to a column Col (see sqlalchemy.Column)

    If the direct_mapping attribute is set to true, then no further transformations
    are applied. Note that this means:

    - inline objects are modeled as Text strings
    - multivalued fields are modeled as single Text strings

    This direct mapping is useful for simple spreadsheet/denormalized representations
    of complex data. However, for other applications, additional transformations
    should occur. These are:

    MULTIVALUED SLOTS

    The relational model does not have direct representation of lists.
    These are normalized as follows:

    If the range of the slot is a class, and there are no other slots whose range is
    this class, and the slot is for a class that has a singular primary key, then a
    backref is added.

    E.g. if we have User 0..* Address, then add a field User_id to Address.

    When SQLAlchemy bindings are created, a backref mapping is added.

    If the range of the slot is an enum or type, then a new linktable is created,
    and a backref added.

    E.g. if a class User has a multivalued slot alias whose range is a string,
    then create a table user_aliases, with two columns (1) alias [a string] and
    (2) a backref to user

    Each mapped slot C.S has a range R

    ranges that are types (literals):

    * If R is a type, and the slot is NOT multivalued, do a direct type conversion
    * If R is a type, and the slot is multivalued:

        * do not include the mapped column
        * create a new table T_S, with 2 columns: S, and a backref to T

    ranges that are classes:

    * Ref = map_class_to_table(R)
    * if R is a class, and the slot is NOT multivalued, and Ref has a singular primary key:

        * Col.type = ForeignKey(Ref.PK)

    * if R is a class, and the slot is NOT multivalued, and Ref has NO singular primary key:

        * add a foreign key C.pk to Ref
        * add a backref C.S => Ref, C.pk
        * remove Col from T

    * If R is a class, and the slot IS multivalued
    """

    # ClassVars
    generatorname = os.path.basename(__file__)
    generatorversion = "0.1.1"
    valid_formats = ["sql"]
    file_extension = "sql"
    uses_schemaloader = False

    # ObjectVars
    use_inherits: bool = False  # postgresql supports inheritance
    dialect: str = "sqlite"
    inject_primary_keys: bool = True
    use_foreign_keys: bool = True
    rename_foreign_keys: bool = False
    direct_mapping: bool = False
    relative_slot_num: bool = False
    default_length_oracle: int = ORACLE_MAX_VARCHAR_LENGTH
    generate_abstract_class_ddl: bool = True

    def serialize(self, **kwargs: dict[str, Any]) -> str:
        """Return the DDL for the schema; delegates to :meth:`generate_ddl`."""
        return self.generate_ddl(**kwargs)

    def generate_ddl(self, naming_policy: SqlNamingPolicy | None = None, **kwargs: dict[str, Any]) -> str:
        """
        Generate a DDL using the schema in self.schema.

        The schema is first passed through a ``RelationalModelTransformer``;
        tables and columns are then built from the transformed schema.
        ``tgt_schema_name`` and ``top_class`` are read from ``kwargs`` and
        forwarded to the transformer.

        :param naming_policy: naming policy for columns, defaults to None
        :type naming_policy: SqlNamingPolicy, optional
        :return: the DDL as a string
        :rtype: str
        :raises ValueError: if ``naming_policy`` is not a known policy, or a
            foreign-key target class has no identifier or key slot
        """
        # Fragments of the output, joined once at the end.
        ddl_parts: list[str] = []

        def dump(sql, *multiparams, **params):
            # Executor for the mock engine: record each compiled statement
            # instead of running it against a database.
            ddl_parts.append(f"{str(sql.compile(dialect=engine.dialect)).rstrip()};")

        engine = create_mock_engine(f"{self.dialect}://./MyDb", strategy="mock", executor=dump)
        schema_metadata = MetaData()
        sqltr = RelationalModelTransformer(SchemaView(self.schema))
        if not self.use_foreign_keys:
            sqltr.foreign_key_policy = ForeignKeyPolicy.NO_FOREIGN_KEYS
        tr_result = sqltr.transform(tgt_schema_name=kwargs.get("tgt_schema_name"), top_class=kwargs.get("top_class"))
        schema = tr_result.schema

        def sql_name(n: str) -> str:
            # Apply the requested naming policy to a table or column name.
            if not naming_policy or naming_policy == SqlNamingPolicy.preserve:
                return n
            if naming_policy == SqlNamingPolicy.underscore:
                return underscore(n)
            if naming_policy == SqlNamingPolicy.camelcase:
                return camelcase(n)
            msg = f"Unknown: {naming_policy}"
            raise ValueError(msg)

        def strip_newlines(txt: str | None) -> str:
            # Keep each "--" comment on a single line.
            if txt is None:
                return ""
            return txt.replace("\n", "")

        # Currently SQLite dialect in SQLA does not generate comments; see
        # https://github.com/sqlalchemy/sqlalchemy/issues/1546#issuecomment-1067389172
        # As a workaround we add these as "--" comments via direct string manipulation
        include_comments = self.dialect == "sqlite"
        sv = SchemaView(schema)

        # Iterate through the attributes in each class, creating Column objects.
        # This includes generating the appropriate column name, converting the range
        # into an SQL type, and adding a foreign key notation if appropriate.
        for cn, c in schema.classes.items():
            if include_comments:
                if c.abstract:
                    ddl_parts.append(f'-- # Abstract Class: "{cn}" Description: "{strip_newlines(c.description)}"\n')
                else:
                    ddl_parts.append(f'-- # Class: "{cn}" Description: "{strip_newlines(c.description)}"\n')
            pk_slot = sv.get_identifier_slot(cn)
            if c.attributes:
                cols = []
                for sn, s in c.attributes.items():
                    # An identifier slot, when present, overrides the
                    # "primary_key" annotation.
                    is_pk = "primary_key" in s.annotations
                    if pk_slot:
                        is_pk = sn == pk_slot.name
                    # TODO: use unique key when the class has no identifier slot
                    args = []
                    if s.range in schema.classes and self.use_foreign_keys:
                        fk = sql_name(self.get_id_or_key(s.range, sv))
                        args = [ForeignKey(fk)]
                    field_type = self.get_sql_range(s, schema)
                    col = Column(
                        sql_name(sn),
                        field_type,
                        *args,
                        primary_key=is_pk,
                        nullable=not s.required,
                    )
                    if include_comments:
                        ddl_parts.append(f"-- * Slot: {sn} Description: {strip_newlines(s.description)}\n")
                    if s.description:
                        col.comment = s.description
                    cols.append(col)
                # convert unique_keys into a uniqueness constraint for the table
                for uc in c.unique_keys.values():
                    sql_names = [sql_name(sn) if sn in c.attributes else None for sn in uc.unique_key_slots]
                    # Skip constraints that reference a slot this table does not have.
                    if any(sn is None for sn in sql_names):
                        continue
                    cols.append(UniqueConstraint(*sql_names))
                if not c.abstract or self.generate_abstract_class_ddl:
                    # Do not stringify a missing description: str(None) would
                    # become the literal table comment "None".
                    table_comment = None if c.description is None else str(c.description)
                    Table(sql_name(cn), schema_metadata, *cols, comment=table_comment)
        # Emits the CREATE statements through ``dump``.
        schema_metadata.create_all(engine)
        return "".join(ddl_parts)

    def get_oracle_sql_range(self, slot: SlotDefinition) -> Text | VARCHAR2 | None:
        """
        Generate the appropriate range for Oracle SQL.

        :param slot: the slot under examination
        :type slot: SlotDefinition
        :return: appropriate type for Oracle SQL or None
        :rtype: Text | VARCHAR2 | None
        """
        slot_range = slot.range
        if not slot_range:
            return None

        if slot_range.lower() in ["str", "string"]:
            # string type data should be represented as a VARCHAR2
            return VARCHAR2(self.default_length_oracle)

        # check whether the slot range matches the regex "VARCHAR2?(\((\d+)\))?"
        match = VARCHAR_REGEX.match(slot_range)
        if match:
            # match.group(2) is the digits in brackets after VARCHAR
            # i.e. a defined length for the VARCHAR
            declared_length = int(match.group(2)) if match.group(2) else None
            if declared_length and declared_length > ORACLE_MAX_VARCHAR_LENGTH:
                msg = (
                    "Warning: range exceeds maximum Oracle VARCHAR length, "
                    f"CLOB type will be returned: {slot_range} for {slot.name} = {slot.range}"
                )
                logger.warning(msg)
                return Text()
            # set the length to either the varchar length (as defined in the slot_range)
            # or the default
            return VARCHAR2(declared_length or self.default_length_oracle)

        # use standard SQL range matching for anything else
        return None

    def get_sql_range(self, slot: SlotDefinition, schema: SchemaDefinition | None = None):
        """Get the slot range as a SQL Alchemy column type.

        :param slot: the slot whose range is converted
        :param schema: schema used to resolve classes, enums and types;
            defaults to ``self.schema`` when not given
        :return: a SQLAlchemy type; ``Text()`` when the range is missing or
            cannot be resolved
        """
        slot_range = slot.range

        if self.dialect == "oracle":
            range_type = self.get_oracle_sql_range(slot)
            if range_type is not None:
                return range_type

        if slot_range is None:
            return Text()

        # if no SchemaDefinition is explicitly provided as an argument
        # then simply use the schema that is provided to the SQLTableGenerator() object
        if not schema:
            schema = self.schema

        sv = SchemaView(schema)
        if slot_range in sv.all_classes():
            # FK type should be the same as the identifier of the foreign key
            fk = sv.get_identifier_slot(slot_range)
            if fk:
                return self.get_sql_range(fk, sv.schema)
            return Text()

        if slot_range in sv.all_enums():
            e = sv.all_enums()[slot_range]
            if e.permissible_values is not None:
                vs = [str(v) for v in e.permissible_values]
                return Enum(*vs, name=e.name)
            # An enum without permissible values falls through to the type lookup below.

        if slot_range in METAMODEL_TYPE_TO_BASE:
            range_base = METAMODEL_TYPE_TO_BASE[slot_range]
        elif slot_range in sv.all_types():
            range_base = sv.all_types()[slot_range].base
        else:
            logger.error(f"Unknown range: {slot_range} for {slot.name} = {slot.range}")
            return Text()

        if range_base in RANGEMAP:
            return RANGEMAP[range_base]

        logger.error(f"Unknown range base: {range_base} for {slot.name} = {slot.range}")
        return Text()

    @staticmethod
    def get_id_or_key(cn: str, sv: SchemaView) -> str:
        """Given a named class, retrieve the identifier or key slot.

        :param cn: name of the class
        :param sv: schema view used for the lookup
        :return: ``"<class name>.<slot alias or name>"``
        :raises ValueError: if the class has neither an identifier nor a key slot
        """
        pk = sv.get_identifier_slot(cn, use_key=True)
        if pk is None:
            msg = f"No PK for {cn}"
            raise ValueError(msg)
        pk_name = pk.alias if pk.alias else pk.name
        return f"{cn}.{pk_name}"
# Command-line entry point. The function docstring is left as a single line
# because click uses it as the command's help text.
@shared_arguments(SQLTableGenerator)
@click.command(name="sqltables")
@click.option(
    "--dialect",
    default="sqlite",
    show_default=True,
    help="SQL-Alchemy dialect, e.g. sqlite, mysql+odbc",
)
@click.option("--sqla-file", help="Path to sqlalchemy generated python")
@click.option(
    "--relmodel-output",
    help="Path to intermediate LinkML YAML of transformed relational model",
)
@click.option("--python-import", help="Python import header for generated sql-alchemy code")
@click.option(
    "--use-foreign-keys/--no-use-foreign-keys",
    default=True,
    show_default=True,
    help="Emit FK declarations",
)
@click.option(
    "--default_length_oracle",
    default=ORACLE_MAX_VARCHAR_LENGTH,
    show_default=True,
    help="Default length of varchar based arguments for oracle dialects",
)
@click.option(
    "--generate_abstract_class_ddl",
    default=True,
    show_default=True,
    help="A manual override to omit the abstract classes, set to true as a default for testing sake",
)
@click.version_option(__version__, "-V", "--version")
def cli(
    yamlfile: str,
    relmodel_output: str | None,
    sqla_file: str | None = None,
    python_import: str | None = None,
    dialect: str | None = None,
    use_foreign_keys: bool = True,
    **args,
):
    """Generate SQL DDL representation."""
    if relmodel_output:
        # Optionally dump the intermediate relational-model schema as YAML.
        sv = SchemaView(yamlfile)
        rtr = RelationalModelTransformer(sv)
        if not use_foreign_keys:
            rtr.foreign_key_policy = ForeignKeyPolicy.NO_FOREIGN_KEYS
        # NOTE(review): "foo" is a hard-coded argument to transform();
        # confirm whether it should be configurable.
        rtr_result = rtr.transform("foo")
        relmodel_schema = rtr_result.schema
        yaml_dumper.dump(relmodel_schema, to_file=relmodel_output)
    gen = SQLTableGenerator(yamlfile, use_foreign_keys=use_foreign_keys, **args)
    if dialect:
        gen.dialect = dialect
    print(gen.generate_ddl())
    # --sqla-file is accepted but unsupported; note the DDL above has already
    # been printed by the time this raises.
    if sqla_file is not None:
        raise NotImplementedError("SQLAGen not implemented")


if __name__ == "__main__":
    cli()