Source code for schema_automator.importers.cadsr_import_engine

"""
CADSR CDE Import Engine

This ingests the output of the caDSR API https://cadsrapi.cancer.gov/rad/NCIAPI/1.0/api
"""
import logging
import urllib
from typing import Union, Dict, Tuple, List, Any, Optional, Iterable, Iterator

from dataclasses import dataclass

from linkml.utils.schema_builder import SchemaBuilder
from linkml_runtime.linkml_model import Annotation
from linkml_runtime.linkml_model.meta import SchemaDefinition, SlotDefinition, EnumDefinition, \
    PermissibleValue, UniqueKey, ClassDefinition
from linkml_runtime.loaders import json_loader
from linkml_runtime.utils.formatutils import camelcase, underscore

from schema_automator.importers.import_engine import ImportEngine
import schema_automator.metamodels.cadsr as cadsr

ID_LABEL_PAIR = Tuple[str, str]

TMAP = {
    "DATE": "date",
    "NUMBER": "float",
    "ALPHANUMERIC": "string",
    "CHARACTER": "string",
    "HL7EDv3": "string",
    "HL7CDv3": "string",
    "java.lang.Double": "float",
    "Numeric Alpha DVG": "float",
    "SAS Date": "string",
    "java.util.Date": "date",
    "DATE/TIME": "datetime",
    "TIME": "time",
    "Integer": "integer",
    "java.lang.Integer": "integer",
    "Floating-point": "float",
}

def extract_concepts(concepts: List[cadsr.Concept]) -> Tuple[ID_LABEL_PAIR, List[str]]:
    main = None
    rest = []
    if not concepts:
        raise ValueError("No concepts")
    for concept in concepts:
        if concept.evsSource != "NCI_CONCEPT_CODE":
            continue
        id = f"NCIT:{concept.conceptCode.strip()}"
        pair = id, concept.longName
        if concept.primaryIndicator == "Yes":
            if main:
                raise ValueError(f"Multiple primary for: {concepts}")
            main = pair
        else:
            rest.append(id)
    if not main:
        logging.warning(f"No primary, using arbitrary from {rest}")
        main = rest[0]
        rest = rest[1:]
    return main, rest

