import csv
import inspect
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from types import ModuleType
from typing import Any, List, Optional, Type, Union
import click
import linkml_runtime.linkml_model.meta as metamodel
from linkml_runtime import SchemaView
from linkml_runtime.dumpers import yaml_dumper
from linkml_runtime.linkml_model import SchemaDefinition
from linkml_runtime.utils.compile_python import compile_python
from linkml_runtime.utils.enumerations import EnumDefinitionImpl
from linkml_runtime.utils.formatutils import underscore
from linkml_runtime.utils.introspection import package_schemaview
from linkml_runtime.utils.yamlutils import YAMLRoot
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.ext.associationproxy import _AssociationCollection
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from linkml._version import __version__
from linkml.generators.pythongen import PythonGenerator
from linkml.generators.sqlalchemygen import SQLAlchemyGenerator, TemplateEnum
from linkml.generators.sqltablegen import SQLTableGenerator
from linkml.utils import datautils, validation
from linkml.utils.datautils import (
_get_context,
_get_format,
_is_xsv,
dumpers_loaders,
get_dumper,
get_loader,
infer_index_slot,
infer_root_class,
)
@dataclass
class SQLStore:
"""
A wrapper for a SQLite database.
This provides two core operations for storing and retrieving data
- :meth:`dump`
- :meth:`load`
The wrapper transparently will take care of:
- mapping your LinkML schema into SQL Tables
- creating a SQL Alchemy ORM layer
- mapping your data/objects in any LinkML compliant data format (json. yaml, rdf) into ORM objects
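
    Example (an illustrative sketch, not part of this module; the schema file
    ``personinfo.yaml``, the database path, and the ``Person`` class are assumptions):

        >>> store = SQLStore("personinfo.yaml", database_path="person.db")  # doctest: +SKIP
        >>> store.db_exists(create=True)  # doctest: +SKIP
        >>> Person = store.compile_native().Person  # doctest: +SKIP
        >>> store.dump(Person(id="P:1", name="first last"))  # doctest: +SKIP
        >>> person = store.load(target_class=Person)  # doctest: +SKIP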
"""
schema: Optional[Union[str, Path, SchemaDefinition]] = None
schemaview: Optional[SchemaView] = None
engine: Optional[Engine] = None
    database_path: Optional[Union[str, Path]] = None
use_memory: bool = False
"""https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#using-a-memory-database-in-multiple-threads"""
module: Optional[ModuleType] = None
native_module: Optional[ModuleType] = None
include_schema_in_database: bool = False
def __post_init__(self):
if self.database_path is None and not self.use_memory:
raise ValueError("Must have database path or use_memory must be True")
if self.schema is not None and self.schemaview is None:
self.schemaview = SchemaView(self.schema)
    def db_exists(self, create=True, force=False) -> Optional[str]:
"""
check if database exists, optionally create if not present
:param create: create if does not exist
:param force: recreate database, destroying any content if previously present
:return: path
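
        Example (a sketch; assumes a store constructed with a ``database_path``):

            >>> store.db_exists(create=True)  # doctest: +SKIP
            >>> store.db_exists(force=True)  # wipes and rebuilds the database  # doctest: +SKIP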
"""
if self.use_memory:
db_exists = False
else:
if not self.database_path:
raise ValueError("database_path not set")
db_exists = os.path.exists(self.database_path)
if force or (create and not db_exists):
if self.use_memory:
self.engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
else:
if force:
Path(self.database_path).unlink(missing_ok=True)
self.engine = create_engine(f"sqlite:///{self.database_path}")
with self.engine.connect() as con:
ddl = SQLTableGenerator(self.schema).generate_ddl()
con.connection.executescript(ddl)
if self.include_schema_in_database:
metamodel_sv = package_schemaview(metamodel.__name__)
meta_ddl = SQLTableGenerator(metamodel_sv.schema).generate_ddl()
con.connection.executescript(meta_ddl)
if self.use_memory:
return None
if not os.path.exists(self.database_path):
raise ValueError(f"No database: {self.database_path}")
return self.database_path
    def compile(self) -> ModuleType:
"""
Compile SQLAlchemy object model
Uses Declarative by default.
Note that the SQLA model is different from the native dataclass model
:return: compiled module
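
        Example (a sketch; ``Person`` is an assumed class in the schema):

            >>> sqla_module = store.compile()  # doctest: +SKIP
            >>> PersonSQLA = sqla_module.Person  # doctest: +SKIP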
"""
gen = SQLAlchemyGenerator(self.schema)
self.module = gen.compile_sqla(template=TemplateEnum.DECLARATIVE)
return self.module
    def compile_native(self) -> ModuleType:
"""
        Compile the native Python object model (dataclasses generated from the schema).
:return: compiled module
"""
gen = PythonGenerator(yaml_dumper.dumps(self.schema))
self.native_module = gen.compile_module()
return self.native_module
    def load(self, target_class: Optional[Union[str, Type[YAMLRoot]]] = None) -> YAMLRoot:
"""
        Load a LinkML object from the wrapped SQLite database.

        :param target_class: class to query; inferred from the schema root class if None
        :return: the first matching object
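
        Example (a sketch; assumes :meth:`compile_native` has been called and ``Person`` is a schema class):

            >>> person = store.load(target_class=store.native_module.Person)  # doctest: +SKIP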
"""
return self.load_all(target_class=target_class)[0]
    def load_all(self, target_class: Optional[Union[str, Type[YAMLRoot]]] = None) -> List[YAMLRoot]:
        if target_class is None:
            target_class = infer_root_class(self.schemaview)
        if isinstance(target_class, str):
            # allow the class to be passed by name, as the signature permits
            target_class = self.native_module.__dict__[target_class]
if self.engine is None:
self.engine = create_engine(f"sqlite:///{self.database_path}")
session_class = sessionmaker(bind=self.engine)
with session_class.begin() as session:
typ = self.to_sqla_type(target_class)
q = session.query(typ)
all_objs = q.all()
tmp = self.from_sqla(all_objs)
return tmp
    def dump(self, element: YAMLRoot, append=True) -> None:
"""
        Store an element in the database.

        :param element: object to store
        :param append: if False, first delete any existing rows for the element's type
        :return: None
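
        Example (a sketch; assumes ``store.db_exists(create=True)`` has been called and ``Person`` is a schema class):

            >>> store.dump(Person(id="P:1", name="first last"))  # doctest: +SKIP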
"""
if self.engine is None:
raise ValueError("Must set self.engine")
session_class = sessionmaker(bind=self.engine)
with session_class.begin() as session:
nu_obj = self.to_sqla(element)
if not append:
session.query(type(nu_obj)).delete()
session.add(nu_obj)
session.commit()
def to_sqla_type(self, target_class: Type[YAMLRoot]) -> Any:
for n, nu_typ in inspect.getmembers(self.module):
if n == target_class.__name__:
return nu_typ
raise ValueError(f"Could not find: {target_class}")
def from_sqla_type(self, typ) -> Any:
for n, nu_typ in inspect.getmembers(self.native_module):
if n == typ.__name__:
return nu_typ
raise ValueError(f"Could not find: {typ}")
    def to_sqla(self, obj: Union[YAMLRoot, list]) -> Any:
"""
        Translate a native LinkML object into an instance of the SQLAlchemy declarative model.

        :param obj: native object (dataclass or pydantic instance, dict, list, or atomic value)
        :return: equivalent SQLAlchemy instance, or None for empty collections
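
        Example (a sketch; ``Person`` is an assumed class):

            >>> sqla_person = store.to_sqla(Person(id="P:1"))  # doctest: +SKIP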
"""
if self.module is None:
self.compile()
if isinstance(obj, list):
nu_obj = [self.to_sqla(x) for x in obj]
if nu_obj:
return nu_obj
else:
return None
elif isinstance(obj, dict):
nu_obj = {}
for k, v in obj.items():
v2 = self.to_sqla(v)
if v2 is not None:
nu_obj[k] = v2
if nu_obj:
return nu_obj
else:
return None
# elif isinstance(obj, PermissibleValue):
# return str(obj.text)
elif isinstance(obj, EnumDefinitionImpl):
return str(obj)
        elif isinstance(obj, (YAMLRoot, BaseModel)):
typ = type(obj)
inst_args = {}
for k, v in vars(obj).items():
v2 = self.to_sqla(v)
if v2 is not None:
inst_args[k] = v2
for n, nu_typ in inspect.getmembers(self.module):
# TODO: make more efficient
if n == typ.__name__:
nu_obj = nu_typ(**inst_args)
return nu_obj
raise ValueError(f"Cannot find {typ.__name__} in {self.module}")
else:
return obj
    def from_sqla(self, obj: Any) -> Optional[Union[YAMLRoot, List[YAMLRoot]]]:
"""
        Translate an instance of the SQLAlchemy declarative model into a native LinkML object.
:param obj: sqla object
:return: native dataclass object
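
        Example (a sketch; the inverse of :meth:`to_sqla`):

            >>> native_person = store.from_sqla(sqla_person)  # doctest: +SKIP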
"""
typ = type(obj)
nm = self.schemaview.class_name_mappings()
if typ.__name__ in nm:
cls = nm[typ.__name__]
else:
cls = None
        if isinstance(obj, (list, _AssociationCollection)):
nu_obj = [self.from_sqla(x) for x in obj]
if nu_obj:
return nu_obj
else:
return None
elif cls:
inst_args = {}
for sn in self.schemaview.class_slots(cls.name):
sn = underscore(sn)
v = getattr(obj, sn, None)
v2 = self.from_sqla(v)
if v2 is not None and v2 != [] and v2 != {}:
inst_args[sn] = v2
for n, nu_typ in inspect.getmembers(self.native_module):
# TODO: make more efficient
if n == typ.__name__:
nu_obj = nu_typ(**inst_args)
return nu_obj
raise ValueError(f"Cannot find {typ.__name__} in {self.native_module}")
else:
return obj
@click.group(name="sqldb")
@click.option("-v", "--verbose", count=True)
@click.option("-q", "--quiet")
@click.option(
"--csv-field-size-limit",
type=int,
help="""
Increase the default limit for maximum field size.
See https://docs.python.org/3/library/csv.html#csv.field_size_limit""",
)
@click.version_option(__version__, "-V", "--version")
def main(verbose: int, quiet: bool, csv_field_size_limit: int):
"""Run the LinkML SQL CLI."""
    # logging.basicConfig only takes effect on its first call, so compute the
    # level first and configure logging exactly once
    level = logging.WARNING
    if verbose >= 2:
        level = logging.DEBUG
    elif verbose == 1:
        level = logging.INFO
    if quiet:
        level = logging.ERROR
    logging.basicConfig(level=level)
if csv_field_size_limit:
csv.field_size_limit(csv_field_size_limit)
@main.command()
@click.option("--module", "-m", help="Path to python datamodel module")
@click.option("--db", "-D", help="Path to SQLite database file")
@click.option(
"--input-format",
"-f",
type=click.Choice(list(dumpers_loaders.keys())),
help="Input format. Inferred from input suffix if not specified",
)
@click.option(
"--target-class",
"-C",
help="name of class in datamodel that the root node instantiates",
)
@click.option("--index-slot", "-S", help="top level slot. Required for CSV dumping/loading")
@click.option("--schema", "-s", help="Path to schema specified as LinkML yaml")
@click.option(
"--validate/--no-validate",
default=True,
show_default=True,
help="Validate against the schema",
)
@click.option(
"--force/--no-force",
default=True,
show_default=True,
help="Force creation of a database if it does not exist",
)
@click.option(
"--glob/--no-glob",
default=False,
show_default=True,
help="Treat input as a quoted glob expression, e.g. 'data/*.json'",
)
@click.argument("inputs", nargs=-1)
def dump(
inputs,
module,
db,
target_class,
input_format=None,
schema=None,
validate=None,
force: bool = None,
glob: bool = None,
index_slot=None,
) -> None:
"""
Dumps data to a SQL store
Examples:
linkml-sqldb dump -s personinfo.yaml -D my_person_database.db personinfo_data01.yaml
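
        A hypothetical CSV example (the data file and index slot name are assumptions):

        linkml-sqldb dump -s personinfo.yaml -D my_person_database.db -S persons persons.csv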
"""
if module is None:
if schema is None:
raise Exception("must pass one of module OR schema")
else:
python_module = PythonGenerator(schema).compile_module()
else:
python_module = compile_python(module)
    sv = None
    if schema is not None:
        sv = SchemaView(schema)
    if target_class is None:
        if sv is None:
            raise ValueError("Must specify a schema if no target class is specified")
        target_class = infer_root_class(sv)
if target_class is None:
raise Exception("target class not specified and could not be inferred")
py_target_class = python_module.__dict__[target_class]
endpoint = SQLStore(schema, database_path=db, include_schema_in_database=False)
endpoint.native_module = python_module
endpoint.db_exists(force=force)
endpoint.compile()
    if glob:
        # import under an alias so the module does not shadow the `glob` flag
        from glob import glob as expand_glob

        inputs = [item for pattern in inputs for item in expand_glob(pattern)]
for input in inputs:
logging.info(f"Loading: {input}")
input_format = _get_format(input, input_format)
loader = get_loader(input_format)
inargs = {}
if datautils._is_rdf_format(input_format):
if sv is None:
raise Exception("Must pass schema arg")
inargs["schemaview"] = sv
inargs["fmt"] = input_format
if _is_xsv(input_format):
if index_slot is None:
index_slot = infer_index_slot(sv, target_class)
if index_slot is None:
raise Exception("--index-slot is required for CSV input")
inargs["index_slot"] = index_slot
inargs["schema"] = schema
obj = loader.load(source=input, target_class=py_target_class, **inargs)
if validate:
if schema is None:
raise Exception("--schema must be passed in order to validate. Suppress with --no-validate")
# TODO: use validator framework
validation.validate_object(obj, schema)
endpoint.dump(obj)
@main.command()
@click.option("--module", "-m", help="Path to python datamodel module")
@click.option("--db", "-D", help="Path to SQLite database file")
@click.option("--output", "-o", help="Path to output file")
@click.option(
"--output-format",
"-t",
type=click.Choice(list(dumpers_loaders.keys())),
help="Output format. Inferred from output suffix if not specified",
)
@click.option(
"--target-class",
"-C",
help="name of class in datamodel that the root node instantiates",
)
@click.option("--index-slot", "-S", help="top level slot. Required for CSV dumping/loading")
@click.option("--schema", "-s", help="Path to schema specified as LinkML yaml")
@click.option(
"--validate/--no-validate",
default=True,
show_default=True,
help="Validate against the schema",
)
@click.option(
"--force/--no-force",
default=True,
show_default=True,
help="Force creation of a database if it does not exist",
)
def load(
    output,
    output_format,
    module,
    db,
    target_class,
    schema=None,
    validate=None,
    force: bool = None,
    index_slot=None,
) -> None:
"""
Loads data from a SQL store
Examples:
linkml-sqldb load -s personinfo.yaml -D my_person_database.db -o my_data.yaml
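
        A hypothetical CSV export (the output file and index slot name are assumptions):

        linkml-sqldb load -s personinfo.yaml -D my_person_database.db -S persons -o persons.csv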
"""
if module is None:
if schema is None:
raise Exception("must pass one of module OR schema")
else:
python_module = PythonGenerator(schema).compile_module()
else:
python_module = compile_python(module)
    sv = None
    if schema is not None:
        sv = SchemaView(schema)
    if target_class is not None:
        py_target_class = python_module.__dict__[target_class]
    else:
        py_target_class = None
endpoint = SQLStore(schema, database_path=db, include_schema_in_database=False)
endpoint.native_module = python_module
endpoint.compile()
obj = endpoint.load(py_target_class)
output_format = _get_format(output, output_format, default="json")
outargs = {}
if output_format == "json-ld":
if schema is not None:
context = [_get_context(schema)]
else:
raise Exception("Must pass in context OR schema for RDF output")
if context:
outargs["contexts"] = list(context)
outargs["fmt"] = "json-ld"
if output_format == "rdf" or output_format == "ttl":
if sv is None:
raise Exception("Must pass schema arg")
outargs["schemaview"] = sv
if _is_xsv(output_format):
if index_slot is None:
index_slot = infer_index_slot(sv, target_class)
if index_slot is None:
raise Exception("--index-slot is required for CSV output")
outargs["index_slot"] = index_slot
outargs["schema"] = schema
dumper = get_dumper(output_format)
if output is not None:
dumper.dump(obj, output, **outargs)
else:
print(dumper.dumps(obj, **outargs))
if __name__ == "__main__":
main()