@@ -560,6 +560,7 @@ def __init__(self, proxy: CopilotProvider):
560560 self .sse_processor : Optional [SSEProcessor ] = None
561561 self .output_pipeline_instance : Optional [OutputPipelineInstance ] = None
562562 self .stream_queue : Optional [asyncio .Queue ] = None
563+ self .processing_task : Optional [asyncio .Task ] = None
563564
564565 def connection_made (self , transport : asyncio .Transport ) -> None :
565566 """Handle successful connection to target"""
@@ -609,9 +610,7 @@ async def stream_iterator():
609610 StreamingChoices (
610611 finish_reason = choice .get ("finish_reason" , None ),
611612 index = 0 ,
612- delta = Delta (
613- content = content , role = "assistant"
614- ),
613+ delta = Delta (content = content , role = "assistant" ),
615614 logprobs = None ,
616615 )
617616 )
@@ -643,8 +642,17 @@ async def stream_iterator():
643642 # Now send the final zero chunk
644643 self ._proxy_transport_write (b"0\r \n \r \n " )
645644
645+ except asyncio .CancelledError :
646+ logger .debug ("Stream processing cancelled" )
647+ raise
646648 except Exception as e :
647649 logger .error (f"Error processing stream: { e } " )
650+ finally :
651+ # Clean up
652+ if self .processing_task and not self .processing_task .done ():
653+ self .processing_task .cancel ()
654+ if self .proxy .context_tracking and self .proxy .context_tracking .sensitive :
655+ self .proxy .context_tracking .sensitive .secure_cleanup ()
648656
649657 def _process_chunk (self , chunk : bytes ):
650658 records = self .sse_processor .process_chunk (chunk )
@@ -689,6 +697,7 @@ def data_received(self, data: bytes) -> None:
689697
690698 def connection_lost (self , exc : Optional [Exception ]) -> None :
691699 """Handle connection loss to target"""
700+
692701 if (
693702 not self .proxy ._closing
694703 and self .proxy .transport
@@ -699,4 +708,8 @@ def connection_lost(self, exc: Optional[Exception]) -> None:
699708 except Exception as e :
700709 logger .error (f"Error closing proxy transport: { e } " )
701710
702- # todo: clear the context to erase the sensitive data
711+ # Clean up resources
712+ if self .processing_task and not self .processing_task .done ():
713+ self .processing_task .cancel ()
714+ if self .proxy .context_tracking and self .proxy .context_tracking .sensitive :
715+ self .proxy .context_tracking .sensitive .secure_cleanup ()
0 commit comments