diff --git a/agentex/src/adapters/temporal/adapter_temporal.py b/agentex/src/adapters/temporal/adapter_temporal.py index ad11fbd7..f64d0d29 100644 --- a/agentex/src/adapters/temporal/adapter_temporal.py +++ b/agentex/src/adapters/temporal/adapter_temporal.py @@ -188,7 +188,10 @@ async def query_workflow( try: handle = self.client.get_workflow_handle(workflow_id, run_id=run_id) - result = await handle.query(query, arg) + if arg is not None: + result = await handle.query(query, arg) + else: + result = await handle.query(query) logger.info(f"Queried workflow {workflow_id} with query '{query}'") return result except Exception as e: diff --git a/agentex/src/api/routes/tasks.py b/agentex/src/api/routes/tasks.py index 0cccdc5e..de036d53 100644 --- a/agentex/src/api/routes/tasks.py +++ b/agentex/src/api/routes/tasks.py @@ -1,8 +1,9 @@ -from typing import Annotated +from typing import Annotated, Any from fastapi import APIRouter, Query from fastapi.responses import StreamingResponse +from src.adapters.temporal.adapter_temporal import DTemporalAdapter from src.api.schemas.authorization_types import ( AgentexResource, AgentexResourceType, @@ -215,3 +216,24 @@ async def stream_task_events_by_name( "X-Accel-Buffering": "no", }, ) + + +@router.get( + "/{task_id}/query/{query_name}", + summary="Query Task Workflow", + description="Query a Temporal workflow associated with a task for its current state.", +) +async def query_task_workflow( + task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.read), + query_name: str, + temporal_adapter: DTemporalAdapter, +) -> dict[str, Any]: + """ + Query a Temporal workflow by task ID and query name. + Returns the query result from the workflow. + """ + result = await temporal_adapter.query_workflow( + workflow_id=task_id, + query=query_name, + ) + return {"task_id": task_id, "query": query_name, "result": result}