import logging
from abc import ABC
from collections import defaultdict
from copy import copy
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Dict,
Generic,
Iterator,
List,
Optional,
Sequence,
Type,
Union,
)
from linkml_store.api.types import CollectionType
from linkml_store.utils.format_utils import Format, load_objects, render_output
from linkml_store.utils.patch_utils import PatchDict
try:
    from linkml.validator.report import Severity, ValidationResult
except ImportError:
    # linkml is an optional dependency. Define BOTH names so the module
    # namespace is consistent whether or not linkml is installed; previously
    # ``Severity`` was left undefined and any later reference raised NameError.
    Severity = None
    ValidationResult = None
from linkml_runtime import SchemaView
from linkml_runtime.linkml_model import ClassDefinition, SchemaDefinition
from linkml_store.api.collection import Collection
from linkml_store.api.config import CollectionConfig, DatabaseConfig
from linkml_store.api.queries import Query, QueryResult
if TYPE_CHECKING:
from linkml_store.api.client import Client
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Signature of a change listener: a callable that receives the source
# collection and the list of patches, and returns nothing.
LISTENER = Callable[[Collection, List[PatchDict]], None]
[docs]
class Database(ABC, Generic[CollectionType]):
    """
    A Database provides access to named collections of data.

    A database object is owned by a :ref:`Client`. The database
    object uses a :ref:`handle` to know what kind of external
    database system to connect to (e.g. duckdb, mongodb). The handle
    is a string ``<DatabaseType>:<LocalLocator>``

    The database object may also have an :ref:`alias` that is mapped
    to the handle.

    Attaching a database
    --------------------

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb:///:memory:", alias="test")

    We can check the value of the handle:

    >>> db.handle
    'duckdb:///:memory:'

    The alias can be used to retrieve the database object from the client

    >>> assert db == client.get_database("test")

    Creating a collection
    ---------------------

    >>> collection = db.create_collection("Person")
    >>> len(db.list_collections())
    1
    >>> db.get_collection("Person") == collection
    True
    >>> objs = [{"id": "P1", "name": "John", "age_in_years": 30}, {"id": "P2", "name": "Alice", "age_in_years": 25}]
    >>> collection.insert(objs)
    >>> qr = collection.find()
    >>> len(qr.rows)
    2
    >>> qr.rows[0]["id"]
    'P1'
    >>> qr.rows[1]["name"]
    'Alice'
    >>> qr = collection.find({"name": "John"})
    >>> len(qr.rows)
    1
    >>> qr.rows[0]["name"]
    'John'
    """
    _schema_view: Optional[SchemaView] = None
    """Schema for the database. May be transformed."""
    _original_schema_view: Optional[SchemaView] = None
    """If a schema must be transformed, then the original is stored here."""
    # Collections registered on this database, keyed by collection alias.
    _collections: Optional[Dict[str, Collection]] = None
    # The client that owns this database (may be unset).
    parent: Optional["Client"] = None
    # Configuration for this database (handle, alias, schema and collection settings).
    metadata: Optional[DatabaseConfig] = None
    # Class instantiated by ``create_collection`` for each new collection.
    collection_class: ClassVar[Optional[Type[Collection]]] = None
    # Callables invoked by ``broadcast`` with the source collection and patches.
    listeners: Optional[List[LISTENER]] = None
[docs]
def __init__(self, handle: Optional[str] = None, metadata: Optional[DatabaseConfig] = None, **kwargs):
    """
    Create a database from either a ready-made configuration or a handle.

    :param handle: database handle; used to build a configuration when no
        ``metadata`` is supplied
    :param metadata: existing configuration; takes precedence over ``handle``
    :param kwargs: extra fields for the configuration built from ``handle``
    :raises ValueError: if both ``handle`` and the configured handle are set
        and they disagree
    """
    self.metadata = metadata if metadata else DatabaseConfig(handle=handle, **kwargs)
    both_known = handle is not None and self.metadata.handle is not None
    if both_known and handle != self.metadata.handle:
        raise ValueError(f"Handle mismatch: {handle} != {self.metadata.handle}")
    # Schema first, then collections.
    self._initialize_schema()
    self._initialize_collections()
