Source code for linkml_store.api.collection

"""A structure for representing collections of similar objects."""

import hashlib
import logging
from collections import defaultdict
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    TextIO,
    Tuple,
    Type,
    Union,
)

import numpy as np
from linkml_runtime import SchemaView
from linkml_runtime.linkml_model import ClassDefinition, SlotDefinition
from linkml_runtime.linkml_model.meta import ArrayExpression
from pydantic import BaseModel

from linkml_store.api.types import DatabaseType
from linkml_store.index import get_indexer
from linkml_store.utils.format_utils import load_objects, load_objects_from_url
from linkml_store.utils.object_utils import clean_empties
from linkml_store.utils.patch_utils import PatchDict, apply_patches_to_list, patches_from_objects_lists

try:
    from linkml.validator.report import ValidationResult
except ImportError:
    ValidationResult = None

from linkml_store.api.config import CollectionConfig
from linkml_store.api.queries import Query, QueryResult
from linkml_store.index.indexer import Indexer

if TYPE_CHECKING:
    from linkml_store.api.database import Database

logger = logging.getLogger(__name__)

OBJECT = Union[Dict[str, Any], BaseModel, Type]

DEFAULT_FACET_LIMIT = 100
IDENTIFIER = str
FIELD_NAME = str


class Collection(Generic[DatabaseType]):
    """
    A collection is an organized set of objects of the same or similar type.

    - For relational databases, a collection is typically a table
    - For document databases such as MongoDB, a collection is the native type
    - For a file system, a collection could be a single tabular file such as Parquet or CSV

    Collection objects are typically not created directly - instead they are generated
    from a parent :class:`.Database` object:

    >>> from linkml_store import Client
    >>> client = Client()
    >>> db = client.attach_database("duckdb", alias="test")
    >>> collection = db.create_collection("Person")
    >>> objs = [{"id": "P1", "name": "John", "age_in_years": 30}, {"id": "P2", "name": "Alice", "age_in_years": 25}]
    >>> collection.insert(objs)
    """

    # name: str
    parent: Optional[DatabaseType] = None
    _indexers: Optional[Dict[str, Indexer]] = None
    _initialized: Optional[bool] = None
    # hidden: Optional[bool] = False
    metadata: Optional[CollectionConfig] = None

    default_index_name: ClassVar[str] = "simple"
    def __init__(
        self, name: str, parent: Optional["Database"] = None, metadata: Optional[CollectionConfig] = None, **kwargs
    ):
        self.parent = parent
        if metadata:
            self.metadata = metadata
        else:
            self.metadata = CollectionConfig(type=name, **kwargs)
        if not self.metadata.alias:
            self.metadata.alias = name
        if not self.metadata.type:
            self.metadata.type = name
        # if name is not None and self.metadata.name is not None and name != self.metadata.name:
        #     raise ValueError(f"Name mismatch: {name} != {self.metadata.name}")

    @property
    def hidden(self) -> bool:
        """
        True if the collection is hidden.

        An example of a hidden collection is a collection that indexes another collection.

        :return: True if the collection is hidden
        """
        return self.metadata.hidden

    @property
    def target_class_name(self):
        """
        Return the name of the class that this collection represents.

        This MUST be a LinkML class name.

        >>> from linkml_store import Client
        >>> client = Client()
        >>> db = client.attach_database("duckdb", alias="test")
        >>> collection = db.create_collection("Person", alias="persons")
        >>> collection.target_class_name
        'Person'
        >>> collection = db.create_collection("Organization")
        >>> collection.target_class_name
        'Organization'
        >>> collection.alias
        'Organization'

        :return: name of the class which members of this collection instantiate
        """
        # TODO: this is a shim layer until we can normalize on this
        if self.metadata.type:
            return self.metadata.type
        return self.alias

    @property
    def alias(self):
        """
        Return the primary name/alias used for the collection.

        This MAY be the name of the LinkML class, but it may be desirable to have
        an alias, for example "persons" which collects all instances of class Person.

        >>> from linkml_store import Client
        >>> client = Client()
        >>> db = client.attach_database("duckdb", alias="test")
        >>> collection = db.create_collection("Person", alias="persons")
        >>> collection.alias
        'persons'

        If no explicit alias is provided, then the target class name is used:

        >>> from linkml_store import Client
        >>> client = Client()
        >>> db = client.attach_database("duckdb", alias="test")
        >>> collection = db.create_collection("Person")
        >>> collection.alias
        'Person'

        The alias SHOULD be used for table names in SQL. For nested data,
        the alias SHOULD be used as the key; e.g.

        .. code-block:: json

          {
            "persons": [
              {"name": "Alice"},
              {"name": "Bob"}
            ]
          }

        :return: the alias for this collection
        """
        # TODO: this is a shim layer until we can normalize on this
        if self.metadata.alias:
            return self.metadata.alias
        return self.target_class_name
    def replace(self, objs: Union[OBJECT, List[OBJECT]], **kwargs):
        """
        Replace the entire contents of the collection with the given objects.

        >>> from linkml_store import Client
        >>> client = Client()
        >>> db = client.attach_database("duckdb", alias="test")
        >>> collection = db.create_collection("Person")
        >>> objs = [{"id": "P1", "name": "John", "age_in_years": 30}, {"id": "P2", "name": "Alice", "age_in_years": 25}]
        >>> collection.insert(objs)
        >>> collection.replace([{"id": "P3", "name": "Bob", "age_in_years": 42}])
        >>> collection.find({}).num_rows
        1

        :param objs: object or objects that replace the current contents
        :param kwargs: passed through to :meth:`insert`
        :return:
        """
        self.delete_where({})
        self.insert(objs, **kwargs)
    def insert(self, objs: Union[OBJECT, List[OBJECT]], **kwargs):
        """
        Add one or more objects to the collection.

        >>> from linkml_store import Client
        >>> client = Client()
        >>> db = client.attach_database("duckdb", alias="test")
        >>> collection = db.create_collection("Person")
        >>> objs = [{"id": "P1", "name": "John", "age_in_years": 30}, {"id": "P2", "name": "Alice", "age_in_years": 25}]
        >>> collection.insert(objs)

        :param objs:
        :param kwargs:
        :return:
        """
        raise NotImplementedError
    def _pre_query_hook(self, query: Optional[Query] = None, **kwargs):
        logger.info(f"Pre-query hook (state: {self._initialized}; Q={query})")
        if not self._initialized:
            self._materialize_derivations()
            self._initialized = True

    def _pre_insert_hook(self, objs: List[OBJECT], **kwargs):
        if self.metadata.validate_modifications:
            errors = list(self.iter_validate_collection(objs))
            if errors:
                raise ValueError(f"Validation errors: {errors}")

    def _post_insert_hook(self, objs: List[OBJECT], **kwargs):
        self._initialized = True
        patches = [{"op": "add", "path": "/0", "value": obj} for obj in objs]
        self._broadcast(patches, **kwargs)
        self._post_modification_hook(**kwargs)

    def _post_delete_hook(self, **kwargs):
        self._post_modification_hook(**kwargs)

    def _post_modification_hook(self, **kwargs):
        for indexer in self.indexers.values():
            ix_collection_name = self.get_index_collection_name(indexer)
            ix_collection = self.parent.get_collection(ix_collection_name)
            # Currently updating the source triggers complete reindexing
            # TODO: make this more efficient by only deleting modified
            ix_collection.delete_where({})
    def delete(self, objs: Union[OBJECT, List[OBJECT]], **kwargs) -> Optional[int]:
        """
        Delete one or more objects from the collection.

        First let's set up a collection:

        >>> from linkml_store import Client
        >>> client = Client()
        >>> db = client.attach_database("duckdb", alias="test")
        >>> collection = db.create_collection("Person")
        >>> objs = [{"id": "P1", "name": "John", "age_in_years": 30}, {"id": "P2", "name": "Alice", "age_in_years": 25}]
        >>> collection.insert(objs)
        >>> collection.find({}).num_rows
        2

        Now let's delete an object:

        >>> collection.delete(objs[0])
        >>> collection.find({}).num_rows
        1

        Deleting the same object again should have no effect:

        >>> collection.delete(objs[0])
        >>> collection.find({}).num_rows
        1

        :param objs:
        :param kwargs:
        :return:
        """
        raise NotImplementedError
    def delete_where(self, where: Optional[Dict[str, Any]] = None, missing_ok=True, **kwargs) -> Optional[int]:
        """
        Delete objects that match a query.

        First let's set up a collection:

        >>> from linkml_store import Client
        >>> client = Client()
        >>> db = client.attach_database("duckdb", alias="test")
        >>> collection = db.create_collection("Person")
        >>> objs = [{"id": "P1", "name": "John", "age_in_years": 30}, {"id": "P2", "name": "Alice", "age_in_years": 25}]
        >>> collection.insert(objs)

        Now let's delete an object:

        >>> collection.delete_where({"id": "P1"})
        >>> collection.find({}).num_rows
        1

        Match everything:

        >>> collection.delete_where({})
        >>> collection.find({}).num_rows
        0

        :param where: where conditions
        :param missing_ok: if True, do not raise an error if the collection does not exist
        :param kwargs:
        :return: number of objects deleted (or -1 if unsupported)
        """
        raise NotImplementedError
    def update(self, objs: Union[OBJECT, List[OBJECT]], **kwargs):
        """
        Update one or more objects in the collection.

        :param objs:
        :param kwargs:
        :return:
        """
        raise NotImplementedError
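    # Illustrative usage sketch (not part of the original source), assuming a
    # backend that implements update and the Person collection from the
    # doctests above; objects are matched on the identifier and the stored
    # copies are rewritten:
    #
    #   >>> collection.update({"id": "P1", "name": "John", "age_in_years": 31})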
    def _create_query(self, **kwargs) -> Query:
        return Query(from_table=self.alias, **kwargs)
    def query(self, query: Query, **kwargs) -> QueryResult:
        """
        Run a query against the collection.

        First let's load a collection:

        >>> from linkml_store import Client
        >>> from linkml_store.utils.format_utils import load_objects
        >>> client = Client()
        >>> db = client.attach_database("duckdb")
        >>> collection = db.create_collection("Country")
        >>> objs = load_objects("tests/input/countries/countries.jsonl")
        >>> collection.insert(objs)

        Now let's run a query (see the sketch after this method for an example):

        TODO

        :param query:
        :param kwargs:
        :return:
        """
        self._pre_query_hook()
        return self.parent.query(query, **kwargs)
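    # Illustrative sketch (not part of the original source): a Query can be
    # constructed directly, using the same fields that _create_query and find
    # populate:
    #
    #   >>> from linkml_store.api.queries import Query
    #   >>> q = Query(from_table="Country", where_clause={"code": "FR"})
    #   >>> qr = collection.query(q)
    #   >>> qr.num_rows
    #   1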
    def query_facets(
        self, where: Optional[Dict] = None, facet_columns: List[str] = None, facet_limit=DEFAULT_FACET_LIMIT, **kwargs
    ) -> Dict[str, List[Tuple[Any, int]]]:
        """
        Run a query to get facet counts for one or more columns.

        This method generates and executes a facet count query for each specified column
        and returns the results as a dictionary where the keys are the column names and
        the values are lists of (value, count) tuples.

        The facet count query is generated by modifying the original query's WHERE clause
        to exclude conditions directly related to the facet column. This allows for counting
        the occurrences of each unique value in the facet column while still applying the
        other filtering conditions.

        :param where: (optional) WHERE conditions applied before faceting
        :param facet_columns: A list of column names to get facet counts for.
        :param facet_limit: maximum number of facet values returned per column
        :return: A dictionary where keys are column names and values are lists of
            (value, count) tuples for each unique value in the respective column.
        """
        raise NotImplementedError
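    # Illustrative sketch (not part of the original source); the "continent"
    # column and the counts shown are hypothetical:
    #
    #   >>> facets = collection.query_facets(facet_columns=["continent"])  # doctest: +SKIP
    #   >>> facets["continent"]  # doctest: +SKIP
    #   [('Europe', 7), ('Asia', 5)]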
    def get(self, ids: Optional[List[IDENTIFIER]], **kwargs) -> QueryResult:
        """
        Get one or more objects by ID.

        :param ids:
        :param kwargs:
        :return:
        """
        id_field = self.identifier_attribute_name
        if not id_field:
            raise ValueError(f"No identifier for {self.alias}")
        if len(ids) == 1:
            return self.find({id_field: ids[0]})
        else:
            return self.find({id_field: {"$in": ids}})
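    # Illustrative sketch (not part of the original source), assuming "id" has
    # been declared as the identifier attribute (e.g. via
    # set_identifier_attribute_name):
    #
    #   >>> qr = collection.get(["P1", "P2"])
    #   >>> qr.num_rows
    #   2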
    def get_one(self, id: IDENTIFIER, **kwargs) -> Optional[OBJECT]:
        """
        Get one object by ID.

        :param id:
        :param kwargs:
        :return:
        """
        if not id:
            raise ValueError("Must pass an ID")
        id_field = self.identifier_attribute_name
        if not id_field:
            raise ValueError(f"No identifier for {self.alias}")
        w = {id_field: id}
        qr = self.find(w)
        if qr.num_rows == 1:
            return qr.rows[0]
        return None
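    # Illustrative sketch (not part of the original source), under the same
    # identifier assumption as for get above:
    #
    #   >>> obj = collection.get_one("P1")
    #   >>> obj["name"]
    #   'John'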
    def find(self, where: Optional[Any] = None, **kwargs) -> QueryResult:
        """
        Find objects in the collection using a where query.

        As an example, first load a collection:

        >>> from linkml_store import Client
        >>> from linkml_store.utils.format_utils import load_objects
        >>> client = Client()
        >>> db = client.attach_database("duckdb")
        >>> collection = db.create_collection("Country")
        >>> objs = load_objects("tests/input/countries/countries.jsonl")
        >>> collection.insert(objs)

        Now let's find all objects:

        >>> qr = collection.find({})
        >>> qr.num_rows
        20

        We can do a more restrictive query:

        >>> qr = collection.find({"code": "FR"})
        >>> qr.num_rows
        1
        >>> qr.rows[0]["name"]
        'France'

        :param where:
        :param kwargs:
        :return:
        """
        query = self._create_query(where_clause=where)
        self._pre_query_hook(query)
        return self.query(query, **kwargs)
    def find_iter(self, where: Optional[Any] = None, page_size=100, **kwargs) -> Iterator[OBJECT]:
        """
        Find objects in the collection using a where query, yielding them one
        at a time while fetching pages of ``page_size`` behind the scenes.

        :param where:
        :param page_size: number of objects fetched per page
        :param kwargs:
        :return:
        """
        total_rows = None
        offset = 0
        if page_size < 1:
            raise ValueError(f"Invalid page size: {page_size}")
        while True:
            qr = self.find(where=where, offset=offset, limit=page_size, **kwargs)
            if total_rows is None:
                total_rows = qr.num_rows
            if not qr.rows:
                return
            for row in qr.rows:
                yield row
            offset += page_size
            if offset >= total_rows:
                break
        return
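    # Illustrative sketch (not part of the original source): streaming rows
    # page by page instead of materializing the whole result set:
    #
    #   >>> for obj in collection.find_iter({"code": "FR"}, page_size=10):
    #   ...     print(obj["name"])
    #   France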
    def search(
        self,
        query: str,
        where: Optional[Any] = None,
        index_name: Optional[str] = None,
        limit: Optional[int] = None,
        mmr_relevance_factor: Optional[float] = None,
        **kwargs,
    ) -> QueryResult:
        """
        Search the collection using a text-based index.

        Example:

        >>> from linkml_store import Client
        >>> from linkml_store.utils.format_utils import load_objects
        >>> client = Client()
        >>> db = client.attach_database("duckdb")
        >>> collection = db.create_collection("Country")
        >>> objs = load_objects("tests/input/countries/countries.jsonl")
        >>> collection.insert(objs)

        Now let's index, using the simple trigram-based index:

        >>> index = get_indexer("simple")
        >>> _ = collection.attach_indexer(index)

        Now let's find all objects:

        >>> qr = collection.search("France")
        >>> score, top_obj = qr.ranked_rows[0]
        >>> assert score > 0.1
        >>> top_obj["code"]
        'FR'

        :param query:
        :param where:
        :param index_name:
        :param limit:
        :param kwargs:
        :return:
        """
        self._pre_query_hook()
        if index_name is None:
            if len(self.indexers) == 1:
                index_name = list(self.indexers.keys())[0]
            else:
                logger.warning("Multiple indexes found. Using default index.")
                index_name = self.default_index_name
        ix_coll = self.parent.get_collection(self._index_collection_name(index_name))
        if index_name not in self.indexers:
            logger.debug(f"Indexer not found: {index_name} -- creating")
            ix = get_indexer(index_name)
            if not self._indexers:
                self._indexers = {}
            self._indexers[index_name] = ix
        ix = self.indexers.get(index_name)
        if not ix:
            raise ValueError(f"No index named {index_name}")
        logger.debug(f"Using indexer {type(ix)} with name {index_name}")
        if ix_coll.size() == 0:
            logger.info(f"Index {index_name} is empty; indexing all objects")
            all_objs = self.find(limit=-1).rows
            if all_objs:
                # print(f"Index {index_name} is empty; indexing all objects {len(all_objs)}")
                self.index_objects(all_objs, index_name, replace=True, **kwargs)
                assert ix_coll.size() > 0
        qr = ix_coll.find(where=where, limit=-1, **kwargs)
        index_col = ix.index_field
        # TODO: optimize this for large indexes
        vector_pairs = [(row, np.array(row[index_col], dtype=float)) for row in qr.rows]
        results = ix.search(query, vector_pairs, limit=limit, mmr_relevance_factor=mmr_relevance_factor, **kwargs)
        for r in results:
            del r[1][index_col]
        new_qr = QueryResult(num_rows=len(results))
        new_qr.ranked_rows = results
        new_qr.rows = [r[1] for r in results]
        return new_qr
    @property
    def is_internal(self) -> bool:
        """
        Check if the collection is internal.

        Internal collections are hidden by default. Examples of internal
        collections include shadow "index" collections.

        :return: True if the collection is internal
        """
        if not self.alias:
            raise ValueError(f"Collection has no alias: {self} // {self.metadata}")
        return self.alias.startswith("internal__")
    def exists(self) -> Optional[bool]:
        """
        Check if the collection exists and has a populated schema.

        :return: True if a class definition with attributes is present
        """
        cd = self.class_definition()
        return cd is not None and bool(cd.attributes)
    def load_from_source(self, load_if_exists=False):
        """
        Load objects into the collection from its configured source location.

        :param load_if_exists: if True, load even if the collection already exists
        :return:
        """
        if not load_if_exists and self.exists():
            return
        metadata = self.metadata
        if metadata.source:
            source = metadata.source
            kwargs = source.arguments or {}
            if source.local_path:
                objects = load_objects(
                    source.local_path,
                    format=source.format,
                    expected_type=source.expected_type,
                    compression=source.compression,
                    select_query=source.select_query,
                    **kwargs,
                )
            elif source.url:
                objects = load_objects_from_url(
                    source.url,
                    format=source.format,
                    expected_type=source.expected_type,
                    compression=source.compression,
                    select_query=source.select_query,
                    **kwargs,
                )
            else:
                raise ValueError("No source local_path or url provided")
            self.insert(objects)
    def _check_if_initialized(self) -> bool:
        return self._initialized

    def _materialize_derivations(self, **kwargs):
        metadata = self.metadata
        if not metadata.derived_from:
            logger.info(f"No derived_from metadata for {self.alias}; no derivations")
            return
        if self._check_if_initialized():
            logger.info(f"Already initialized {self.alias}; no derivations")
            return
        parent_db = self.parent
        client = parent_db.parent
        # cd = self.class_definition()
        for derivation in metadata.derived_from:
            # TODO: optimize this; utilize underlying engine
            logger.info(f"Deriving from {derivation}")
            if derivation.database:
                db = client.get_database(derivation.database)
            else:
                db = parent_db
            if derivation.collection:
                coll = db.get_collection(derivation.collection)
            else:
                coll = self
            coll.class_definition()
            source_obj_iter = coll.find_iter(derivation.where or {})
            mappings = derivation.mappings
            if not mappings:
                raise ValueError(f"No mappings for {self.alias}")
            target_class_name = self.target_class_name
            from linkml_map.session import Session

            session = Session()
            session.set_source_schema(db.schema_view.schema)
            session.set_object_transformer(
                {
                    "class_derivations": {
                        target_class_name: {
                            "populated_from": coll.target_class_name,
                            "slot_derivations": mappings,
                        },
                    }
                },
            )
            logger.debug(f"Session Spec: {session.object_transformer}")
            tr_objs = []
            for source_obj in source_obj_iter:
                tr_obj = session.transform(source_obj, source_type=coll.target_class_name)
                tr_objs.append(tr_obj)
            if not tr_objs:
                raise ValueError(f"No objects derived from {coll.alias}")
            self.insert(tr_objs)
            self.commit()
    def size(self) -> int:
        """
        Return the number of objects in the collection.

        :return: The number of objects in the collection.
        """
        # num_rows reflects the total number of matches, not the page size,
        # so limit=1 avoids fetching all rows
        return self.find({}, limit=1).num_rows
    def rows_iter(self) -> Iterable[OBJECT]:
        """
        Return an iterator over the objects in the collection.

        :return:
        """
        yield from self.find({}, limit=-1).rows
    def rows(self) -> List[OBJECT]:
        """
        Return a list of objects in the collection.

        :return:
        """
        return list(self.rows_iter())
    def ranked_rows(self) -> List[Tuple[float, OBJECT]]:
        """
        Return a list of objects in the collection, with scores.
        """
        return [(n, obj) for n, obj in enumerate(self.rows_iter())]
    def attach_indexer(
        self, index: Union[Indexer, str], name: Optional[str] = None, auto_index=True, **kwargs
    ) -> Indexer:
        """
        Attach an index to the collection.

        As an example, first let's create a collection in a database:

        >>> from linkml_store import Client
        >>> from linkml_store.utils.format_utils import load_objects
        >>> client = Client()
        >>> db = client.attach_database("duckdb")
        >>> collection = db.create_collection("Country")
        >>> objs = load_objects("tests/input/countries/countries.jsonl")
        >>> collection.insert(objs)

        We will create two indexes - one that indexes the whole object (the default
        behavior), and one that indexes the name only:

        >>> full_index = get_indexer("simple")
        >>> full_index.name = "full"
        >>> name_index = get_indexer("simple", text_template="{name}")
        >>> name_index.name = "name"
        >>> _ = collection.attach_indexer(full_index)
        >>> _ = collection.attach_indexer(name_index)

        Now let's find objects using the full index, searching for the string "France".
        We expect the country France to be the top hit, but the score will be well
        below 1.0 because the query does not match all fields in the object:

        >>> qr = collection.search("France", index_name="full")
        >>> score, top_obj = qr.ranked_rows[0]
        >>> assert score > 0.1
        >>> assert score < 0.5
        >>> top_obj["code"]
        'FR'

        Now using the name index:

        >>> qr = collection.search("France", index_name="name")
        >>> score, top_obj = qr.ranked_rows[0]
        >>> assert score > 0.99
        >>> top_obj["code"]
        'FR'

        :param index:
        :param name:
        :param auto_index: Automatically index all objects in the collection
        :param kwargs:
        :return:
        """
        if isinstance(index, str):
            index = get_indexer(index)
        if name:
            index.name = name
        if not index.name:
            index.name = type(index).__name__.lower()
        index_name = index.name
        if not index_name:
            raise ValueError("Index must have a name")
        if not self._indexers:
            self._indexers = {}
        self._indexers[index_name] = index
        if auto_index:
            all_objs = self.find(limit=-1).rows
            logger.info(f"Auto-indexing {len(all_objs)} objects")
            self.index_objects(all_objs, index_name, replace=True, **kwargs)
        return index
    def get_index_collection_name(self, indexer: Indexer) -> str:
        """Return the name of the shadow collection that holds data for the given indexer."""
        return self._index_collection_name(indexer.name)
    def _index_collection_name(self, index_name: str) -> str:
        """
        Create a name for the special shadow collection that holds index data.

        :param index_name:
        :return:
        """
        return f"internal__index__{self.alias}__{index_name}"
    def index_objects(self, objs: List[OBJECT], index_name: str, replace=False, **kwargs):
        """
        Index a list of objects using a specified index.

        By default, the indexed objects will be stored in a shadow collection in the
        same database, with additional fields for the index vector.

        :param objs:
        :param index_name: e.g. simple, llm
        :param replace:
        :param kwargs:
        :return:
        """
        ix = self._indexers.get(index_name, None)
        if not ix:
            raise ValueError(f"No index named {index_name}")
        ix_coll_name = self._index_collection_name(index_name)
        ix_coll = self.parent.get_collection(ix_coll_name, create_if_not_exists=True)
        vectors = [list(float(e) for e in v) for v in ix.objects_to_vectors(objs)]
        objects_with_ix = []
        index_col = ix.index_field
        for obj, vector in zip(objs, vectors):
            # TODO: id field
            objects_with_ix.append({**obj, **{index_col: vector}})
        if replace:
            schema = self.parent.schema_view.schema
            logger.info(f"Checking if {ix_coll_name} is in {schema.classes.keys()}")
            if ix_coll_name in schema.classes:
                ix_coll.delete_where()
        ix_coll.insert(objects_with_ix, **kwargs)
        ix_coll.commit()
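    # Illustrative sketch (not part of the original source): indexing is
    # normally triggered via attach_indexer or search, but can be invoked
    # directly; the vectors land in the shadow collection named by
    # _index_collection_name (e.g. "internal__index__Country__simple"):
    #
    #   >>> _ = collection.attach_indexer(get_indexer("simple"), name="simple", auto_index=False)
    #   >>> collection.index_objects(collection.rows(), "simple", replace=True)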
    def list_index_names(self) -> List[str]:
        """
        Return a list of index names.

        :return:
        """
        # use the indexers property, so that an unset _indexers yields []
        return list(self.indexers.keys())
    @property
    def indexers(self) -> Dict[str, Indexer]:
        """
        Return a mapping from index name to indexer.

        :return:
        """
        return self._indexers if self._indexers else {}
    def peek(self, limit: Optional[int] = None) -> QueryResult:
        """
        Return the first N objects in the collection.

        :param limit:
        :return:
        """
        q = self._create_query()
        return self.query(q, limit=limit)
    def class_definition(self) -> Optional[ClassDefinition]:
        """
        Return the class definition for the collection.

        If no schema has been explicitly set, and the native database does not have
        a schema, then a schema will be induced from the objects in the collection.

        :return:
        """
        sv: SchemaView = self.parent.schema_view
        if sv:
            cls = sv.get_class(self.target_class_name)
            # if not cls:
            #     logger.warning(f"{self.target_class_name} not in {sv.all_classes().keys()} ")
            #     cls = sv.schema.classes[self.target_class_name]
            if cls and not cls.attributes:
                if not sv.class_induced_slots(cls.name):
                    for att in self._induce_attributes():
                        cls.attributes[att.name] = att
                    sv.set_modified()
            return cls
        return None
    def _induce_attributes(self) -> List[SlotDefinition]:
        result = self.find({}, limit=-1)
        cd = self.induce_class_definition_from_objects(result.rows, max_sample_size=None)
        return list(cd.attributes.values())

    @property
    def identifier_attribute_name(self) -> Optional[str]:
        """
        Return the name of the identifier attribute for the collection, AKA the primary key.

        :return: The name of the identifier attribute, if one exists.
        """
        cd = self.class_definition()
        if cd:
            for att in self.parent.schema_view.class_induced_slots(cd.name):
                if att.identifier:
                    return att.name
        return None
    def set_identifier_attribute_name(self, name: str):
        """
        Set the name of the identifier attribute for the collection, AKA the primary key.

        :param name: The name of the identifier attribute.
        """
        cd = self.class_definition()
        if not cd:
            raise ValueError(f"Cannot find class definition for {self.target_class_name}")
        id_att = None
        candidates = []
        sv: SchemaView = self.parent.schema_view
        cls = sv.get_class(cd.name)
        existing_id_slot = sv.get_identifier_slot(cls.name)
        if existing_id_slot:
            if existing_id_slot.name == name:
                return
            existing_id_slot.identifier = False
        for att in cls.attributes.values():
            candidates.append(att.name)
            if att.name == name:
                att.identifier = True
                id_att = att
            else:
                att.identifier = False
        if not id_att:
            raise ValueError(f"No attribute found with name {name} in {candidates}")
        sv.set_modified()
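    # Illustrative sketch (not part of the original source): promoting "id" to
    # the primary key of the Person collection from the doctests above:
    #
    #   >>> collection.set_identifier_attribute_name("id")
    #   >>> collection.identifier_attribute_name
    #   'id'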
    def object_identifier(self, obj: OBJECT, auto=True) -> Optional[IDENTIFIER]:
        """
        Return the identifier for an object.

        :param obj:
        :param auto: If True, generate an identifier if one does not exist.
        :return:
        """
        pk = self.identifier_attribute_name
        if pk in obj:
            return obj[pk]
        elif auto:
            # TODO: use other unique keys if no primary key
            as_str = str(obj)
            md5 = hashlib.md5(as_str.encode()).hexdigest()
            return md5
        else:
            return None
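    # Illustrative sketch (not part of the original source), assuming "id" is
    # the identifier attribute. When the key is absent and auto=True, an MD5
    # digest of the object's string form is used as a surrogate:
    #
    #   >>> collection.object_identifier({"id": "P1", "name": "John"})
    #   'P1'
    #   >>> surrogate = collection.object_identifier({"name": "Anon"})  # 32-char md5 hex digest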
    def induce_class_definition_from_objects(
        self, objs: List[OBJECT], max_sample_size: Optional[int] = None
    ) -> ClassDefinition:
        """
        Induce a class definition from a list of objects.

        This uses a heuristic procedure to infer the class definition from a list
        of objects. In general it is recommended you explicitly provide a schema.

        :param objs:
        :param max_sample_size:
        :return:
        """
        # TODO: use schemaview
        if max_sample_size is None:
            max_sample_size = 10
        if not self.target_class_name:
            raise ValueError(f"No target_class_name for {self.alias}")
        cd = ClassDefinition(self.target_class_name)
        keys = defaultdict(list)
        for obj in objs[0:max_sample_size]:
            if isinstance(obj, BaseModel):
                obj = obj.model_dump()
            if not isinstance(obj, dict):
                logger.warning(f"Skipping non-dict object: {obj}")
                continue
            for k, v in obj.items():
                keys[k].append(v)
        for k, vs in keys.items():
            if k == "_id":
                continue
            multivalueds = []
            inlineds = []
            rngs = []
            exact_dimensions_list = []
            for v in vs:
                if v is None:
                    continue
                if isinstance(v, np.ndarray):
                    rngs.append("float")
                    exact_dimensions_list.append(v.shape)
                    break
                if isinstance(v, list):
                    # sample first item. TODO: more robust strategy
                    v = v[0] if v else None
                    multivalueds.append(True)
                elif isinstance(v, dict):
                    pass
                    # TODO: check if this is a nested object or key-value list
                    # v = list(v.values())[0]
                    # multivalueds.append(True)
                else:
                    multivalueds.append(False)
                if not v:
                    continue
                if isinstance(v, str):
                    rng = "string"
                elif isinstance(v, bool):
                    rng = "boolean"
                elif isinstance(v, int):
                    rng = "integer"
                elif isinstance(v, float):
                    rng = "float"
                elif isinstance(v, dict):
                    rng = None
                    inlineds.append(True)
                else:
                    # raise ValueError(f"No mappings for {type(v)} // v={v}")
                    rng = None
                    inlineds.append(False)
                rngs.append(rng)
            multivalued = any(multivalueds)
            inlined = any(inlineds)
            if multivalued and False in multivalueds:
                raise ValueError(f"Mixed list and non-list values: {vs} // inferred={multivalueds}")
            # if not rngs:
            #     raise AssertionError(f"Empty rngs for {k} = {vs}")
            rng = rngs[0] if rngs else None
            for other_rng in rngs:
                coercions = {
                    ("integer", "float"): "float",
                }
                if rng != other_rng:
                    if (rng, other_rng) in coercions:
                        rng = coercions[(rng, other_rng)]
                    elif (other_rng, rng) in coercions:
                        rng = coercions[(other_rng, rng)]
                    else:
                        raise ValueError(f"Conflict: {rng} != {other_rng} for {vs}")
            logger.debug(f"Inducing {k} as {rng} {multivalued} {inlined}")
            inlined_as_list = inlined and multivalued
            cd.attributes[k] = SlotDefinition(
                k, range=rng, multivalued=multivalued, inlined=inlined, inlined_as_list=inlined_as_list
            )
            if exact_dimensions_list:
                array_expr = ArrayExpression(exact_number_dimensions=len(exact_dimensions_list[0]))
                cd.attributes[k].array = array_expr
        sv = self.parent.schema_view
        sv.schema.classes[self.target_class_name] = cd
        sv.set_modified()
        return cd
    def import_data(self, location: Union[Path, str, TextIO], **kwargs):
        """
        Import data from a file or stream.

        :param location:
        :param kwargs:
        :return:
        """
        raise NotImplementedError
    def export_data(self, location: Union[Path, str, TextIO], **kwargs):
        """
        Export data to a file or stream.

        :param location:
        :param kwargs:
        :return:
        """
        raise NotImplementedError
    def apply_patches(self, patches: List[PatchDict], **kwargs):
        """
        Apply a list of patches to the collection.

        Patches conform to the JSON Patch format.

        :param patches:
        :param kwargs:
        :return:
        """
        all_objs = self.find(limit=-1).rows
        primary_key = self.identifier_attribute_name
        if not primary_key:
            raise ValueError(f"No primary key for {self.target_class_name}")
        new_objs = apply_patches_to_list(all_objs, patches, primary_key=primary_key, **kwargs)
        self.replace(new_objs)
    def diff(self, other: "Collection", **kwargs) -> List[PatchDict]:
        """
        Diff two collections, returning the patches that transform this one into the other.

        :param other: The collection to diff against
        :param kwargs:
        :return: list of patches in JSON Patch form
        """
        src_objs = self.find(limit=-1).rows
        tgt_objs = other.find(limit=-1).rows
        primary_key = self.identifier_attribute_name
        if not primary_key:
            raise ValueError(f"No primary key for {self.target_class_name}")
        return patches_from_objects_lists(src_objs, tgt_objs, primary_key=primary_key)
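    # Illustrative sketch (not part of the original source); other_collection
    # is a hypothetical second collection sharing the same primary key. diff
    # and apply_patches compose, so patches computed against another
    # collection bring this one into sync with it:
    #
    #   >>> patches = collection.diff(other_collection)  # doctest: +SKIP
    #   >>> collection.apply_patches(patches)  # doctest: +SKIP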
    def iter_validate_collection(
        self, objects: Optional[Iterable[OBJECT]] = None, **kwargs
    ) -> Iterator["ValidationResult"]:
        """
        Validate the contents of the collection.

        :param objects: objects to validate; defaults to all objects in the collection
        :param kwargs: passed to find_iter when objects is not supplied
        :return: iterator over validation results
        """
        from linkml.validator import JsonschemaValidationPlugin, Validator

        validation_plugins = [JsonschemaValidationPlugin(closed=True)]
        validator = Validator(self.parent.schema_view.schema, validation_plugins=validation_plugins)
        cd = self.class_definition()
        if not cd:
            raise ValueError(f"Cannot find class definition for {self.target_class_name}")
        type_designator = None
        for att in self.parent.schema_view.class_induced_slots(cd.name):
            if att.designates_type:
                type_designator = att.name
        class_name = cd.name
        if objects is None:
            objects = self.find_iter(**kwargs)
        for obj in objects:
            obj = clean_empties(obj)
            v_class_name = class_name
            if type_designator is not None:
                # TODO: move type designator logic to core linkml
                this_class_name = obj.get(type_designator)
                if this_class_name:
                    if ":" in this_class_name:
                        this_class_name = this_class_name.split(":")[-1]
                    v_class_name = this_class_name
            yield from validator.iter_results(obj, v_class_name)
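    # Illustrative sketch (not part of the original source): reporting every
    # validation problem across the collection:
    #
    #   >>> for result in collection.iter_validate_collection():
    #   ...     print(result.message)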
    def commit(self):
        """
        Commit any pending changes to the collection.

        :return:
        """
        pass
    def _broadcast(self, *args, **kwargs):
        self.parent.broadcast(self, *args, **kwargs)