Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion agentex/src/adapters/temporal/adapter_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ async def query_workflow(

try:
handle = self.client.get_workflow_handle(workflow_id, run_id=run_id)
result = await handle.query(query, arg)
if arg is not None:
result = await handle.query(query, arg)
else:
result = await handle.query(query)
logger.info(f"Queried workflow {workflow_id} with query '{query}'")
return result
except Exception as e:
Expand Down
24 changes: 23 additions & 1 deletion agentex/src/api/routes/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Annotated
from typing import Annotated, Any

from fastapi import APIRouter, Query
from fastapi.responses import StreamingResponse

from src.adapters.temporal.adapter_temporal import DTemporalAdapter
from src.api.schemas.authorization_types import (
AgentexResource,
AgentexResourceType,
Expand Down Expand Up @@ -215,3 +216,24 @@ async def stream_task_events_by_name(
"X-Accel-Buffering": "no",
},
)


@router.get(
"/{task_id}/query/{query_name}",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add tests for this new route?

summary="Query Task Workflow",
description="Query a Temporal workflow associated with a task for its current state.",
)
async def query_task_workflow(
task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.read),
query_name: str,
temporal_adapter: DTemporalAdapter,
) -> dict[str, Any]:
"""
Query a Temporal workflow by task ID and query name.
Returns the query result from the workflow.
"""
result = await temporal_adapter.query_workflow(
workflow_id=task_id,
query=query_name,
)
Comment on lines +235 to +238
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 No validation that the task exists before querying Temporal

The endpoint passes task_id directly as workflow_id to Temporal without first checking that a task with this ID exists in the database. If a caller provides a valid-format UUID that doesn't correspond to a task (but does match a Temporal workflow started by another system, e.g., a healthcheck workflow), they'd get a result from the wrong workflow.

Other endpoints in this file first load the task via task_use_case.get_task(id=task_id) which returns a 404 if the task doesn't exist. This endpoint should do the same to maintain consistency and prevent querying unrelated workflows.

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/api/routes/tasks.py
Line: 235-238

Comment:
**No validation that the task exists before querying Temporal**

The endpoint passes `task_id` directly as `workflow_id` to Temporal without first checking that a task with this ID exists in the database. If a caller provides a valid-format UUID that doesn't correspond to a task (but does match a Temporal workflow started by another system, e.g., a healthcheck workflow), they'd get a result from the wrong workflow.

Other endpoints in this file first load the task via `task_use_case.get_task(id=task_id)` which returns a 404 if the task doesn't exist. This endpoint should do the same to maintain consistency and prevent querying unrelated workflows.

How can I resolve this? If you propose a fix, please make it concise.

return {"task_id": task_id, "query": query_name, "result": result}
Loading