Source code for linkml.generators.pydanticgen.array

import sys
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Iterable, List, Optional, Type, TypeVar, Union, get_args

from linkml_runtime.linkml_model import Element
from linkml_runtime.linkml_model.meta import ArrayExpression, DimensionExpression
from pydantic import VERSION as PYDANTIC_VERSION

from linkml.utils.deprecation import deprecation_warning

if int(PYDANTIC_VERSION[0]) >= 2:
    from pydantic_core import core_schema
else:
    # Support for having pydantic 1 installed in the same environment will be dropped in 1.9.0
    deprecation_warning("pydantic-v1")

if TYPE_CHECKING:
    from pydantic import GetCoreSchemaHandler
    from pydantic_core import CoreSchema

if sys.version_info.minor <= 8:
    from typing_extensions import Annotated
else:
    from typing import Annotated

from linkml.generators.pydanticgen.build import RangeResult
from linkml.generators.pydanticgen.template import ConditionalImport, Import, Imports, ObjectImport
from linkml.utils.exceptions import ValidationError


[docs]class ArrayRepresentation(Enum): LIST = "list" NUMPYDANTIC = "numpydantic" # numpydantic must be installed to use this
_BOUNDED_ARRAY_FIELDS = ("exact_number_dimensions", "minimum_number_dimensions", "maximum_number_dimensions") _T = TypeVar("_T") _RecursiveListType = Iterable[Union[_T, Iterable["_RecursiveListType"]]]
[docs]class AnyShapeArrayType(Generic[_T]): @classmethod def __get_pydantic_core_schema__(cls, source_type: Any, handler: "GetCoreSchemaHandler") -> "CoreSchema": # double-nested parameterized types here # source_type: List[Union[T,List[...]]] item_type = (Any,) if get_args(get_args(source_type)[0])[0] is _T else get_args(get_args(source_type)[0])[:-1] if len(item_type) == 1: item_schema = handler.generate_schema(item_type[0]) else: item_schema = core_schema.union_schema([handler.generate_schema(i) for i in item_type]) if all([getattr(i, "__module__", "") == "builtins" and i is not Any for i in item_type]): item_schema["strict"] = True # Before python 3.11, `Any` type was a special object without a __name__ item_name = "_".join(["Any" if i is Any else i.__name__ for i in item_type]) array_ref = f"any-shape-array-{item_name}" schema = core_schema.definitions_schema( core_schema.list_schema(core_schema.definition_reference_schema(array_ref)), [ core_schema.union_schema( [ core_schema.list_schema(core_schema.definition_reference_schema(array_ref)), item_schema, ], ref=array_ref, ) ], ) return schema
AnyShapeArray = Annotated[_RecursiveListType, AnyShapeArrayType] _AnyShapeArrayImports = ( Imports() + Import( module="typing", objects=[ ObjectImport(name="Generic"), ObjectImport(name="Iterable"), ObjectImport(name="TypeVar"), ObjectImport(name="Union"), ObjectImport(name="get_args"), ], ) + ConditionalImport( condition="sys.version_info.minor > 8", module="typing", objects=[ObjectImport(name="Annotated")], alternative=Import(module="typing_extensions", objects=[ObjectImport(name="Annotated")]), ) + Import(module="pydantic", objects=[ObjectImport(name="GetCoreSchemaHandler")]) + Import(module="pydantic_core", objects=[ObjectImport(name="CoreSchema"), ObjectImport(name="core_schema")]) ) # annotated types are special and inspect.getsource() can't stringify them _AnyShapeArrayInjects = [ '_T = TypeVar("_T")', '_RecursiveListType = Iterable[Union[_T, Iterable["_RecursiveListType"]]]', AnyShapeArrayType, "AnyShapeArray = Annotated[_RecursiveListType, AnyShapeArrayType]", ] _ConListImports = Imports() + Import(module="pydantic", objects=[ObjectImport(name="conlist")])
[docs]class ArrayValidator: """ Validate the specification of a LinkML Array .. todo:: It looks like :mod:`linkml.validator` is for validating instances against schema, rather than validating the schema itself, so am not subclassing/writing as a plugin. Unsure if there is a more general means of validating schema, but for now this is an independent class """
[docs] @classmethod def validate(cls, array: ArrayExpression): """ Validate an array expression. Raises: :class:`.ValidationError` if invalid """ cls.array_exact_dimensions(array) cls.array_consistent_n_dimensions(array) cls.array_explicitly_unbounded(array) cls.array_dimensions_ordinal(array) if array.dimensions: for dimension in array.dimensions: cls.validate_dimension(dimension)
[docs] @classmethod def validate_dimension(cls, dimension: DimensionExpression): """ Validate a single array dimension Raises: :class:`.ValidationError` if invalid """ cls.dimension_exact_cardinality(dimension) cls.dimension_ordinal(dimension)
[docs] @staticmethod def array_exact_dimensions(array: ArrayExpression): """Arrays can have exact_number_dimensions OR min/max_number_dimensions, but not both""" if array.exact_number_dimensions is not None and ( array.minimum_number_dimensions is not None or array.maximum_number_dimensions is not None ): raise ValidationError( f"Can only specify EITHER exact_number_dimensions OR minimum/maximum dimensions, got: {array}" )
[docs] @staticmethod def array_consistent_n_dimensions(array: ArrayExpression): """ Complex arrays with both exact/min/max_number_dimensions and parameterized dimensions need to have the exact/min/max_number_dimensions greater than the number of parameterized dimensions! """ if not array.dimensions: return for field_name in _BOUNDED_ARRAY_FIELDS: field = getattr(array, field_name, None) if field and field < len(array.dimensions): raise ValidationError( "if exact/minimum/maximum_number_dimensions is provided, " "it must be greater than the parameterized dimensions. " f"got\n- {field_name}: {field}\n- dimensions: {array.dimensions}" )
[docs] @staticmethod def array_dimensions_ordinal(array: ArrayExpression): """ minimum_number_dimensions needs to be less than maximum_number_dimensions when both are set """ if array.minimum_number_dimensions is not None and array.maximum_number_dimensions: if array.minimum_number_dimensions > array.maximum_number_dimensions: raise ValidationError( "minimum_number_dimensions must be lesser than maximum_number_dimensions when both are set. " f"got minimum: {array.minimum_number_dimensions}, maximum: {array.maximum_number_dimensions}" )
[docs] @staticmethod def array_explicitly_unbounded(array: ArrayExpression): """ Complex arrays with a minimum_number_dimensions and parameterized dimensions need to either use exact_number_dimensions to specify extra anonymous dimensions or set maximum_number_dimensions to ``False`` to specify unbounded extra anonymous dimensions to avoid ambiguity. """ if array.minimum_number_dimensions is not None and array.maximum_number_dimensions is None and array.dimensions: raise ValidationError( ( "Cannot specify a minimum_number_dimensions while maximum is None while using labeled dimensions - " "either use exact_number_dimensions > len(dimensions) for extra parameterized dimensions or set " "maximum_number_dimensions explicitly to False for unbounded dimensions" ) )
[docs] @staticmethod def dimension_exact_cardinality(dimension: DimensionExpression): """Dimensions can only have exact_cardinality OR min/max_cardinality, but not both""" if dimension.exact_cardinality is not None and ( dimension.minimum_cardinality is not None or dimension.maximum_cardinality is not None ): raise ValidationError( f"Can only specify EITHER exact_cardinality OR minimum/maximum cardinality, got: {dimension}" )
[docs] @staticmethod def dimension_ordinal(dimension: DimensionExpression): """minimum_cardinality must be less than maximum_cardinality when both are set""" if dimension.minimum_cardinality is not None and dimension.maximum_cardinality is not None: if dimension.minimum_cardinality > dimension.maximum_cardinality: raise ValidationError( "minimum_cardinality must be lesser than maximum_cardinality when both are set. " f"got minimum: {dimension.minimum_cardinality}, maximum: {dimension.maximum_cardinality}" )
[docs]class ArrayRangeGenerator(ABC): """ Metaclass for generating a given format of array range. See :ref:`array-forms` for more details on array range forms. These classes do only enough validation of the array specification to decide which kind of representation to generate. Proper value validation should happen elsewhere (ie. in the metamodel and generated :class:`.ArrayExpression` class.) Each of the array representation generation methods should be able to handle the supported pydantic versions (currently still 1 and 2). Notes: When checking for array specification, recall that there is a semantic difference between ``None`` and ``False`` , particularly for :attr:`.ArrayExpression.max_number_dimensions` - check for absence of specification with ``is None`` rather than checking for truthiness/falsiness (unless that's what you intend to do ofc ;) Attributes: array (:class:`.ArrayExpression` ): Array to create a range for dtype (Union[str, :class:`.Element` ): dtype of the entire array as a string """ REPR: ClassVar[ArrayRepresentation] def __init__(self, array: Optional[ArrayExpression], dtype: Union[str, Element]): self.array = array self.dtype = dtype
[docs] def make(self) -> RangeResult: """ Create the string form of the array representation, validating first """ self.validate() if not self.array.dimensions and not self.has_bounded_dimensions: return self._any_shape(self.array) elif not self.array.dimensions and self.has_bounded_dimensions: return self._bounded_dimensions(self.array) elif self.array.dimensions and not self.has_bounded_dimensions: return self._parameterized_dimensions(self.array) else: return self._complex_dimensions(self.array)
[docs] def validate(self): """ Ensure that the given ArrayExpression is valid using :class:`.ArrayValidator` .. todo:: Integrate with more general schema validation that happens when a schema is loaded, rather than when an array is generated Raises: :class:`.ValidationError` if the schema is invalid """ ArrayValidator.validate(self.array)
@property def has_bounded_dimensions(self) -> bool: """Whether the :class:`.ArrayExpression` has some shape specification aside from ``dimensions``""" return any([getattr(self.array, arr_field, None) is not None for arr_field in _BOUNDED_ARRAY_FIELDS])
[docs] @classmethod def get_generator(cls, repr: ArrayRepresentation) -> Type["ArrayRangeGenerator"]: """Get the generator class for a given array representation""" for subclass in cls.__subclasses__(): if repr in (subclass.REPR, subclass.REPR.value): return subclass raise ValueError(f"Generator for array representation {repr} not found!")
@abstractmethod def _any_shape(self, array: Optional[ArrayRepresentation] = None) -> RangeResult: """Any shaped array!""" pass @abstractmethod def _bounded_dimensions(self, array: ArrayExpression) -> RangeResult: """Array shape specified numerically, without axis parameterization""" pass @abstractmethod def _parameterized_dimensions(self, array: ArrayExpression) -> RangeResult: """Array shape specified with ``dimensions`` without additional parameterized dimensions""" pass @abstractmethod def _complex_dimensions(self, array: ArrayExpression) -> RangeResult: """Array shape with both ``parameterized`` and ``bounded`` dimensions""" pass
[docs]class ListOfListsArray(ArrayRangeGenerator): """ Represent arrays as lists of lists! """ REPR = ArrayRepresentation.LIST @staticmethod def _list_of_lists(dimensions: int, dtype: str) -> str: return ("List[" * dimensions) + dtype + ("]" * dimensions) @staticmethod def _parameterized_dimension(dimension: DimensionExpression, dtype: str) -> RangeResult: # TODO: Preserve label representation in some readable way! doing the MVP now of using conlist if dimension.exact_cardinality: dmin = dimension.exact_cardinality dmax = dimension.exact_cardinality elif dimension.minimum_cardinality or dimension.maximum_cardinality: dmin = dimension.minimum_cardinality dmax = dimension.maximum_cardinality else: # TODO: handle labels for labeled but unshaped arrays return RangeResult(range="List[" + dtype + "]") items = [] if dmin is not None: items.append(f"min_length={dmin}") if dmax is not None: items.append(f"max_length={dmax}") items.append(f"item_type={dtype}") items = ", ".join(items) range = f"conlist({items})" return RangeResult(range=range, imports=_ConListImports) def _any_shape(self, array: Optional[ArrayExpression] = None, with_inner_union: bool = False) -> RangeResult: """ An AnyShaped array (using :class:`.AnyShapeArray` ) Args: array (:class:`.ArrayExpression`): The array expression (not used) with_inner_union (bool): If ``True`` , the innermost type is a ``Union`` of the ``AnyShapeArray`` class and ``dtype`` (default: ``False`` ) """ if self.dtype in ("Any", "AnyType"): range = "AnyShapeArray" else: range = f"AnyShapeArray[{self.dtype}]" if with_inner_union: range = f"Union[{range}, {self.dtype}]" return RangeResult(range=range, injected_classes=_AnyShapeArrayInjects, imports=_AnyShapeArrayImports) def _bounded_dimensions(self, array: ArrayExpression) -> RangeResult: """ A nested series of ``List[]`` ranges with :attr:`.dtype` at the center. When an array expression allows for a range of dimensions, each set of ``List`` s is joined by a ``Union`` . """ if array.exact_number_dimensions or ( array.minimum_number_dimensions and array.maximum_number_dimensions and array.minimum_number_dimensions == array.maximum_number_dimensions ): exact_dims = array.exact_number_dimensions or array.minimum_number_dimensions return RangeResult(range=self._list_of_lists(exact_dims, self.dtype)) elif not array.maximum_number_dimensions and ( array.minimum_number_dimensions is None or array.minimum_number_dimensions == 1 ): return self._any_shape() elif array.maximum_number_dimensions: # e.g., if min = 2, max = 3, range = Union[List[List[dtype]], List[List[List[dtype]]]] min_dims = array.minimum_number_dimensions if array.minimum_number_dimensions is not None else 1 ranges = [self._list_of_lists(i, self.dtype) for i in range(min_dims, array.maximum_number_dimensions + 1)] return RangeResult(range="Union[" + ", ".join(ranges) + "]") else: # min specified with no max # e.g., if min = 3, range = List[List[AnyShapeArray[dtype]]] return RangeResult( range=self._list_of_lists(array.minimum_number_dimensions - 1, self._any_shape().range), injected_classes=_AnyShapeArrayInjects, imports=_AnyShapeArrayImports, ) def _parameterized_dimensions(self, array: ArrayExpression) -> RangeResult: """ Constrained shapes using :func:`pydantic.conlist` TODO: - preservation of aliases - (what other metadata is allowable on labeled dimensions?) """ # generate dimensions from inside out and then format # e.g., if dimensions = [{min_card: 3}, {min_card: 2}], # range = conlist(min_length=3, item_type=conlist(min_length=2, item_type=dtype)) range = self.dtype for dimension in reversed(array.dimensions): range = self._parameterized_dimension(dimension, range).range return RangeResult(range=range, imports=_ConListImports) def _complex_dimensions(self, array: ArrayExpression) -> RangeResult: """ Mixture of parameterized dimensions with a max or min (or both) shape for anonymous dimensions. A mixture of ``List`` , :class:`.conlist` , and :class:`.AnyShapeArray` . """ res = None # first process any unlabeled dimensions which must be the innermost level of the range, # then wrap that with labeled dimensions if array.exact_number_dimensions or ( array.minimum_number_dimensions and array.maximum_number_dimensions and array.minimum_number_dimensions == array.maximum_number_dimensions ): exact_dims = array.exact_number_dimensions or array.minimum_number_dimensions if exact_dims > len(array.dimensions): res = RangeResult(range=self._list_of_lists(exact_dims - len(array.dimensions), self.dtype)) elif exact_dims == len(array.dimensions): # equivalent to labeled shape return self._parameterized_dimensions(array) # else is invalid, see: ArrayValidator.array_consistent_n_dimensions elif array.maximum_number_dimensions is not None and not array.maximum_number_dimensions: # unlimited n dimensions, so innermost is AnyShape with dtype res = self._any_shape(with_inner_union=True) if array.minimum_number_dimensions: # some minimum anonymous dimensions but unlimited max dimensions # e.g., if min = 3, len(dim) = 2, then res.range = List[Union[AnyShapeArray[dtype], dtype]] # res.range will be wrapped with the 2 labeled dimensions later res.range = self._list_of_lists(array.minimum_number_dimensions - len(array.dimensions), res.range) elif array.maximum_number_dimensions: initial_min = array.minimum_number_dimensions if array.minimum_number_dimensions is not None else 0 dmin = max(len(array.dimensions), initial_min) - len(array.dimensions) dmax = array.maximum_number_dimensions - len(array.dimensions) res = self._bounded_dimensions( ArrayExpression(minimum_number_dimensions=dmin, maximum_number_dimensions=dmax) ) if res is None: raise ValueError("Unsupported array specification! this is almost certainly a bug!") # pragma: no cover # Wrap inner dimension with labeled dimension # e.g., if dimensions = [{min_card: 3}, {min_card: 2}] # and res.range = List[Union[AnyShapeArray[dtype], dtype]] # (min 3 dims, no max dims) # then the final range = conlist( # min_length=3, # item_type=conlist( # min_length=2, # item_type=List[Union[AnyShapeArray[dtype], dtype]] # ) # ) for dim in reversed(array.dimensions): res = res.merge(self._parameterized_dimension(dim, dtype=res.range)) return res
[docs]class NumpydanticArray(ArrayRangeGenerator): """ Represent array range with :class:`numpydantic.NDArray` annotations, allowing an abstract array specification to be used with many different array libraries. """ REPR = ArrayRepresentation.NUMPYDANTIC MIN_NUMPYDANTIC_VERSION = "1.6.1" """ Minimum numpydantic version needed to be installed in the environment using the generated models """ IMPORTS = Imports() + Import( module="numpydantic", objects=[ObjectImport(name="NDArray"), ObjectImport(name="Shape")] ) INJECTS = [f'MIN_NUMPYDANTIC_VERSION = "{MIN_NUMPYDANTIC_VERSION}"']
[docs] def make(self) -> RangeResult: result = super().make() result.imports = self.IMPORTS.model_copy() result.injected_classes = self.INJECTS.copy() return result
[docs] @staticmethod def ndarray_annotation(shape: Optional[List[Union[int, str]]] = None, dtype: Optional[str] = None) -> str: """ Make a stringified :class:`numpydantic.NDArray` annotation for a given shape and dtype. If either ``shape`` or ``dtype`` is ``None`` , use ``Any`` """ if shape is None: shape = "Any" else: shape_expression = ", ".join([str(i) for i in shape]) shape = f'Shape["{shape_expression}"]' if dtype is None or dtype in ("Any", "AnyType"): dtype = "Any" if shape == "Any" and dtype == "Any": return "NDArray" else: return f"NDArray[{shape}, {dtype}]"
@staticmethod def _dimension_shape(dimension: DimensionExpression) -> str: if dimension.exact_cardinality: shape = str(dimension.exact_cardinality) elif dimension.minimum_cardinality and not dimension.maximum_cardinality: shape = f"{dimension.minimum_cardinality}-*" elif dimension.maximum_cardinality and not dimension.minimum_cardinality: shape = f"*-{dimension.maximum_cardinality}" elif dimension.minimum_cardinality and dimension.maximum_cardinality: shape = f"{dimension.minimum_cardinality}-{dimension.maximum_cardinality}" else: shape = "*" return shape @classmethod def _parameterized_dimension(cls, dimension: DimensionExpression) -> str: shape = cls._dimension_shape(dimension) if dimension.alias is not None: return f"{shape} {dimension.alias}" else: return shape def _any_shape(self, array: Optional[ArrayRepresentation] = None) -> RangeResult: """ Any shaped array, either an unparameterized :class:`numpydantic.NDArray` if dtype is :class:`typing.Any` , or like ``NDArray[Any, {self.dtype}]`` otherwise. """ if self.dtype in ("Any", "AnyType"): range = "NDArray" else: range = f"NDArray[Any, {self.dtype}]" return RangeResult(range=range) def _bounded_dimensions(self, array: ArrayExpression) -> RangeResult: """ Number of dimensions specified without shape """ if array.exact_number_dimensions or ( array.minimum_number_dimensions and array.maximum_number_dimensions and array.minimum_number_dimensions == array.maximum_number_dimensions ): exact_dims = array.exact_number_dimensions or array.minimum_number_dimensions return RangeResult(range=self.ndarray_annotation(["*"] * exact_dims, self.dtype)) elif not array.maximum_number_dimensions and ( array.minimum_number_dimensions is None or array.minimum_number_dimensions == 1 ): return self._any_shape() elif array.maximum_number_dimensions: # e.g., if min = 2, max = 3, range = Union[NDArray[Shape["*, *"], dtype], NDArray[Shape["*, *, *"], dtype]] min_dims = array.minimum_number_dimensions if array.minimum_number_dimensions is not None else 1 ranges = [ self.ndarray_annotation(["*"] * i, self.dtype) for i in range(min_dims, array.maximum_number_dimensions + 1) ] return RangeResult(range="Union[" + ", ".join(ranges) + "]") else: # min specified with no max # e.g., if min = 3, range = NDArray[Shape[*, *, *, ...], dtype] shape_inner = ["*"] * array.minimum_number_dimensions shape_inner.append("...") return RangeResult(range=self.ndarray_annotation(shape_inner, self.dtype)) def _parameterized_dimensions(self, array: ArrayExpression) -> RangeResult: """ Arrays with constrained shapes or labels """ dims = [self._parameterized_dimension(d) for d in array.dimensions] range = self.ndarray_annotation(dims, self.dtype) return RangeResult(range=range) def _complex_dimensions(self, array: ArrayExpression) -> RangeResult: """ Mixture of parameterized dimensions with a max or min (or both) shape for anonymous dimensions. """ dims = [self._parameterized_dimension(d) for d in array.dimensions] res = None if array.exact_number_dimensions or ( array.minimum_number_dimensions and array.maximum_number_dimensions and array.minimum_number_dimensions == array.maximum_number_dimensions ): exact_dims = array.exact_number_dimensions or array.minimum_number_dimensions if exact_dims > len(array.dimensions): dims.extend(["*"] * (exact_dims - len(dims))) res = self.ndarray_annotation(dims, self.dtype) elif exact_dims == len(array.dimensions): # equivalent to labeled shape return self._parameterized_dimensions(array) # else is invalid, see: ArrayValidator.array_consistent_n_dimensions(array) elif array.maximum_number_dimensions is not None and not array.maximum_number_dimensions: # unlimited n dimensions if array.minimum_number_dimensions: # some minimum anonymous dimensions but unlimited max dimensions dims.extend(["*"] * (array.minimum_number_dimensions - len(dims))) dims.append("...") res = self.ndarray_annotation(dims, self.dtype) elif array.maximum_number_dimensions: # some res of anonymous dimensions if array.minimum_number_dimensions: min_dim = array.minimum_number_dimensions else: min_dim = len(dims) dim_union = [] for i in range(min_dim, array.maximum_number_dimensions + 1): this_dims = dims.copy() this_dims.extend(["*"] * (i - len(dims))) dim_union.append(self.ndarray_annotation(this_dims, self.dtype)) dim_union = ", ".join(dim_union) res = f"Union[{dim_union}]" if res is None: raise ValueError(f"Unhandled range case! {array}") return RangeResult(range=res)