-
Notifications
You must be signed in to change notification settings - Fork 37
feat: add workflow query endpoint for agent state inspection #173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f9c67b3
fc5693a
37eee69
f476f0b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,9 @@ | ||
| from typing import Annotated | ||
| from typing import Annotated, Any | ||
|
|
||
| from fastapi import APIRouter, Query | ||
| from fastapi.responses import StreamingResponse | ||
|
|
||
| from src.adapters.temporal.adapter_temporal import DTemporalAdapter | ||
| from src.api.schemas.authorization_types import ( | ||
| AgentexResource, | ||
| AgentexResourceType, | ||
|
|
@@ -215,3 +216,24 @@ async def stream_task_events_by_name( | |
| "X-Accel-Buffering": "no", | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
| @router.get( | ||
| "/{task_id}/query/{query_name}", | ||
| summary="Query Task Workflow", | ||
| description="Query a Temporal workflow associated with a task for its current state.", | ||
| ) | ||
| async def query_task_workflow( | ||
| task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.read), | ||
| query_name: str, | ||
| temporal_adapter: DTemporalAdapter, | ||
| ) -> dict[str, Any]: | ||
| """ | ||
| Query a Temporal workflow by task ID and query name. | ||
| Returns the query result from the workflow. | ||
| """ | ||
| result = await temporal_adapter.query_workflow( | ||
| workflow_id=task_id, | ||
| query=query_name, | ||
| ) | ||
|
Comment on lines
+235
to
+238
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The endpoint passes Other endpoints in this file first load the task via Prompt To Fix With AIThis is a comment left during a code review.
Path: agentex/src/api/routes/tasks.py
Line: 235-238
Comment:
**No validation that the task exists before querying Temporal**
The endpoint passes `task_id` directly as `workflow_id` to Temporal without first checking that a task with this ID exists in the database. If a caller provides a valid-format UUID that doesn't correspond to a task (but does match a Temporal workflow started by another system, e.g., a healthcheck workflow), they'd get a result from the wrong workflow.
Other endpoints in this file first load the task via `task_use_case.get_task(id=task_id)` which returns a 404 if the task doesn't exist. This endpoint should do the same to maintain consistency and prevent querying unrelated workflows.
How can I resolve this? If you propose a fix, please make it concise. |
||
| return {"task_id": task_id, "query": query_name, "result": result} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add tests for this new route?