def _initialize_schema(self, **kwargs):
    """
    Load the schema described by the database configuration, if any.

    If ``schema_location`` is set, it is formatted with ``base_dir`` and
    loaded via ``load_schema_view``. If ``schema_dict`` is set, a schema is
    built from it (this runs second, so it replaces a schema loaded from
    ``schema_location``).

    :param kwargs: accepted for interface compatibility; not used
    """
    db_config = self.metadata
    if db_config.schema_location:
        # This method is called from __init__, before any parent client has
        # been attached, so ``self.parent`` may still be None. Resolve
        # ``base_dir`` defensively instead of failing with AttributeError.
        parent_metadata = getattr(self.parent, "metadata", None)
        base_dir = getattr(parent_metadata, "base_dir", None)
        schema_location = db_config.schema_location.format(base_dir=base_dir)
        logger.info(f"Loading schema from: {schema_location}")
        self.load_schema_view(schema_location)
    if db_config.schema_dict:
        # Shallow copy, so the placeholder id/name added below do not leak
        # back into the configuration object.
        schema_dict = copy(db_config.schema_dict)
        if "id" not in schema_dict:
            schema_dict["id"] = "tmp"
        if "name" not in schema_dict:
            schema_dict["name"] = "tmp"
        self.set_schema_view(SchemaView(SchemaDefinition(**schema_dict)))
[docs]
def from_config(self, db_config: DatabaseConfig, **kwargs):
    """
    Re-initialize this database from a configuration object.

    The configuration replaces ``self.metadata``; the schema and then the
    collections are initialized from it.

    TODO: DEPRECATE

    :param db_config: database configuration
    :param kwargs: additional arguments (not used)
    :return: this database, to allow chaining
    """
    self.metadata = db_config
    for initialize in (self._initialize_schema, self._initialize_collections):
        initialize()
    return self
def _initialize_collections(self):
    """
    Create every collection declared in the database configuration.

    Each configuration key is used as the collection alias; the target class
    is the configured ``type`` or, failing that, the alias. When the
    configuration declares attributes, a matching class definition is
    registered in the schema view.

    :raises ValueError: if a collection declares an alias that differs from
        the key it is registered under
    """
    configured = self.metadata.collections
    if not configured:
        return
    for k, collection_config in configured.items():
        if collection_config.alias and collection_config.alias != k:
            raise ValueError(f"Alias mismatch: {collection_config.alias} != {k}")
        class_name = collection_config.type or k
        created = self.create_collection(class_name, alias=k, metadata=collection_config)
        # Sanity checks on the collection that was just created.
        assert created.alias == k
        assert created.target_class_name == class_name
        if not collection_config.attributes:
            continue
        # Register the declared attributes as a class in the schema.
        view = self.schema_view
        class_def = ClassDefinition(class_name, attributes=collection_config.attributes)
        view.schema.classes[class_def.name] = class_def
        view.set_modified()
@property
def recreate_if_exists(self) -> bool:
    """
    The ``recreate_if_exists`` setting from the database configuration.

    :return: whether the database should be recreated if it already exists
    """
    config = self.metadata
    return config.recreate_if_exists
@property
def handle(self) -> str:
    """
    The handle stored in the database configuration.

    Examples:

    - ``duckdb:///:memory:``
    - ``duckdb:///tmp/test.db``
    - ``mongodb://localhost:27017/``

    :return: the database handle
    """
    config = self.metadata
    return config.handle
@property
def alias(self):
    """
    The alias stored in the database configuration.

    :return: the database alias
    """
    config = self.metadata
    return config.alias
