diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ac25f37..9f8fb4e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -42,7 +42,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - worker: [aragorn, aragorn_lookup, aragorn_omnicorp, aragorn_score, arax, sipr, filter_kgraph_orphans, filter_results_top_n, finish_query, merge_message, sort_results_score] + worker: [aragorn, aragorn_lookup, aragorn_omnicorp, aragorn_score, arax, bte, bte_lookup, sipr, filter_kgraph_orphans, filter_results_top_n, finish_query, merge_message, sort_results_score] steps: - name: Check out the repo uses: actions/checkout@v4 diff --git a/compose.yml b/compose.yml index ee3d28e..97f9055 100644 --- a/compose.yml +++ b/compose.yml @@ -259,6 +259,37 @@ services: volumes: - ./logs:/app/logs - ./.env:/app/.env + +######### BTE + bte: + container_name: bte + build: + context: . + dockerfile: workers/bte/Dockerfile + restart: unless-stopped + depends_on: + shepherd_db: + condition: service_healthy + shepherd_broker: + condition: service_healthy + volumes: + - ./logs:/app/logs + - ./.env:/app/.env + + bte_lookup: + container_name: bte_lookup + build: + context: . + dockerfile: workers/bte_lookup/Dockerfile + restart: unless-stopped + depends_on: + shepherd_db: + condition: service_healthy + shepherd_broker: + condition: service_healthy + volumes: + - ./logs:/app/logs + - ./.env:/app/.env ######### SIPR sipr: diff --git a/shepherd_server/aras/bte.py b/shepherd_server/aras/bte.py new file mode 100644 index 0000000..3d0f86b --- /dev/null +++ b/shepherd_server/aras/bte.py @@ -0,0 +1,63 @@ +from fastapi import Body, FastAPI, Request, Response +from fastapi.openapi.docs import ( + get_swagger_ui_html, +) +from starlette.responses import HTMLResponse + +from shepherd_server.base_routes import ( + ARATargetEnum, + base_router, + callback, + default_input_query, + run_async_query, + run_sync_query, +) +from shepherd_server.openapi import construct_open_api_schema + +BTE = FastAPI(title="Shepherd BTE") + + +@BTE.post("/query") +async def sync_query( + query: dict = Body(..., example=default_input_query), +) -> Response: + response = await run_sync_query(ARATargetEnum.BTE, query) + return response + + +@BTE.post("/asyncquery") +async def async_query( + query: dict = Body(..., example=default_input_query), +) -> Response: + response = await run_async_query(ARATargetEnum.BTE, query) + return response + + +@BTE.post("/callback/{callback_id}", status_code=200, include_in_schema=False) +async def handle_callback( + callback_id: str, + response: dict, +) -> Response: + response = await callback(ARATargetEnum.BTE, callback_id, response) + return response + + +BTE.include_router(base_router, prefix="") + + +@BTE.get("/docs", include_in_schema=False) +async def custom_swagger_ui_html(req: Request) -> HTMLResponse: + """Customize Swagger UI.""" + root_path = req.scope.get("root_path", "").rstrip("/") + openapi_url = root_path + BTE.openapi_url + swagger_favicon_url = root_path + "/static/favicon.png" + return get_swagger_ui_html( + openapi_url=openapi_url, + title=BTE.title + " - Swagger UI", + swagger_favicon_url=swagger_favicon_url, + ) + + +BTE.openapi_schema = construct_open_api_schema( + BTE, infores="infores:shepherd-bte", subpath="/bte" +) diff --git a/shepherd_server/base_routes.py b/shepherd_server/base_routes.py index 85c0d61..272cc73 100644 --- a/shepherd_server/base_routes.py +++ b/shepherd_server/base_routes.py @@ -34,6 +34,7 @@ class ARATargetEnum(str, Enum): ARAGORN = "aragorn" ARAX = "arax" + BTE = "bte" EXAMPLE = "example" SIPR = "sipr" diff --git a/shepherd_server/openapi-config.yaml b/shepherd_server/openapi-config.yaml index 7b5294f..8328f89 100644 --- a/shepherd_server/openapi-config.yaml +++ b/shepherd_server/openapi-config.yaml @@ -4,7 +4,7 @@ contact: x-id: https://github.com/maximusunc x-role: responsible developer description: '

