Skip to content

Commit c140b0b

Browse files
Merge pull request #564 from microsoft/psl-us-23031-dk
perf: added event polling instead of looping
2 parents acd7f18 + c7bc593 commit c140b0b

File tree

6 files changed

+364
-33
lines changed

6 files changed

+364
-33
lines changed

src/backend/v3/api/router.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,8 @@ async def plan_approval(
396396
orchestration_config
397397
and human_feedback.m_plan_id in orchestration_config.approvals
398398
):
399-
orchestration_config.approvals[human_feedback.m_plan_id] = (
400-
human_feedback.approved
399+
orchestration_config.set_approval_result(
400+
human_feedback.m_plan_id, human_feedback.approved
401401
)
402402
# orchestration_config.plans[human_feedback.m_plan_id][
403403
# "plan_id"
@@ -528,10 +528,10 @@ async def user_clarification(
528528
orchestration_config
529529
and human_feedback.request_id in orchestration_config.clarifications
530530
):
531-
orchestration_config.clarifications[human_feedback.request_id] = (
532-
human_feedback.answer
531+
# Use the new event-driven method to set clarification result
532+
orchestration_config.set_clarification_result(
533+
human_feedback.request_id, human_feedback.answer
533534
)
534-
535535
try:
536536
result = await PlanService.handle_human_clarification(
537537
human_feedback, user_id

src/backend/v3/config/settings.py

Lines changed: 150 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import asyncio
77
import json
88
import logging
9-
from typing import Dict
9+
from typing import Dict, Optional
1010

1111
from common.config.app_config import config
1212
from common.models.messages_kernel import TeamConfiguration
@@ -86,10 +86,159 @@ def __init__(self):
8686
20 # Maximum number of replanning rounds 20 needed to accommodate complex tasks
8787
)
8888

89+
# Event-driven notification system for approvals and clarifications
90+
self._approval_events: Dict[str, asyncio.Event] = {}
91+
self._clarification_events: Dict[str, asyncio.Event] = {}
92+
93+
# Default timeout for waiting operations (5 minutes)
94+
self.default_timeout: float = 300.0
95+
8996
def get_current_orchestration(self, user_id: str) -> MagenticOrchestration:
9097
"""get existing orchestration instance."""
9198
return self.orchestrations.get(user_id, None)
9299

100+
def set_approval_pending(self, plan_id: str) -> None:
101+
"""Set an approval as pending and create an event for it."""
102+
self.approvals[plan_id] = None
103+
if plan_id not in self._approval_events:
104+
self._approval_events[plan_id] = asyncio.Event()
105+
else:
106+
# Clear existing event to reset state
107+
self._approval_events[plan_id].clear()
108+
109+
def set_approval_result(self, plan_id: str, approved: bool) -> None:
110+
"""Set the approval result and trigger the event."""
111+
self.approvals[plan_id] = approved
112+
if plan_id in self._approval_events:
113+
self._approval_events[plan_id].set()
114+
115+
async def wait_for_approval(self, plan_id: str, timeout: Optional[float] = None) -> bool:
116+
"""
117+
Wait for an approval decision with timeout.
118+
119+
Args:
120+
plan_id: The plan ID to wait for
121+
timeout: Timeout in seconds (defaults to default_timeout)
122+
123+
Returns:
124+
The approval decision (True/False)
125+
126+
Raises:
127+
asyncio.TimeoutError: If timeout is exceeded
128+
KeyError: If plan_id is not found in approvals
129+
"""
130+
if timeout is None:
131+
timeout = self.default_timeout
132+
133+
if plan_id not in self.approvals:
134+
raise KeyError(f"Plan ID {plan_id} not found in approvals")
135+
136+
if self.approvals[plan_id] is not None:
137+
# Already has a result
138+
return self.approvals[plan_id]
139+
140+
if plan_id not in self._approval_events:
141+
self._approval_events[plan_id] = asyncio.Event()
142+
143+
try:
144+
await asyncio.wait_for(self._approval_events[plan_id].wait(), timeout=timeout)
145+
return self.approvals[plan_id]
146+
except asyncio.TimeoutError:
147+
# Clean up on timeout
148+
self.cleanup_approval(plan_id)
149+
raise
150+
except asyncio.CancelledError:
151+
# Handle task cancellation gracefully
152+
logger.debug(f"Approval request {plan_id} was cancelled")
153+
raise
154+
except Exception as e:
155+
# Handle any other unexpected errors
156+
logger.error(f"Unexpected error waiting for approval {plan_id}: {e}")
157+
raise
158+
finally:
159+
# Ensure cleanup happens regardless of how the try block exits
160+
# Only cleanup if the approval is still pending (None) to avoid
161+
# cleaning up successful approvals
162+
if plan_id in self.approvals and self.approvals[plan_id] is None:
163+
self.cleanup_approval(plan_id)
164+
165+
def set_clarification_pending(self, request_id: str) -> None:
166+
"""Set a clarification as pending and create an event for it."""
167+
self.clarifications[request_id] = None
168+
if request_id not in self._clarification_events:
169+
self._clarification_events[request_id] = asyncio.Event()
170+
else:
171+
# Clear existing event to reset state
172+
self._clarification_events[request_id].clear()
173+
174+
def set_clarification_result(self, request_id: str, answer: str) -> None:
175+
"""Set the clarification response and trigger the event."""
176+
self.clarifications[request_id] = answer
177+
if request_id in self._clarification_events:
178+
self._clarification_events[request_id].set()
179+
180+
async def wait_for_clarification(self, request_id: str, timeout: Optional[float] = None) -> str:
181+
"""
182+
Wait for a clarification response with timeout.
183+
184+
Args:
185+
request_id: The request ID to wait for
186+
timeout: Timeout in seconds (defaults to default_timeout)
187+
188+
Returns:
189+
The clarification response
190+
191+
Raises:
192+
asyncio.TimeoutError: If timeout is exceeded
193+
KeyError: If request_id is not found in clarifications
194+
"""
195+
if timeout is None:
196+
timeout = self.default_timeout
197+
198+
if request_id not in self.clarifications:
199+
raise KeyError(f"Request ID {request_id} not found in clarifications")
200+
201+
if self.clarifications[request_id] is not None:
202+
# Already has a result
203+
return self.clarifications[request_id]
204+
205+
if request_id not in self._clarification_events:
206+
self._clarification_events[request_id] = asyncio.Event()
207+
208+
try:
209+
await asyncio.wait_for(self._clarification_events[request_id].wait(), timeout=timeout)
210+
return self.clarifications[request_id]
211+
except asyncio.TimeoutError:
212+
# Clean up on timeout
213+
self.cleanup_clarification(request_id)
214+
raise
215+
except asyncio.CancelledError:
216+
# Handle task cancellation gracefully
217+
logger.debug(f"Clarification request {request_id} was cancelled")
218+
raise
219+
except Exception as e:
220+
# Handle any other unexpected errors
221+
logger.error(f"Unexpected error waiting for clarification {request_id}: {e}")
222+
raise
223+
finally:
224+
# Ensure cleanup happens regardless of how the try block exits
225+
# Only cleanup if the clarification is still pending (None) to avoid
226+
# cleaning up successful clarifications
227+
if request_id in self.clarifications and self.clarifications[request_id] is None:
228+
self.cleanup_clarification(request_id)
229+
230+
def cleanup_approval(self, plan_id: str) -> None:
231+
"""Clean up approval resources."""
232+
self.approvals.pop(plan_id, None)
233+
if plan_id in self._approval_events:
234+
del self._approval_events[plan_id]
235+
236+
def cleanup_clarification(self, request_id: str) -> None:
237+
"""Clean up clarification resources."""
238+
self.clarifications.pop(request_id, None)
239+
if request_id in self._clarification_events:
240+
del self._clarification_events[request_id]
241+
93242

94243
class ConnectionConfig:
95244
"""Connection manager for WebSocket connections."""

src/backend/v3/magentic_agents/proxy_agent.py

Lines changed: 102 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import asyncio
55
import logging
6+
import time
67
import uuid
78
from collections.abc import AsyncIterable
89
from typing import AsyncIterator, Optional
@@ -30,6 +31,9 @@
3031
from v3.models.messages import (UserClarificationRequest,
3132
UserClarificationResponse, WebsocketMessageType)
3233

34+
# Initialize logger for the module
35+
logger = logging.getLogger(__name__)
36+
3337

3438
class DummyAgentThread(AgentThread):
3539
"""Dummy thread implementation for proxy agent."""
@@ -185,10 +189,16 @@ async def invoke(
185189
clarification_message.request_id
186190
)
187191

188-
if not human_response:
189-
human_response = "No additional clarification provided."
192+
# Handle silent timeout/cancellation
193+
if human_response is None:
194+
# Process was terminated silently - don't yield any response
195+
logger.debug("Clarification process terminated silently - ending invoke")
196+
return
197+
198+
# Extract the answer from the response
199+
answer = human_response.answer if human_response else "No additional clarification provided."
190200

191-
response = f"Human clarification: {human_response}"
201+
response = f"Human clarification: {answer}"
192202

193203
chat_message = self._create_message_content(response, thread.id)
194204

@@ -242,10 +252,16 @@ async def invoke_stream(
242252
clarification_message.request_id
243253
)
244254

245-
if not human_response:
246-
human_response = "No additional clarification provided."
255+
# Handle silent timeout/cancellation
256+
if human_response is None:
257+
# Process was terminated silently - don't yield any response
258+
logger.debug("Clarification process terminated silently - ending invoke_stream")
259+
return
260+
261+
# Extract the answer from the response
262+
answer = human_response.answer if human_response else "No additional clarification provided."
247263

248-
response = f"Human clarification: {human_response}"
264+
response = f"Human clarification: {answer}"
249265

250266
chat_message = self._create_message_content(response, thread.id)
251267

@@ -254,16 +270,86 @@ async def invoke_stream(
254270
async def _wait_for_user_clarification(
255271
self, request_id: str
256272
) -> Optional[UserClarificationResponse]:
257-
"""Wait for user clarification response."""
258-
# To do: implement timeout and error handling
259-
if request_id not in orchestration_config.clarifications:
260-
orchestration_config.clarifications[request_id] = None
261-
while orchestration_config.clarifications[request_id] is None:
262-
await asyncio.sleep(0.2)
263-
return UserClarificationResponse(
264-
request_id=request_id,
265-
answer=orchestration_config.clarifications[request_id],
266-
)
273+
"""
274+
Wait for user clarification response using event-driven pattern with timeout handling.
275+
276+
Args:
277+
request_id: The request ID to wait for clarification
278+
279+
Returns:
280+
UserClarificationResponse: Clarification result with request ID and answer
281+
282+
Raises:
283+
asyncio.TimeoutError: If timeout is exceeded (300 seconds default)
284+
"""
285+
# logger.info(f"Waiting for user clarification for request: {request_id}")
286+
287+
# Initialize clarification as pending using the new event-driven method
288+
orchestration_config.set_clarification_pending(request_id)
289+
290+
try:
291+
# Wait for clarification with timeout using the new event-driven method
292+
answer = await orchestration_config.wait_for_clarification(request_id)
293+
294+
# logger.info(f"Clarification received for request {request_id}: {answer}")
295+
return UserClarificationResponse(
296+
request_id=request_id,
297+
answer=answer,
298+
)
299+
except asyncio.TimeoutError:
300+
# Enhanced timeout handling - notify user via WebSocket and cleanup
301+
logger.debug(f"Clarification timeout for request {request_id} - notifying user and terminating process")
302+
303+
# Create timeout notification message
304+
from v3.models.messages import TimeoutNotification, WebsocketMessageType
305+
timeout_notification = TimeoutNotification(
306+
timeout_type="clarification",
307+
request_id=request_id,
308+
message=f"User clarification request timed out after {orchestration_config.default_timeout} seconds. Please try again.",
309+
timestamp=time.time(),
310+
timeout_duration=orchestration_config.default_timeout
311+
)
312+
313+
# Send timeout notification to user via WebSocket
314+
try:
315+
await connection_config.send_status_update_async(
316+
message=timeout_notification,
317+
user_id=self.user_id,
318+
message_type=WebsocketMessageType.TIMEOUT_NOTIFICATION,
319+
)
320+
logger.info(f"Timeout notification sent to user {self.user_id} for clarification {request_id}")
321+
except Exception as e:
322+
logger.error(f"Failed to send timeout notification: {e}")
323+
324+
# Clean up this specific request
325+
orchestration_config.cleanup_clarification(request_id)
326+
327+
# Return None to indicate silent termination
328+
# The timeout naturally stops this specific wait operation without affecting other tasks
329+
return None
330+
331+
except KeyError as e:
332+
# Silent error handling for invalid request IDs
333+
logger.debug(f"Request ID not found: {e} - terminating process silently")
334+
return None
335+
336+
except asyncio.CancelledError:
337+
# Handle task cancellation gracefully
338+
logger.debug(f"Clarification request {request_id} was cancelled")
339+
orchestration_config.cleanup_clarification(request_id)
340+
return None
341+
342+
except Exception as e:
343+
# Silent error handling for unexpected errors
344+
logger.debug(f"Unexpected error waiting for clarification: {e} - terminating process silently")
345+
orchestration_config.cleanup_clarification(request_id)
346+
return None
347+
finally:
348+
# Ensure cleanup happens for any incomplete requests
349+
# This provides an additional safety net for resource cleanup
350+
if (request_id in orchestration_config.clarifications and orchestration_config.clarifications[request_id] is None):
351+
logger.debug(f"Final cleanup for pending clarification request {request_id}")
352+
orchestration_config.cleanup_clarification(request_id)
267353

268354
async def get_response(self, chat_history, **kwargs):
269355
"""Get response from the agent - required by Agent base class."""

src/backend/v3/models/messages.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,27 @@ class AgentMessageResponse:
176176
streaming_message: str = None
177177

178178

179+
@dataclass(slots=True)
180+
class TimeoutNotification:
181+
"""Notification sent to user when session timeout occurs."""
182+
183+
timeout_type: str # "approval" or "clarification"
184+
request_id: str # plan_id or request_id that timed out
185+
message: str # Human-readable timeout message
186+
timestamp: float # When the timeout occurred
187+
timeout_duration: float # How long we waited before timing out
188+
189+
def to_dict(self) -> Dict[str, Any]:
190+
"""Convert to dictionary for JSON serialization."""
191+
return {
192+
"timeout_type": self.timeout_type,
193+
"request_id": self.request_id,
194+
"message": self.message,
195+
"timestamp": self.timestamp,
196+
"timeout_duration": self.timeout_duration
197+
}
198+
199+
179200
class WebsocketMessageType(str, Enum):
180201
"""Types of WebSocket messages."""
181202

@@ -192,3 +213,4 @@ class WebsocketMessageType(str, Enum):
192213
USER_CLARIFICATION_REQUEST = "user_clarification_request"
193214
USER_CLARIFICATION_RESPONSE = "user_clarification_response"
194215
FINAL_RESULT_MESSAGE = "final_result_message"
216+
TIMEOUT_NOTIFICATION = "timeout_notification"

0 commit comments

Comments
 (0)