[docs]
def store(self, obj: Dict[str, Any], **kwargs):
    """
    Store a dictionary of collections in the database.

    Each key of ``obj`` names a collection and each value holds its rows.
    A single dict value is treated as a one-row list; values that are not
    lists, and empty lists, are skipped. The rows *replace* the current
    contents of the collection.

    If the schema has a class marked as tree root, each key is resolved to a
    slot of that class, and the slot name and range determine the collection
    alias and type.

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> db.store({"persons": [{"id": "P1", "name": "John", "age_in_years": 30}]})
    >>> collection = db.get_collection("persons")
    >>> qr = collection.find()
    >>> qr.num_rows
    1

    :param obj: object to store
    :param kwargs: additional arguments (not used)
    :raises ValueError: if a tree root exists and no slot can be found for a key
    """
    sv = self.schema_view
    # First class flagged as tree root, if there is one.
    root = next((c for c in sv.all_classes().values() if c.tree_root), None)
    for k, v in obj.items():
        logger.info(f"Storing collection {k}")
        slot = sv.induced_slot(k, root.name) if root else None
        if root and not slot:
            raise ValueError(f"Cannot determine type for {k}")
        if isinstance(v, dict):
            logger.debug(f"Coercing dict to list: {v}")
            v = [v]
        if not (isinstance(v, list) and v):
            continue
        if slot:
            logger.debug(f"Aligning to existing slot: {slot.name} range={slot.range}")
            collection = self.get_collection(slot.name, type=slot.range, create_if_not_exists=True)
        else:
            collection = self.get_collection(k, create_if_not_exists=True)
        logger.debug(f"Replacing using {collection.alias} {collection.target_class_name}")
        collection.replace(v)
[docs]
def commit(self, **kwargs):
    """
    Commit pending changes by committing each listed collection in turn.

    :param kwargs: additional arguments (not used)
    :return:
    """
    collections = self.list_collections()
    for collection in collections:
        collection.commit()
[docs]
def close(self, **kwargs):
    """
    Close the database.

    Not implemented in this base class.

    :param kwargs: additional arguments
    :return:
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError()
@property
def _collection_class(self) -> Type[Collection]:
    """
    Collection class hook; not implemented in this base class.

    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError()
[docs]
def create_collection(
    self,
    name: str,
    alias: Optional[str] = None,
    metadata: Optional[CollectionConfig] = None,
    recreate_if_exists=False,
    **kwargs,
) -> Collection:
    """
    Create a new collection in the current database.

    The collection must have a *Type*, and may have an *Alias*.

    Examples:

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> collection = db.create_collection("Person", alias="persons")
    >>> collection.alias
    'persons'
    >>> collection.target_class_name
    'Person'

    If alias is not provided, it defaults to the name of the type.

    >>> collection = db.create_collection("Organization")
    >>> collection.alias
    'Organization'

    :param name: name of the collection
    :param alias: alias for the collection
    :param metadata: metadata for the collection
    :param recreate_if_exists: recreate the collection if it already exists
    :param kwargs: additional arguments
    :return: the newly created collection, registered under its alias
    :raises ValueError: if ``name`` is empty
    """
    if not name:
        raise ValueError(f"Collection name must be provided: alias: {alias} metadata: {metadata}")
    collection_cls = self.collection_class
    collection = collection_cls(name=name, parent=self, metadata=metadata)
    if alias:
        collection.metadata.alias = alias
    if metadata and metadata.source:
        collection.load_from_source()
    if metadata and metadata.attributes:
        sv = self.schema_view
        # The configuration may omit ``type``; fall back to ``name`` so the
        # class definition always has a name.
        class_name = metadata.type or name
        cd = ClassDefinition(name=class_name, attributes=metadata.attributes)
        sv.schema.classes[cd.name] = cd
        # The schema was edited in place: tell the schema view, as
        # ``_initialize_collections`` does for the same edit.
        sv.set_modified()
    if not self._collections:
        self._collections = {}
    # Collections are registered under their alias, which defaults to the name.
    self._collections[alias or name] = collection
    if recreate_if_exists:
        logger.debug(f"Recreating collection {collection.alias}")
        collection.delete_where({}, missing_ok=True)
    return collection
[docs]
def list_collections(self, include_internal=False) -> Sequence[Collection]:
    """
    List all collections.

    Examples
    --------

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> c1 = db.create_collection("Person")
    >>> c2 = db.create_collection("Product")
    >>> collections = db.list_collections()
    >>> len(collections)
    2
    >>> [c.target_class_name for c in collections]
    ['Person', 'Product']

    :param include_internal: include internal collections
    :return: list of collections
    """
    if not self._collections:
        self.init_collections()
    # init_collections() may leave ``_collections`` unset (e.g. nothing to
    # discover); treat that as "no collections" instead of failing on None.
    collections = self._collections or {}
    return [c for c in collections.values() if include_internal or not c.is_internal]
