Skip to content
Merged

Bte #48

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
worker: [aragorn, aragorn_lookup, aragorn_omnicorp, aragorn_score, arax, sipr, filter_kgraph_orphans, filter_results_top_n, finish_query, merge_message, sort_results_score]
worker: [aragorn, aragorn_lookup, aragorn_omnicorp, aragorn_score, arax, bte, bte_lookup, sipr, filter_kgraph_orphans, filter_results_top_n, finish_query, merge_message, sort_results_score]
steps:
- name: Check out the repo
uses: actions/checkout@v4
Expand Down
31 changes: 31 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,37 @@ services:
volumes:
- ./logs:/app/logs
- ./.env:/app/.env

######### BTE
bte:
container_name: bte
build:
context: .
dockerfile: workers/bte/Dockerfile
restart: unless-stopped
depends_on:
shepherd_db:
condition: service_healthy
shepherd_broker:
condition: service_healthy
volumes:
- ./logs:/app/logs
- ./.env:/app/.env

bte_lookup:
container_name: bte_lookup
build:
context: .
dockerfile: workers/bte_lookup/Dockerfile
restart: unless-stopped
depends_on:
shepherd_db:
condition: service_healthy
shepherd_broker:
condition: service_healthy
volumes:
- ./logs:/app/logs
- ./.env:/app/.env

######### SIPR
sipr:
Expand Down
63 changes: 63 additions & 0 deletions shepherd_server/aras/bte.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from fastapi import Body, FastAPI, Request, Response
from fastapi.openapi.docs import (
get_swagger_ui_html,
)
from starlette.responses import HTMLResponse

from shepherd_server.base_routes import (
ARATargetEnum,
base_router,
callback,
default_input_query,
run_async_query,
run_sync_query,
)
from shepherd_server.openapi import construct_open_api_schema

BTE = FastAPI(title="Shepherd BTE")


@BTE.post("/query")
async def sync_query(
query: dict = Body(..., example=default_input_query),
) -> Response:
response = await run_sync_query(ARATargetEnum.BTE, query)
return response


@BTE.post("/asyncquery")
async def async_query(
query: dict = Body(..., example=default_input_query),
) -> Response:
response = await run_async_query(ARATargetEnum.BTE, query)
return response


@BTE.post("/callback/{callback_id}", status_code=200, include_in_schema=False)
async def handle_callback(
callback_id: str,
response: dict,
) -> Response:
response = await callback(ARATargetEnum.BTE, callback_id, response)
return response


BTE.include_router(base_router, prefix="")


@BTE.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html(req: Request) -> HTMLResponse:
"""Customize Swagger UI."""
root_path = req.scope.get("root_path", "").rstrip("/")
openapi_url = root_path + BTE.openapi_url
swagger_favicon_url = root_path + "/static/favicon.png"
return get_swagger_ui_html(
openapi_url=openapi_url,
title=BTE.title + " - Swagger UI",
swagger_favicon_url=swagger_favicon_url,
)


BTE.openapi_schema = construct_open_api_schema(
BTE, infores="infores:shepherd-bte", subpath="/bte"
)
1 change: 1 addition & 0 deletions shepherd_server/base_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
class ARATargetEnum(str, Enum):
ARAGORN = "aragorn"
ARAX = "arax"
BTE = "bte"
EXAMPLE = "example"
SIPR = "sipr"

Expand Down
2 changes: 1 addition & 1 deletion shepherd_server/openapi-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ contact:
x-id: https://github.com/maximusunc
x-role: responsible developer
description: '<img src="/static/favicon.png" width="200px"><br /><br />Shepherd: Translator Autonomous Relay Agent Platform'
version: 0.3.0
version: 0.4.0
servers:
- description: Default server
url: https://shepherd.renci.org
Expand Down
2 changes: 2 additions & 0 deletions shepherd_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from shepherd_server.aras.aragorn import ARAGORN
from shepherd_server.aras.arax import ARAX
from shepherd_server.aras.bte import BTE
from shepherd_server.aras.sipr import SIPR
from shepherd_server.base_routes import base_router
from shepherd_server.openapi import construct_open_api_schema
Expand Down Expand Up @@ -51,6 +52,7 @@ async def lifespan(app: FastAPI):

APP.mount("/aragorn", ARAGORN)
APP.mount("/arax", ARAX)
APP.mount("/bte", BTE)
APP.mount("/sipr", SIPR)