[docs] @dataclass class CADSRImportEngine(ImportEngine): """ An ImportEngine that imports NCI CADSR CDEs Ingests the output of `caDSR API <https://cadsrapi.cancer.gov/rad/NCIAPI/1.0/api>`_. Note that we include a LinkML schema for this in schema_automator.metamodels.cadsr - Each CDE (DataElement) becomes a unique slot - the CDE is added as a lot of a context-specific class - the context-specific class is a subclass of the CDE's DataElementConcept Note that this creates a lot of 1-1 classes, as in many cases there is no attempt to group concepts. However, this is not always the case. E.g. the concept with publicId 2012668 (Access Route) is used in 5 contexts (AHRQ, CCR, ...) Each context-specific concept has its own set of CDEs See also https://github.com/monarch-initiative/cde-harmonization """
[docs] def convert(self, paths: Iterable[str], id: str=None, name: str=None, **kwargs) -> SchemaDefinition: """ Converts one or more CDE JSON files into LinkML :param files: :param kwargs: :return: """ sb = SchemaBuilder() schema = sb.schema if id: schema.id = id if not name: name = package.name if name: schema.name = name classes = {} slots = {} enums = {} for path in paths: logging.info(f"Loading {path}") with (open(path) as file): container: cadsr.DataElementContainer container = json_loader.load(file, target_class=cadsr.DataElementContainer) cde = container.DataElement ctxt = cde.context ln = cde.longName source = urllib.parse.quote(ctxt) source = f"cadsr:{source}" # Each DataElement becomes one slot; # we make the name both unique and human readable # TODO: check this; frequently reused, e.g. # NCI Standards + Group ID slot = SlotDefinition( name=urllib.parse.quote(underscore(f"{ctxt} {cde.preferredName} {ln}")), slot_uri=f"cadsr:{cde.publicId}", title=cde.preferredName, description=cde.preferredDefinition, aliases=[ln], conforms_to=f"cadsr:DataElement", source=source, ) # each data element belongs to a concept # (may be reused across classes?) slots[slot.name] = slot dec = cde.DataElementConcept property = dec.Property main_prop_concept, prop_mappings = extract_concepts(property.Concepts) # a concept is linked to a class objectClass = dec.ObjectClass # NCIT concepts describing the class; # there will be one primary and possibly secondary concepts mainConcept, mappings = extract_concepts(objectClass.Concepts) class_name = objectClass.longName concept_name = urllib.parse.quote(camelcase(f"{ctxt} {class_name}")) parent_concept_name = urllib.parse.quote(class_name) if parent_concept_name not in classes: parent_cls = ClassDefinition( name=parent_concept_name, title=objectClass.preferredName, description=objectClass.preferredDefinition, #aliases=[concept.longName], class_uri=f"cadsr:{objectClass.publicId}", exact_mappings=[mainConcept[0]], broad_mappings=mappings, conforms_to=f"cadsr:ObjectClass", ) classes[parent_concept_name] = parent_cls if concept_name not in classes: cls = ClassDefinition( name=concept_name, title=f"{dec.preferredName} ({ctxt})", description=dec.preferredDefinition, aliases=[dec.longName], class_uri=f"cadsr:{dec.publicId}", is_a=parent_concept_name, conforms_to=f"cadsr:DataElementConcept", ) classes[concept_name] = cls else: cls = classes[concept_name] cls.slots.append(slot.name) # In theory the ObjectClass should link to a general class of utility in NCIT. # In practice the actual concept may not be so useful. E.g. in 2724331 # "Agent Adverse Event Attribution Name" the DataConcept is # Agent (C1708) defined as "An active power or cause (as principle, # substance, physical or biological factor, etc.) that produces a specific effect." # which is very upper-ontological #for ocConcept in objectClass.Concepts: # if ocConcept.evsSource == "NCI_CONCEPT_CODE": # cls.is_a = f"NCIT:{ocConcept.conceptCode}" valueDomain = cde.ValueDomain # TODO conceptualDomain = valueDomain.ConceptualDomain pvs = valueDomain.PermissibleValues if pvs: enum_name = urllib.parse.quote(camelcase(valueDomain.preferredName)) enum = EnumDefinition( name=enum_name, title=valueDomain.preferredName, description=valueDomain.preferredDefinition, aliases=[valueDomain.longName], # enum_uri=f"cadsr:{valueDomain.publicId}", ) enums[enum_name] = enum rng = enum_name for pv in pvs: # url encode the value to escape symbols like <, >, etc. pv_value = urllib.parse.quote(pv.value).replace("%20", " ") tgt_pv = PermissibleValue( text=pv_value, title=pv.value, description=pv.valueDescription, ) enum.permissible_values[tgt_pv.text] = tgt_pv vm = pv.ValueMeaning tgt_pv.title = vm.preferredName if not tgt_pv.description: tgt_pv.description = vm.preferredDefinition if vm.Concepts: mainConcept, mappings = extract_concepts(vm.Concepts) tgt_pv.meaning = mainConcept[0] tgt_pv.broad_mappings = mappings else: datatype = valueDomain.dataType rng = TMAP.get(datatype, "string") slot.range = rng anns = [] for rd in cde.ReferenceDocuments: rf_type = urllib.parse.quote(underscore(rd.type)) anns.append(Annotation( tag=rf_type, value=rd.description, )) for ann in anns: slot.annotations[ann.tag] = ann sb.add_prefix("NCIT", "http://purl.obolibrary.org/obo/NCIT_") sb.add_prefix("cadsr", "http://example.org/cadsr/") sb.add_defaults() for c in schema.classes.values(): c.from_schema = 'http://example.org/' schema = sb.schema schema.classes = classes schema.slots = slots schema.enums = enums return schema
def as_rows(self, paths: Iterable[str], **kwargs) -> Iterator[Dict]: for path in paths: logging.info(f"Loading {path}") with (open(path) as file): container: cadsr.DataElementContainer container = json_loader.load(file, target_class=cadsr.DataElementContainer) cde = container.DataElement yield from self._obj_as_rows(cde, path) def _obj_as_rows(self, e: Union[cadsr.DataElement, cadsr.DataElementConcept, cadsr.Concept, cadsr.Property, cadsr.ObjectClass, cadsr.ConceptualDomain, cadsr.ValueDomain, cadsr.PermissibleValue, cadsr.ValueMeaning], parent_id: str) -> Iterator[Dict]: if isinstance(e, cadsr.Concept): obj = { "id": e.conceptCode, "context": e.evsSource, "longName": e.longName, } elif isinstance(e, cadsr.CDEPermissibleValue): obj = { "id": e.publicId, "value": e.value, "valueDescription": e.valueDescription, } else: obj = { "id": e.publicId, "preferredName": e.preferredName, "context": e.context, "longName": e.longName, } obj["parentId"] = parent_id obj["type"] = type(e).class_name id = obj["id"] yield obj if isinstance(e, cadsr.DataElement): yield from self._obj_as_rows(e.DataElementConcept, id) yield from self._obj_as_rows(e.ValueDomain, id) elif isinstance(e, cadsr.DataElementConcept): yield from self._obj_as_rows(e.ObjectClass, id) yield from self._obj_as_rows(e.Property, id) yield from self._obj_as_rows(e.ConceptualDomain, id) elif isinstance(e, cadsr.ValueDomain): for pv in e.PermissibleValues: yield from self._obj_as_rows(pv.ValueMeaning, id) if isinstance(e, (cadsr.ObjectClass, cadsr.Property, cadsr.PermissibleValue)): for c in e.Concepts: yield from self._obj_as_rows(c, id)