[docs]
def list_collection_names(self, **kwargs) -> Sequence[str]:
    """
    List the alias of every collection.

    Examples
    --------

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> c1 = db.create_collection("Person")
    >>> c2 = db.create_collection("Product")
    >>> collection_names = db.list_collection_names()
    >>> len(collection_names)
    2
    >>> collection_names
    ['Person', 'Product']

    :param kwargs: forwarded to ``list_collections``
    :return: list of collection aliases
    """
    names = []
    for collection in self.list_collections(**kwargs):
        names.append(collection.alias)
    return names
[docs]
def get_collection(
    self, name: str, type: Optional[str] = None, create_if_not_exists=True, **kwargs
) -> "Collection":
    """
    Get a named collection.

    Examples
    --------

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> collection = db.create_collection("Person")
    >>> db.get_collection("Person") == collection
    True
    >>> db.get_collection("NonExistent", create_if_not_exists=False)
    Traceback (most recent call last):
        ...
    KeyError: 'Collection NonExistent does not exist'

    :param name: name of the collection
    :param type: target class name
    :param create_if_not_exists: create the collection if it does not exist
    :param kwargs: forwarded to ``create_collection`` when a collection is created
    :return: the existing or newly created collection
    :raises KeyError: if the collection does not exist and
        ``create_if_not_exists`` is false
    """
    if not self._collections:
        logger.debug("Initializing collections")
        self.init_collections()
    # ``_collections`` can still be None after initialisation; that simply
    # means the requested collection is not there yet.
    if self._collections and name in self._collections:
        return self._collections[name]
    if not create_if_not_exists:
        raise KeyError(f"Collection {name} does not exist")
    if type is None:
        type = name
    logger.debug(f"Creating new collection: {name} kwargs: {kwargs}")
    collection = self.create_collection(type, alias=name, **kwargs)
    if self._collections is None:
        self._collections = {}
    self._collections[name] = collection
    return collection