Shepherd: Translator Autonomous Relay Agent Platform' -version: 0.3.0 +version: 0.4.0 servers: - description: Default server url: https://shepherd.renci.org diff --git a/shepherd_server/server.py b/shepherd_server/server.py index 76ade87..36e6d62 100644 --- a/shepherd_server/server.py +++ b/shepherd_server/server.py @@ -18,6 +18,7 @@ from shepherd_server.aras.aragorn import ARAGORN from shepherd_server.aras.arax import ARAX +from shepherd_server.aras.bte import BTE from shepherd_server.aras.sipr import SIPR from shepherd_server.base_routes import base_router from shepherd_server.openapi import construct_open_api_schema @@ -51,6 +52,7 @@ async def lifespan(app: FastAPI): APP.mount("/aragorn", ARAGORN) APP.mount("/arax", ARAX) +APP.mount("/bte", BTE) APP.mount("/sipr", SIPR) APP.add_middleware( diff --git a/workers/bte/Dockerfile b/workers/bte/Dockerfile new file mode 100644 index 0000000..70e29f9 --- /dev/null +++ b/workers/bte/Dockerfile @@ -0,0 +1,34 @@ +# Use RENCI python base image +FROM ghcr.io/translatorsri/renci-python-image:3.11.5 + +# Add image info +LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd + +ENV PYTHONHASHSEED=0 + +# set up requirements +WORKDIR /app + +# make sure all is writeable for the nru USER later on +RUN chmod -R 777 . + +# Install requirements +COPY ./shepherd_utils ./shepherd_utils +COPY ./pyproject.toml . +RUN pip install . + +COPY ./workers/bte/requirements.txt . +RUN pip install -r requirements.txt + +# switch to the non-root user (nru). defined in the base image +USER nru + +# Copy in files +COPY ./workers/bte ./ + +# Set up base for command and any variables +# that shouldn't be modified +# ENTRYPOINT ["uvicorn", "shepherd_server.server:APP"] + +# Variables that can be overriden +CMD ["python", "worker.py"] diff --git a/workers/bte/__init__.py b/workers/bte/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/bte/requirements.txt b/workers/bte/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/workers/bte/worker.py b/workers/bte/worker.py new file mode 100644 index 0000000..ae45036 --- /dev/null +++ b/workers/bte/worker.py @@ -0,0 +1,133 @@ +"""BTE ARA module.""" + +import asyncio +import logging +import time +import uuid +from shepherd_utils.db import get_message +from shepherd_utils.otel import setup_tracer +from shepherd_utils.shared import get_tasks, wrap_up_task + +# Queue name +STREAM = "bte" +# Consumer group, most likely you don't need to change this. +GROUP = "consumer" +CONSUMER = str(uuid.uuid4())[:8] +TASK_LIMIT = 100 +tracer = setup_tracer(STREAM) + + +def examine_query(message): + """Decides whether the input is an infer. Returns the grouping node""" + # Currently, we support: + # queries that are any shape with all lookup edges + # OR + # A 1-hop infer query. + # OR + # Pathfinder query + try: + # this can still fail if the input looks like e.g.: + # "query_graph": None + qedges = message.get("message", {}).get("query_graph", {}).get("edges", {}) + except: + qedges = {} + n_infer_edges = 0 + for edge_id in qedges: + if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred": + n_infer_edges += 1 + pathfinder = n_infer_edges == 3 + if n_infer_edges > 1 and n_infer_edges and not pathfinder: + raise Exception("Only a single infer edge is supported", 400) + if (n_infer_edges > 0) and (n_infer_edges < len(qedges)): + raise Exception("Mixed infer and lookup queries not supported", 400) + infer = n_infer_edges == 1 + if not infer: + return infer, None, None, pathfinder + qnodes = message.get("message", {}).get("query_graph", {}).get("nodes", {}) + question_node = None + answer_node = None + for qnode_id, qnode in qnodes.items(): + if qnode.get("ids", None) is None: + answer_node = qnode_id + else: + question_node = qnode_id + if answer_node is None: + raise Exception("Both nodes of creative edge pinned", 400) + if question_node is None: + raise Exception("No nodes of creative edge pinned", 400) + return infer, question_node, answer_node, pathfinder + + +async def bte(task, logger: logging.Logger): + """Main BTE entrypoint that establishes query workflow.""" + start = time.time() + # given a task, get the message from the db + query_id = task[1]["query_id"] + message = await get_message(query_id, logger) + try: + infer, question_qnode, answer_qnode, pathfinder = examine_query(message) + except Exception as e: + logger.error(e) + return None, 500 + if pathfinder: + # BTE doesn't currently handle Pathfinder queries + return None, 500 + + supported_workflow_operations = set( + [ + "bte.lookup", + "aragorn.omnicorp", + "aragorn.score", + "sort_results_score", + "filter_results_top_n", + "filter_kgraph_ophans", + ] + ) + workflow = None + if "workflow" in message: + workflow = message["workflow"] + for workflow_op in workflow: + if workflow_op not in supported_workflow_operations: + logger.error(f"Unsupported workflow operation: {workflow_op}") + else: + if infer: + workflow = [ + {"id": "bte.lookup"}, + {"id": "aragorn.omnicorp"}, + {"id": "aragorn.score"}, + {"id": "sort_results_score"}, + {"id": "filter_results_top_n", "parameters": {"max_results": 500}}, + {"id": "filter_kgraph_orphans"}, + ] + else: + workflow = [ + {"id": "bte.lookup"}, + {"id": "aragorn.omnicorp"}, + {"id": "aragorn.score"}, + {"id": "sort_results_score"}, + {"id": "filter_results_top_n", "parameters": {"max_results": 500}}, + {"id": "filter_kgraph_orphans"}, + ] + + await wrap_up_task(STREAM, GROUP, task, workflow, logger) + logger.info(f"Task took {time.time() - start}") + + +async def process_task(task, parent_ctx, logger, limiter): + span = tracer.start_span(STREAM, context=parent_ctx) + try: + await bte(task, logger) + finally: + span.end() + limiter.release() + + +async def poll_for_tasks(): + async for task, parent_ctx, logger, limiter in get_tasks( + STREAM, GROUP, CONSUMER, TASK_LIMIT + ): + asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) + + +if __name__ == "__main__": + asyncio.run(poll_for_tasks()) diff --git a/workers/bte_lookup/Dockerfile b/workers/bte_lookup/Dockerfile new file mode 100644 index 0000000..f5f1a30 --- /dev/null +++ b/workers/bte_lookup/Dockerfile @@ -0,0 +1,34 @@ +# Use RENCI python base image +FROM ghcr.io/translatorsri/renci-python-image:3.11.5 + +# Add image info +LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd + +ENV PYTHONHASHSEED=0 + +# set up requirements +WORKDIR /app + +# make sure all is writeable for the nru USER later on +RUN chmod -R 777 . + +# Install requirements +COPY ./shepherd_utils ./shepherd_utils +COPY ./pyproject.toml . +RUN pip install . + +COPY ./workers/bte_lookup/requirements.txt . +RUN pip install -r requirements.txt + +# switch to the non-root user (nru). defined in the base image +USER nru + +# Copy in files +COPY ./workers/bte_lookup ./ + +# Set up base for command and any variables +# that shouldn't be modified +# ENTRYPOINT ["uvicorn", "shepherd_server.server:APP"] + +# Variables that can be overriden +CMD ["python", "worker.py"] diff --git a/workers/bte_lookup/__init__.py b/workers/bte_lookup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/bte_lookup/requirements.txt b/workers/bte_lookup/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/workers/bte_lookup/template_groups.json b/workers/bte_lookup/template_groups.json new file mode 100644 index 0000000..22f60cb --- /dev/null +++ b/workers/bte_lookup/template_groups.json @@ -0,0 +1,66 @@ +[ + { + "name": "Drug treats Disease", + "subject": [ + "Drug", + "SmallMolecule", + "ChemicalEntity", + "ComplexMolecularMixture", + "MolecularMixture" + ], + "predicate": ["treats", "ameliorates"], + "object": ["Disease", "PhenotypicFeature", "DiseaseOrPhenotypicFeature"], + "templates": [ + "Chem-treats-DoP.json", + "Chem-treats-PhenoOfDisease.json", + "Chem-regulates,affects-Gene-biomarker,associated_condition-DoP.json" + ], + "qualifiers": {} + }, + { + "name": "Chem increases Gene's activity or abundance", + "subject": [ + "Drug", + "SmallMolecule", + "ChemicalEntity", + "ComplexMolecularMixture", + "MolecularMixture" + ], + "predicate": ["affects"], + "qualifiers": { + "object_aspect_qualifier": "activity_or_abundance", + "object_direction_qualifier": "increased" + }, + "object": ["Gene", "Protein"], + "templates": [ + "Chem-increasesGene.json", + "Chem-IncreaseAnotherGeneThatUpregs-Gene.json", + "Chem-DecreaseAnotherGeneThatDownregs-Gene.json", + "Chem-physically_interacts-GeneThatUpregs-Gene.json", + "Chem-physically_interacts-Gene.json" + ] + }, + { + "name": "Chem decreases Gene's activity or abundance", + "subject": [ + "Drug", + "SmallMolecule", + "ChemicalEntity", + "ComplexMolecularMixture", + "MolecularMixture" + ], + "predicate": ["affects"], + "qualifiers": { + "object_aspect_qualifier": "activity_or_abundance", + "object_direction_qualifier": "decreased" + }, + "object": ["Gene", "Protein"], + "templates": [ + "Chem-decreasesGene.json", + "Chem-IncreaseAnotherGeneThatDownregs-Gene.json", + "Chem-DecreaseAnotherGeneThatUpregs-Gene.json", + "Chem-physically_interacts-GeneThatDownregs-Gene.json", + "Chem-physically_interacts-Gene.json" + ] + } +] \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-DecreaseAnotherGeneThatUpregs-Gene.json b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-DecreaseAnotherGeneThatUpregs-Gene.json new file mode 100644 index 0000000..ed8cc75 --- /dev/null +++ b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-DecreaseAnotherGeneThatUpregs-Gene.json @@ -0,0 +1,54 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene", "biolink:Protein"], + "is_set": true + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:affects"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "decreased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": ["biolink:regulates"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "upregulated" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-IncreaseAnotherGeneThatDownregs-Gene.json b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-IncreaseAnotherGeneThatDownregs-Gene.json new file mode 100644 index 0000000..90b51ab --- /dev/null +++ b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-IncreaseAnotherGeneThatDownregs-Gene.json @@ -0,0 +1,54 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene", "biolink:Protein"], + "is_set": true + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:affects"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "increased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": ["biolink:regulates"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "downregulated" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-decreasesGene.json b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-decreasesGene.json new file mode 100644 index 0000000..5091b09 --- /dev/null +++ b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-decreasesGene.json @@ -0,0 +1,35 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:affects"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "decreased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-negatively_correlated-Gene.json b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-negatively_correlated-Gene.json new file mode 100644 index 0000000..872b3b7 --- /dev/null +++ b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-negatively_correlated-Gene.json @@ -0,0 +1,21 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:negatively_correlated_with"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-physically_interacts-GeneThatDownregs-Gene.json b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-physically_interacts-GeneThatDownregs-Gene.json new file mode 100644 index 0000000..309a5c6 --- /dev/null +++ b/workers/bte_lookup/templates/Chem-decreases-Gene/Chem-physically_interacts-GeneThatDownregs-Gene.json @@ -0,0 +1,40 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene", "biolink:Protein"], + "is_set": true + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:physically_interacts_with"] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": ["biolink:regulates"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "downregulated" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-increases-Gene/Chem-DecreaseAnotherGeneThatDownregs-Gene.json b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-DecreaseAnotherGeneThatDownregs-Gene.json new file mode 100644 index 0000000..09bcd3f --- /dev/null +++ b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-DecreaseAnotherGeneThatDownregs-Gene.json @@ -0,0 +1,54 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene", "biolink:Protein"], + "is_set": true + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:affects"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "decreased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": ["biolink:regulates"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "downregulated" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-increases-Gene/Chem-IncreaseAnotherGeneThatUpregs-Gene.json b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-IncreaseAnotherGeneThatUpregs-Gene.json new file mode 100644 index 0000000..6f3dd82 --- /dev/null +++ b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-IncreaseAnotherGeneThatUpregs-Gene.json @@ -0,0 +1,54 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene", "biolink:Protein"], + "is_set": true + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:affects"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "increased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": ["biolink:regulates"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "upregulated" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-increases-Gene/Chem-increasesGene.json b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-increasesGene.json new file mode 100644 index 0000000..6131e1a --- /dev/null +++ b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-increasesGene.json @@ -0,0 +1,35 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:affects"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "increased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-increases-Gene/Chem-physically_interacts-GeneThatUpregs-Gene.json b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-physically_interacts-GeneThatUpregs-Gene.json new file mode 100644 index 0000000..1c01673 --- /dev/null +++ b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-physically_interacts-GeneThatUpregs-Gene.json @@ -0,0 +1,40 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene", "biolink:Protein"], + "is_set": true + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:physically_interacts_with"] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": ["biolink:regulates"], + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "upregulated" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-increases-Gene/Chem-positively_correlated-Gene.json b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-positively_correlated-Gene.json new file mode 100644 index 0000000..a0f434d --- /dev/null +++ b/workers/bte_lookup/templates/Chem-increases-Gene/Chem-positively_correlated-Gene.json @@ -0,0 +1,21 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:positively_correlated_with"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Chem-physically_interacts-Gene.json b/workers/bte_lookup/templates/Chem-physically_interacts-Gene.json new file mode 100644 index 0000000..e12f08f --- /dev/null +++ b/workers/bte_lookup/templates/Chem-physically_interacts-Gene.json @@ -0,0 +1,21 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "on": { + "categories":["biolink:Gene", "biolink:Protein"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:physically_interacts_with"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-decreaseGeneWithFunctionGainIn-Disease.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-decreaseGeneWithFunctionGainIn-Disease.json new file mode 100644 index 0000000..285a45f --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-decreaseGeneWithFunctionGainIn-Disease.json @@ -0,0 +1,52 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "decreased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + }, + "eB": { + "subject": "nA", + "object": "on", + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:subject_form_or_variant_qualifier", + "qualifier_value": "gain_of_function_variant_form" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-increaseGeneWithFunctionLossIn-Disease.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-increaseGeneWithFunctionLossIn-Disease.json new file mode 100644 index 0000000..50e2452 --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-increaseGeneWithFunctionLossIn-Disease.json @@ -0,0 +1,52 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "increased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "activity_or_abundance" + } + ] + } + ] + }, + "eB": { + "subject": "nA", + "object": "on", + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:subject_form_or_variant_qualifier", + "qualifier_value": "loss_of_function_variant_form" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-interacts,correlated,associated-Gene-biomarker,associated_condition-DoP.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-interacts,correlated,associated-Gene-biomarker,associated_condition-DoP.json new file mode 100644 index 0000000..f442547 --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-interacts,correlated,associated-Gene-biomarker,associated_condition-DoP.json @@ -0,0 +1,33 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:physically_interacts_with", "biolink:correlated_with", "biolink:associated_with"] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": [ + "biolink:gene_associated_with_condition", + "biolink:biomarker_for" + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-regulates,affects-Gene-biomarker,associated_condition-DoP.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-regulates,affects-Gene-biomarker,associated_condition-DoP.json new file mode 100644 index 0000000..44e585e --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-regulates,affects-Gene-biomarker,associated_condition-DoP.json @@ -0,0 +1,33 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:regulates", "biolink:affects"] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": [ + "biolink:gene_associated_with_condition", + "biolink:biomarker_for" + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-regulates,affects-Gene-cause,contribute,affects-DoP.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-regulates,affects-Gene-cause,contribute,affects-DoP.json new file mode 100644 index 0000000..79bff8b --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-regulates,affects-Gene-cause,contribute,affects-DoP.json @@ -0,0 +1,34 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:Gene"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:regulates", "biolink:affects"] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": [ + "biolink:affects", + "biolink:causes", + "biolink:contributes_to" + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP-causes,part-DoP.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP-causes,part-DoP.json new file mode 100644 index 0000000..27b1fcf --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP-causes,part-DoP.json @@ -0,0 +1,33 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:DiseaseOrPhenotypicFeature"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:treats"] + }, + "eB": { + "subject": "nA", + "object": "on", + "predicates": [ + "biolink:part_of", + "biolink:causes" + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP-similar,part-DoP.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP-similar,part-DoP.json new file mode 100644 index 0000000..9087f61 --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP-similar,part-DoP.json @@ -0,0 +1,33 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:DiseaseOrPhenotypicFeature"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:treats"] + }, + "eB": { + "subject": "on", + "object": "nA", + "predicates": [ + "biolink:similar_to", + "biolink:has_part" + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP.json new file mode 100644 index 0000000..f0e746a --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-DoP.json @@ -0,0 +1,21 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:treats_or_applied_or_studied_to_treat"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-PhenoOfDisease.json b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-PhenoOfDisease.json new file mode 100644 index 0000000..a1b6fd7 --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/Chem-treats-PhenoOfDisease.json @@ -0,0 +1,30 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:ChemicalEntity"] + }, + "nA": { + "categories":["biolink:PhenotypicFeature"], + "is_set": true + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "nA", + "predicates": ["biolink:treats_or_applied_or_studied_to_treat"] + }, + "eB": { + "subject": "on", + "object": "nA", + "predicates": ["biolink:has_phenotype"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/Drug-treats-Disease/ChemDecreasesLikelihood-Disease.json b/workers/bte_lookup/templates/Drug-treats-Disease/ChemDecreasesLikelihood-Disease.json new file mode 100644 index 0000000..36e6da7 --- /dev/null +++ b/workers/bte_lookup/templates/Drug-treats-Disease/ChemDecreasesLikelihood-Disease.json @@ -0,0 +1,35 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories": + ["ChemicalEntity"] + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "decreased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "likelihood" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/NonChem-treats-Disease/NonChem-treats-DoP.json b/workers/bte_lookup/templates/NonChem-treats-Disease/NonChem-treats-DoP.json new file mode 100644 index 0000000..f351c6b --- /dev/null +++ b/workers/bte_lookup/templates/NonChem-treats-Disease/NonChem-treats-DoP.json @@ -0,0 +1,22 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories": + ["biolink:Procedure", "biolink:Treatment", "biolink:ClinicalIntervention"] + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:treats"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/NonChem-treats-Disease/Procedure-decreasesLikelihood-Disease.json b/workers/bte_lookup/templates/NonChem-treats-Disease/Procedure-decreasesLikelihood-Disease.json new file mode 100644 index 0000000..ad8796a --- /dev/null +++ b/workers/bte_lookup/templates/NonChem-treats-Disease/Procedure-decreasesLikelihood-Disease.json @@ -0,0 +1,35 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories": + ["Procedure"] + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "qualifier_constraints": [ + { + "qualifier_set": [ + { + "qualifier_type_id": "biolink:object_direction_qualifier", + "qualifier_value": "decreased" + }, + { + "qualifier_type_id": "biolink:object_aspect_qualifier", + "qualifier_value": "likelihood" + } + ] + } + ] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/NonChem-treats-Disease/Treatment-related_to-DoP.json b/workers/bte_lookup/templates/NonChem-treats-Disease/Treatment-related_to-DoP.json new file mode 100644 index 0000000..726dc06 --- /dev/null +++ b/workers/bte_lookup/templates/NonChem-treats-Disease/Treatment-related_to-DoP.json @@ -0,0 +1,21 @@ +{ + "message": { + "query_graph": { + "nodes": { + "sn": { + "categories":["biolink:Treatment"] + }, + "on": { + "categories":["biolink:DiseaseOrPhenotypicFeature"] + } + }, + "edges": { + "eA": { + "subject": "sn", + "object": "on", + "predicates": ["biolink:related_to"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/less-promising/m3-Disease-SeqVar-Gene-Chem.json b/workers/bte_lookup/templates/less-promising/m3-Disease-SeqVar-Gene-Chem.json new file mode 100644 index 0000000..90b46d1 --- /dev/null +++ b/workers/bte_lookup/templates/less-promising/m3-Disease-SeqVar-Gene-Chem.json @@ -0,0 +1,37 @@ +{ + "message": { + "query_graph": { + "nodes": { + "on": { + "categories":["biolink:Disease"] + }, + "nA": { + "categories":["biolink:SequenceVariant"], + "is_set": true + }, + "nB": { + "categories":["biolink:Gene"], + "is_set": true + }, + "sn": { + "categories":["biolink:ChemicalEntity"] + } + }, + "edges": { + "eA": { + "subject": "on", + "object": "nA" + }, + "eB": { + "subject": "nA", + "object": "nB" + }, + "eC": { + "subject": "nB", + "object": "sn", + "predicates": ["biolink:regulated_by", "biolink:affected_by"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/less-promising/m5-Disease-Pheno-Gene-Chem.json b/workers/bte_lookup/templates/less-promising/m5-Disease-Pheno-Gene-Chem.json new file mode 100644 index 0000000..a9ff676 --- /dev/null +++ b/workers/bte_lookup/templates/less-promising/m5-Disease-Pheno-Gene-Chem.json @@ -0,0 +1,39 @@ +{ + "message": { + "query_graph": { + "nodes": { + "on": { + "categories":["biolink:Disease"] + }, + "nA": { + "categories":["biolink:PhenotypicFeature"], + "is_set": true + }, + "nB": { + "categories":["biolink:Gene"], + "is_set": true + }, + "sn": { + "categories":["biolink:ChemicalEntity"] + } + }, + "edges": { + "eA": { + "subject": "on", + "object": "nA", + "predicates": ["biolink:has_phenotype"] + }, + "eB": { + "subject": "nA", + "object": "nB", + "predicates": ["biolink:regulated_by", "biolink:affected_by"] + }, + "eC": { + "subject": "nB", + "object": "sn", + "predicates": ["biolink:regulated_by", "biolink:affected_by"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/templates/less-promising/m6-Disease-Gene-Gene-Chem.json b/workers/bte_lookup/templates/less-promising/m6-Disease-Gene-Gene-Chem.json new file mode 100644 index 0000000..012b177 --- /dev/null +++ b/workers/bte_lookup/templates/less-promising/m6-Disease-Gene-Gene-Chem.json @@ -0,0 +1,39 @@ +{ + "message": { + "query_graph": { + "nodes": { + "on": { + "categories":["biolink:Disease"] + }, + "nA": { + "categories":["biolink:Gene"], + "is_set": true + }, + "nB": { + "categories":["biolink:Gene"], + "is_set": true + }, + "sn": { + "categories":["biolink:ChemicalEntity"] + } + }, + "edges": { + "eA": { + "subject": "on", + "object": "nA", + "predicates": ["biolink:caused_by"] + }, + "eB": { + "subject": "nA", + "object": "nB", + "predicates": ["biolink:physically_interacts_with"] + }, + "eC": { + "subject": "nB", + "object": "sn", + "predicates": ["biolink:regulated_by", "biolink:affected_by"] + } + } + } + } +} \ No newline at end of file diff --git a/workers/bte_lookup/worker.py b/workers/bte_lookup/worker.py new file mode 100644 index 0000000..1572d63 --- /dev/null +++ b/workers/bte_lookup/worker.py @@ -0,0 +1,349 @@ +"""BTE ARA module.""" + +import asyncio +import json +import logging +import time +import uuid +from pathlib import Path +from typing import Any, Dict, Optional + +import httpx +from pydantic import BaseModel, parse_obj_as + +from shepherd_utils.config import settings +from shepherd_utils.db import ( + add_callback_id, + get_message, + get_running_callbacks, + save_message, +) +from shepherd_utils.otel import setup_tracer +from shepherd_utils.shared import get_tasks, wrap_up_task + +# Queue name +STREAM = "bte.lookup" +# Consumer group, most likely you don't need to change this. +GROUP = "consumer" +CONSUMER = str(uuid.uuid4())[:8] +TASK_LIMIT = 100 +tracer = setup_tracer(STREAM) + + +def examine_query(message): + """Decides whether the input is an infer. Returns the grouping node""" + # Currently, we support: + # queries that are any shape with all lookup edges + # OR + # A 1-hop infer query. + # OR + # Pathfinder query + try: + # this can still fail if the input looks like e.g.: + # "query_graph": None + qedges = message.get("message", {}).get("query_graph", {}).get("edges", {}) + except: + qedges = {} + n_infer_edges = 0 + for edge_id in qedges: + if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred": + n_infer_edges += 1 + pathfinder = n_infer_edges == 3 + if n_infer_edges > 1 and n_infer_edges and not pathfinder: + raise Exception("Only a single infer edge is supported", 400) + if (n_infer_edges > 0) and (n_infer_edges < len(qedges)): + raise Exception("Mixed infer and lookup queries not supported", 400) + infer = n_infer_edges == 1 + if not infer: + return infer, None, None, pathfinder + qnodes = message.get("message", {}).get("query_graph", {}).get("nodes", {}) + question_node = None + answer_node = None + for qnode_id, qnode in qnodes.items(): + if qnode.get("ids", None) is None: + answer_node = qnode_id + else: + question_node = qnode_id + if answer_node is None: + raise Exception("Both nodes of creative edge pinned", 400) + if question_node is None: + raise Exception("No nodes of creative edge pinned", 400) + return infer, question_node, answer_node, pathfinder + + +async def bte_lookup(task, logger: logging.Logger): + start = time.time() + # given a task, get the message from the db + query_id = task[1]["query_id"] + workflow = json.loads(task[1]["workflow"]) + message = await get_message(query_id, logger) + try: + infer, question_qnode, answer_qnode, pathfinder = examine_query(message) + except Exception as e: + logger.error(e) + return None, 500 + if pathfinder: + # BTE currently doesn't handle Pathfinder queries + return None, 500 + + if not infer: + # Put callback UID and query ID in postgres + callback_id = str(uuid.uuid4())[:8] + await add_callback_id(query_id, callback_id, logger) + # put lookup query graph in redis + await save_message( + f"{query_id}_lookup_query_graph", message["message"]["query_graph"], logger + ) + message["callback"] = f"{settings.callback_host}/bte/callback/{callback_id}" + + async with httpx.AsyncClient(timeout=100) as client: + await client.post( + settings.kg_retrieval_url, + json=message, + ) + else: + expanded_messages = expand_bte_query(message, logger) + logger.info(f"Expanded to {len(expanded_messages)} messages") + requests = [] + if len(expanded_messages) == 0: + lookup_query_graph = message["message"]["query_graph"] + else: + lookup_query_graph = expanded_messages[0]["message"]["query_graph"] + # put lookup query graph in redis + await save_message( + f"{query_id}_lookup_query_graph", + lookup_query_graph, + logger, + ) + # send all messages to retriever + async with httpx.AsyncClient(timeout=100) as client: + for expanded_message in expanded_messages: + callback_id = str(uuid.uuid4())[:8] + # Put callback UID and query ID in postgres + await add_callback_id(query_id, callback_id, logger) + + expanded_message["callback"] = ( + f"{settings.callback_host}/bte/callback/{callback_id}" + ) + + logger.debug( + f"""Sending lookup query to {settings.kg_retrieval_url} with callback {expanded_message['callback']}""" + ) + request = client.post( + settings.kg_retrieval_url, + json=expanded_message, + ) + requests.append(request) + # Then we can retrieve all callback ids from query id to see which are still + # being looked up + # fire all the lookups at the same time + await asyncio.gather(*requests) + + # this worker might have a timeout set for if the lookups don't finish within a certain + # amount of time + MAX_QUERY_TIME = 300 + start_time = time.time() + running_callback_ids = [""] + while time.time() - start_time < MAX_QUERY_TIME: + # see if there are existing lookups going + running_callback_ids = await get_running_callbacks(query_id, logger) + # logger.info(f"Got back {len(running_callback_ids)} running lookups") + # if there are, continue to wait + if len(running_callback_ids) > 0: + await asyncio.sleep(1) + continue + # if there aren't, lookup is complete and we need to pass on to next workflow operation + if len(running_callback_ids) == 0: + logger.debug("Got all lookups back. Continuing...") + break + + if time.time() - start_time > MAX_QUERY_TIME: + logger.warning( + f"Timed out getting lookup callbacks. {len(running_callback_ids)} queries still running..." + ) + # TODO: clean up any on-going queries so they don't get merged in after we've already moved on and potentially overwrite with an old message + + await wrap_up_task(STREAM, GROUP, task, workflow, logger) + logger.info(f"Finished task {task[0]} in {time.time() - start}") + + +class TemplateGroup(BaseModel): + """A group of templates to be matched by given criteria.""" + + name: str + subject: list[str] + predicate: list[str] + object: list[str] + templates: list[str] + qualifiers: Optional[dict[str, str]] + + +def get_params( + query_graph: Dict, +) -> tuple[ + Optional[str], + Optional[str], + Optional[str], + Optional[str], + Optional[str], + dict[str, str], +]: + """Obtain some important parameters from the query graph.""" + edge = next(iter(query_graph["edges"].values())) + + q_subject = query_graph["nodes"].get(edge["subject"]) + subject_type = next(iter(q_subject.get("categories") or []), None) + + q_object = query_graph["nodes"].get(edge["object"]) + object_type = next(iter(q_object.get("categories") or []), None) + + predicate = next(iter(edge.get("predicates") or []), None) + + subject_curie = next(iter(q_subject.get("ids") or []), None) + object_curie = next(iter(q_object.get("ids") or []), None) + qualifiers: dict[str, str] = {} + + qualifier_constraints = edge.get("qualifier_constraints") or [] + if qualifier_constraints is not None and len(qualifier_constraints) > 0: + qualifiers = { + qualifier["qualifier_type_id"]: qualifier["qualifier_value"] + for qualifier in qualifier_constraints[0]["qualifier_set"] + } + + return ( + subject_type, + subject_curie, + object_type, + object_curie, + predicate, + qualifiers, + ) + + +def match_templates( + subject_type: Optional[str], + object_type: Optional[str], + predicate: Optional[str], + qualifiers: dict[str, str], + logger: logging.Logger, +) -> list[Path]: + """Match a given set of parameters to a number of templates.""" + + # TODO: expand subject/object types by descending the biolink hierarchy + subject_types: set[str] = set() + object_types: set[str] = set() + predicates: set[str] = set() + if subject_type is not None: + subject_types.add(subject_type.removeprefix("biolink:")) + if object_type is not None: + object_types.add(object_type.removeprefix("biolink:")) + if predicate is not None: + predicates.add(predicate.removeprefix("biolink:")) + + with open( + Path(__file__).parent / "template_groups.json", "r", encoding="utf-8" + ) as file: + templateGroups = parse_obj_as(list[TemplateGroup], json.load(file)) + + template_paths = { + path.name: path + for path in (Path(__file__).parent / "templates").rglob("*.json") + } + + matched_paths: set[Path] = set() + for group in templateGroups: + conditions: list[bool] = [] + conditions.append(len(subject_types.intersection(group.subject)) > 0) + conditions.append(len(object_types.intersection(group.object)) > 0) + conditions.append(len(predicates.intersection(group.predicate)) > 0) + conditions.append( # Qualifiers (if they exist) are satisfied + all( + (group.qualifiers or {}).get(qualifier_type, False) == value + for qualifier_type, value in qualifiers.items() + ) + ) + + if all(conditions): + for template in group.templates: + matched_paths.add(template_paths[template]) + + return list(matched_paths) + + +def fill_templates( + paths: list[Path], + query_body: Dict, + subject_curie: Optional[str], + object_curie: Optional[str], +) -> list[Dict]: + filled_templates: list[Dict] = [] + for path in paths: + with open(path, "r") as file: + template = json.load(file) + if subject_curie is not None: + template["message"]["query_graph"]["nodes"]["sn"]["ids"] = [subject_curie] + if object_curie is not None: + template["message"]["query_graph"]["nodes"]["on"]["ids"] = [object_curie] + filled_templates.append(template) + if query_body.get("log_level") is not None: + template["log_level"] = query_body["log_level"] + + return filled_templates + + +def expand_bte_query(query_dict: dict[str, Any], logger: logging.Logger) -> list[Any]: + """Expand a given query into the appropriate templates.""" + # Contract: + # 1. there is a single edge in the query graph + # 2. The edge is marked inferred. + # 3. Either the source or the target has IDs, but not both. + # 4. The number of ids on the query node is 1. + + query_graph = query_dict["message"].get("query_graph") + if query_graph is None: + return [] + subject_type, subject_curie, object_type, object_curie, predicate, qualifiers = ( + get_params(query_graph) + ) + + matched_template_paths = match_templates( + subject_type, + object_type, + predicate, + qualifiers, + logger, + ) + + templates = fill_templates( + matched_template_paths, query_dict, subject_curie, object_curie + ) + + final_templates = [] + + for template in templates: + if template["message"].get("knowledge_graph") is not None: + del template["message"]["knowledge_graph"] + template["workflow"] = [{"id": "lookup"}] + final_templates.append(template) + + return final_templates + + +async def process_task(task, parent_ctx, logger, limiter): + span = tracer.start_span(STREAM, context=parent_ctx) + try: + await bte_lookup(task, logger) + finally: + span.end() + limiter.release() + + +async def poll_for_tasks(): + async for task, parent_ctx, logger, limiter in get_tasks( + STREAM, GROUP, CONSUMER, TASK_LIMIT + ): + asyncio.create_task(process_task(task, parent_ctx, logger, limiter)) + + +if __name__ == "__main__": + asyncio.run(poll_for_tasks())