APP.add_middleware(
Expand Down
34 changes: 34 additions & 0 deletions workers/bte/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Use RENCI python base image
FROM ghcr.io/translatorsri/renci-python-image:3.11.5

# Add image info
LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd

ENV PYTHONHASHSEED=0

# set up requirements
WORKDIR /app

# make sure all is writeable for the nru USER later on
RUN chmod -R 777 .

# Install requirements
COPY ./shepherd_utils ./shepherd_utils
COPY ./pyproject.toml .
RUN pip install .

COPY ./workers/bte/requirements.txt .
RUN pip install -r requirements.txt

# switch to the non-root user (nru). defined in the base image
USER nru

# Copy in files
COPY ./workers/bte ./

# Set up base for command and any variables
# that shouldn't be modified
# ENTRYPOINT ["uvicorn", "shepherd_server.server:APP"]

# Variables that can be overriden
CMD ["python", "worker.py"]
Empty file added workers/bte/__init__.py
Empty file.
Empty file added workers/bte/requirements.txt
Empty file.
133 changes: 133 additions & 0 deletions workers/bte/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""BTE ARA module."""

import asyncio
import logging
import time
import uuid
from shepherd_utils.db import get_message
from shepherd_utils.otel import setup_tracer
from shepherd_utils.shared import get_tasks, wrap_up_task

# Queue name
STREAM = "bte"
# Consumer group, most likely you don't need to change this.
GROUP = "consumer"
CONSUMER = str(uuid.uuid4())[:8]
TASK_LIMIT = 100
tracer = setup_tracer(STREAM)


def examine_query(message):
"""Decides whether the input is an infer. Returns the grouping node"""
# Currently, we support:
# queries that are any shape with all lookup edges
# OR
# A 1-hop infer query.
# OR
# Pathfinder query
try:
# this can still fail if the input looks like e.g.:
# "query_graph": None
qedges = message.get("message", {}).get("query_graph", {}).get("edges", {})
except:
qedges = {}
n_infer_edges = 0
for edge_id in qedges:
if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred":
n_infer_edges += 1
pathfinder = n_infer_edges == 3
if n_infer_edges > 1 and n_infer_edges and not pathfinder:
raise Exception("Only a single infer edge is supported", 400)
if (n_infer_edges > 0) and (n_infer_edges < len(qedges)):
raise Exception("Mixed infer and lookup queries not supported", 400)
infer = n_infer_edges == 1
if not infer:
return infer, None, None, pathfinder
qnodes = message.get("message", {}).get("query_graph", {}).get("nodes", {})
question_node = None
answer_node = None
for qnode_id, qnode in qnodes.items():
if qnode.get("ids", None) is None:
answer_node = qnode_id
else:
question_node = qnode_id
if answer_node is None:
raise Exception("Both nodes of creative edge pinned", 400)
if question_node is None:
raise Exception("No nodes of creative edge pinned", 400)
return infer, question_node, answer_node, pathfinder


async def bte(task, logger: logging.Logger):
"""Main BTE entrypoint that establishes query workflow."""
start = time.time()
# given a task, get the message from the db
query_id = task[1]["query_id"]
message = await get_message(query_id, logger)
try:
infer, question_qnode, answer_qnode, pathfinder = examine_query(message)
except Exception as e:
logger.error(e)
return None, 500
if pathfinder:
# BTE doesn't currently handle Pathfinder queries
return None, 500

supported_workflow_operations = set(
[
"bte.lookup",
"aragorn.omnicorp",
"aragorn.score",
"sort_results_score",
"filter_results_top_n",
"filter_kgraph_ophans",
]
)
workflow = None
if "workflow" in message:
workflow = message["workflow"]
for workflow_op in workflow:
if workflow_op not in supported_workflow_operations:
logger.error(f"Unsupported workflow operation: {workflow_op}")
else:
if infer:
workflow = [
{"id": "bte.lookup"},
{"id": "aragorn.omnicorp"},
{"id": "aragorn.score"},
{"id": "sort_results_score"},
{"id": "filter_results_top_n", "parameters": {"max_results": 500}},
{"id": "filter_kgraph_orphans"},
]
else:
workflow = [
{"id": "bte.lookup"},
{"id": "aragorn.omnicorp"},
{"id": "aragorn.score"},
{"id": "sort_results_score"},
{"id": "filter_results_top_n", "parameters": {"max_results": 500}},
{"id": "filter_kgraph_orphans"},
]

await wrap_up_task(STREAM, GROUP, task, workflow, logger)
logger.info(f"Task took {time.time() - start}")


async def process_task(task, parent_ctx, logger, limiter):
span = tracer.start_span(STREAM, context=parent_ctx)
try:
await bte(task, logger)
finally:
span.end()
limiter.release()


async def poll_for_tasks():
async for task, parent_ctx, logger, limiter in get_tasks(
STREAM, GROUP, CONSUMER, TASK_LIMIT
):
asyncio.create_task(process_task(task, parent_ctx, logger, limiter))


if __name__ == "__main__":
asyncio.run(poll_for_tasks())
34 changes: 34 additions & 0 deletions workers/bte_lookup/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Use RENCI python base image
FROM ghcr.io/translatorsri/renci-python-image:3.11.5

# Add image info
LABEL org.opencontainers.image.source https://github.com/BioPack-team/shepherd

ENV PYTHONHASHSEED=0

# set up requirements
WORKDIR /app

# make sure all is writeable for the nru USER later on
RUN chmod -R 777 .

# Install requirements
COPY ./shepherd_utils ./shepherd_utils
COPY ./pyproject.toml .
RUN pip install .

COPY ./workers/bte_lookup/requirements.txt .
RUN pip install -r requirements.txt

# switch to the non-root user (nru). defined in the base image
USER nru

# Copy in files
COPY ./workers/bte_lookup ./

# Set up base for command and any variables
# that shouldn't be modified
# ENTRYPOINT ["uvicorn", "shepherd_server.server:APP"]

# Variables that can be overriden
CMD ["python", "worker.py"]
Empty file added workers/bte_lookup/__init__.py
Empty file.
Empty file.
Loading
Loading