[docs]
def init_collections(self):
    """
    Initialize collections.

    Not implemented in this base class. It is called by
    ``list_collections`` and ``get_collection`` when no collections are
    registered yet.

    TODO: Not typically called directly: consider making this private

    :return:
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError
[docs]
def query(self, query: Query, **kwargs) -> QueryResult:
    """
    Run a query against the database.

    The query is delegated to the collection named by ``query.from_table``.

    Examples
    --------

    >>> from linkml_store.api.client import Client
    >>> from linkml_store.api.queries import Query
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> collection = db.create_collection("Person")
    >>> collection.insert([{"id": "P1", "name": "John"}, {"id": "P2", "name": "Alice"}])
    >>> query = Query(from_table="Person", where_clause={"name": "John"})
    >>> result = db.query(query)
    >>> len(result.rows)
    1
    >>> result.rows[0]["id"]
    'P1'

    :param query: the query to run
    :param kwargs: forwarded to the collection's ``query`` method
    :return: the result returned by the collection
    :raises NotImplementedError: if the query does not name a table
    """
    table = query.from_table
    if not table:
        raise NotImplementedError(f"Querying without a table is not supported in {self.__class__.__name__}")
    target = self.get_collection(table)
    return target.query(query, **kwargs)
@property
def schema_view(self) -> SchemaView:
    """
    Return the schema view for this database.

    If no schema view is set, the schema is first initialized from the
    database configuration; if there is still none, one is induced and
    cached.

    Induced schema example:

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> collection = db.create_collection("Person", alias="persons")
    >>> collection.insert([{"id": "P1", "name": "John", "age_in_years": 25}])
    >>> schema_view = db.schema_view
    >>> cd = schema_view.get_class("Person")
    >>> cd.attributes["id"].range
    'string'
    >>> cd.attributes["age_in_years"].range
    'integer'

    We can reuse the same class:

    >>> collection2 = db.create_collection("Person", alias="other_persons")
    >>> collection2.class_definition().attributes["age_in_years"].range
    'integer'
    """
    if not self._schema_view:
        # First try the configured schema, then fall back to induction.
        self._initialize_schema()
        if not self._schema_view:
            self._schema_view = self.induce_schema_view()
    return self._schema_view
[docs]
def set_schema_view(self, schema_view: Union[str, Path, SchemaView]):
    """
    Set the schema view for the database.

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> sv = SchemaView("tests/input/countries/countries.linkml.yaml")
    >>> db.set_schema_view(sv)
    >>> cd = db.schema_view.schema.classes["Country"]
    >>> sorted(cd.slots)
    ['capital', 'code', 'continent', 'languages', 'name']
    >>> induced_slots = {s.name: s for s in sv.class_induced_slots("Country")}
    >>> sorted(induced_slots.keys())
    ['capital', 'code', 'continent', 'languages', 'name']
    >>> induced_slots["code"].identifier
    True

    Creating a new collection will align with the schema view:

    >>> collection = db.create_collection("Country", "all_countries")
    >>> sorted(collection.class_definition().slots)
    ['capital', 'code', 'continent', 'languages', 'name']

    If collections already exist and the schema has exactly one root class
    (flagged as tree root or, failing that, never used as a slot range),
    every collection named after an inlined slot of that root takes the
    slot's range as its type.

    :param schema_view: can be either a path to the schema, or a SchemaView object
    :return:
    """
    if isinstance(schema_view, (str, Path)):
        schema_view = SchemaView(str(schema_view))
    self._schema_view = schema_view
    if not self._collections:
        return
    classes = schema_view.all_classes()
    roots = [c for c in classes.values() if c.tree_root]
    if not roots:
        # No explicit tree root: take the classes that are not the range of
        # any slot, themselves or through an ancestor.
        referenced = {
            slot.range
            for class_name in classes
            for slot in schema_view.class_induced_slots(class_name)
            if slot.range
        }
        roots = [
            c
            for c in classes.values()
            if referenced.isdisjoint(schema_view.class_ancestors(c.name, reflexive=True))
        ]
    if len(roots) != 1:
        return
    (root,) = roots
    for slot in schema_view.class_induced_slots(root.name):
        is_inlined = slot.inlined or slot.inlined_as_list
        if is_inlined and slot.range and slot.name in self._collections:
            self._collections[slot.name].metadata.type = slot.range
[docs]
def load_schema_view(self, path: Union[str, Path]):
    """
    Load a schema from a file and set it as this database's schema view.

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> db.load_schema_view("tests/input/countries/countries.linkml.yaml")
    >>> sv = db.schema_view
    >>> cd = sv.schema.classes["Country"]
    >>> sorted(cd.slots)
    ['capital', 'code', 'continent', 'languages', 'name']
    >>> induced_slots = {s.name: s for s in sv.class_induced_slots("Country")}
    >>> sorted(induced_slots.keys())
    ['capital', 'code', 'continent', 'languages', 'name']
    >>> induced_slots["code"].identifier
    True

    Creating a new collection will align with the schema view:

    >>> collection = db.create_collection("Country", "all_countries")
    >>> sorted(collection.class_definition().slots)
    ['capital', 'code', 'continent', 'languages', 'name']

    :param path: location of the schema file, as a string or ``Path``
    :return:
    """
    location = str(path) if isinstance(path, Path) else path
    view = SchemaView(location)
    self.set_schema_view(view)
[docs]
def induce_schema_view(self) -> SchemaView:
    """
    Induce a schema view from the collections in this database.

    This implementation adds one class per collection, named after the
    collection's target class.

    >>> from linkml_store.api.client import Client
    >>> from linkml_store.api.queries import Query
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> collection = db.create_collection("Person")
    >>> collection.insert([{"id": "P1", "name": "John", "age_in_years": 25},
    ...                    {"id": "P2", "name": "Alice", "age_in_years": 25}])
    >>> schema_view = db.induce_schema_view()
    >>> cd = schema_view.get_class("Person")
    >>> cd.attributes["id"].range
    'string'
    >>> cd.attributes["age_in_years"].range
    'integer'

    :return: A schema view
    """
    logger.info(f"Inducing schema view for {self.handle}")
    from linkml_runtime.utils.schema_builder import SchemaBuilder

    builder = SchemaBuilder()
    for collection_name in self.list_collection_names():
        target_class = self.get_collection(collection_name).target_class_name
        builder.add_class(target_class)
    return SchemaView(builder.schema)
[docs]
def iter_validate_database(self, **kwargs) -> Iterator["ValidationResult"]:
    """
    Validate the contents of the database.

    Results from every collection are yielded first; if the configuration
    sets ``ensure_referential_integrity``, referential integrity results
    follow.

    As an example, let's create a database with a predefined schema
    from the countries.linkml.yaml file:

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> db.load_schema_view("tests/input/countries/countries.linkml.yaml")

    Let's introspect the schema to see what slots are applicable for the class "Country":

    >>> sv = db.schema_view
    >>> for slot in sv.class_induced_slots("Country"):
    ...     print(slot.name, slot.range, slot.required)
    name string True
    code string True
    capital string True
    continent string True
    languages Language None

    Next we'll create a collection, binding it to the target class "Country", and insert
    valid data:

    >>> collection = db.create_collection("Country", "all_countries")
    >>> obj = {"code": "US", "name": "United States", "continent": "North America", "capital": "Washington, D.C."}
    >>> collection.insert([obj])
    >>> list(db.iter_validate_database())
    []

    Now let's insert some invalid data (missing required fields)

    >>> collection.insert([{"code": "FR", "name": "France"}])
    >>> for r in db.iter_validate_database():
    ...     print(r.message[0:32])
    'capital' is a required property
    'continent' is a required proper

    :param kwargs: forwarded to the per-collection and referential checks
    :return: iterator over validation results
    """
    for member in self.list_collections():
        yield from member.iter_validate_collection(**kwargs)
    if not self.metadata.ensure_referential_integrity:
        return
    yield from self._validate_referential_integrity(**kwargs)
def _validate_referential_integrity(self, **kwargs) -> Iterator["ValidationResult"]:
    """
    Validate referential integrity of the database.

    For every stored object, each ``str`` or ``int`` value held in a slot
    whose range is the target class of at least one collection is looked up
    in those collections; a result is yielded when none of them has it.

    :param kwargs: additional arguments (not used)
    :return: iterator over validation results
    :raises ValueError: if a collection has no target class
    """
    sv = self.schema_view
    # target class name -> collections that hold instances of that class
    cmap = defaultdict(list)
    for collection in self.list_collections():
        if not collection.target_class_name:
            raise ValueError(f"Collection {collection.name} has no target class")
        cmap[collection.target_class_name].append(collection)
    for collection in self.list_collections():
        class_def = collection.class_definition()
        induced_slots = sv.class_induced_slots(class_def.name)
        slots_by_name = {s.name: s for s in induced_slots}
        # slot name -> collections in which a referenced object may live
        candidates_by_slot = {s.name: cmap.get(s.range, []) for s in induced_slots if s.range}
        for row in collection.find_iter():
            for key, value in row.items():
                candidates = candidates_by_slot.get(key)
                if not candidates:
                    continue
                # Only scalar string/int values are treated as references.
                if not isinstance(value, (str, int)):
                    continue
                slot = slots_by_name[key]
                if any(candidate.get_one(value) for candidate in candidates):
                    continue
                yield ValidationResult(
                    type="ReferentialIntegrity",
                    severity=Severity.ERROR,
                    message=f"Referential integrity error: {slot.range} not found",
                    instantiates=slot.range,
                    instance=value,
                )
[docs]
def drop(self, **kwargs):
    """
    Drop the database and all collections.

    Not implemented in this base class.

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> path = Path("/tmp/test.db")
    >>> path.parent.mkdir(exist_ok=True, parents=True)
    >>> db = client.attach_database(f"duckdb:///{path}")
    >>> db.store({"persons": [{"id": "P1", "name": "John", "age_in_years": 30}]})
    >>> coll = db.get_collection("persons")
    >>> coll.find({}).num_rows
    1
    >>> db.drop()
    >>> db = client.attach_database("duckdb:///tmp/test.db", alias="test")
    >>> coll = db.get_collection("persons")
    >>> coll.find({}).num_rows
    0

    :param kwargs: additional arguments
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError()
[docs]
def import_database(
    self,
    location: str,
    source_format: Optional[Union[str, Format]] = None,
    collection_name: Optional[str] = None,
    **kwargs,
):
    """
    Import a database from a file or location.

    For the DuckDB SQL dump and MongoDB dump formats, the data is first
    imported into a temporary database attached through the parent client,
    and every collection found there is then stored in this database.
    Otherwise the objects are loaded from ``location`` and either inserted
    into the named collection or stored one by one.

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> db.import_database("tests/input/iris.csv", Format.CSV, collection_name="iris")
    >>> db.list_collection_names()
    ['iris']
    >>> collection = db.get_collection("iris")
    >>> collection.find({}).num_rows
    150

    :param location: location of the file
    :param source_format: source format
    :param collection_name: (Optional) name of the collection, for data that is flat
    :param kwargs: additional arguments (not used)
    """
    if isinstance(source_format, str):
        source_format = Format(source_format)
    via_temporary_db = (
        isinstance(source_format, Format)
        and source_format.is_dump_format()
        and source_format in [Format.SQLDUMP_DUCKDB, Format.DUMP_MONGODB]
    )
    if via_temporary_db:
        # The format value doubles as the handle of the temporary database.
        staging_handle = source_format.value
        staging_db = self.parent.attach_database(staging_handle, alias="tmp")
        # TODO: check for infinite recursion
        staging_db.import_database(location, source_format=source_format)
        snapshot = {}
        for staged in staging_db.list_collections():
            result = staged.find({}, limit=-1)
            snapshot[staged.alias] = result.rows
        self.store(snapshot)
        return
    objects = load_objects(location, format=source_format)
    if not collection_name:
        for item in objects:
            self.store(item)
        return
    target = self.get_collection(collection_name, create_if_not_exists=True)
    target.insert(objects)
[docs]
def export_database(self, location: str, target_format: Optional[Union[str, Format]] = None, **kwargs):
    """
    Export a database to a file or location.

    All rows of every collection are gathered into one object keyed by
    collection alias. For the DuckDB SQL dump and MongoDB dump formats, that
    object is stored in a temporary database attached through the parent
    client, which performs the export. Otherwise the object is rendered in
    the target format and written to ``location`` as UTF-8 text.

    >>> from linkml_store.api.client import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> db.import_database("tests/input/iris.csv", Format.CSV, collection_name="iris")
    >>> db.export_database("/tmp/iris.yaml", Format.YAML)

    :param location: location of the file
    :param target_format: target format
    :param kwargs: additional arguments (not used)
    :raises ValueError: if ``location`` is a directory and the format is
        written as a single file
    """
    obj = {}
    if isinstance(target_format, str):
        target_format = Format(target_format)
    for exported in self.list_collections():
        result = exported.find({}, limit=-1)
        obj[exported.alias] = result.rows
    logger.info(f"Exporting object with {len(obj)} collections to {location} in {target_format} format")
    via_temporary_db = (
        isinstance(target_format, Format)
        and target_format.is_dump_format()
        and target_format in [Format.SQLDUMP_DUCKDB, Format.DUMP_MONGODB]
    )
    if via_temporary_db:
        # The format value doubles as the handle of the temporary database.
        staging_handle = target_format.value
        staging_db = self.parent.attach_database(staging_handle, alias="tmp")
        staging_db.store(obj)
        # TODO: check for infinite recursion
        staging_db.export_database(location, target_format=target_format)
        return
    if Path(location).is_dir():
        raise ValueError(f"{location} is a directory; cannot write {target_format} to a dir")
    with open(location, "w", encoding="utf-8") as stream:
        rendered = render_output(obj, format=target_format)
        stream.write(rendered)
[docs]
def broadcast(self, source: Collection, patches: List[PatchDict]):
    """
    Pass a set of patches to every registered listener.

    Does nothing when no listeners are registered.

    :param source: the collection passed to each listener as first argument
    :param patches: the patches passed to each listener as second argument
    """
    for notify in self.listeners or ():
        notify(source, patches)