diff --git a/pyproject.toml b/pyproject.toml index 5883371..955a4c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "amrita_core" -version = "0.8.4" +version = "0.8.4.1" description = "High performance, lightweight agent framework." readme = "README.md" requires-python = ">=3.10,<3.15" diff --git a/src/amrita_core/chatmanager.py b/src/amrita_core/chatmanager.py index ad39c5b..4eafefc 100644 --- a/src/amrita_core/chatmanager.py +++ b/src/amrita_core/chatmanager.py @@ -638,7 +638,13 @@ async def _entry(self) -> None: self._is_done = True self.end_at = datetime.now(utc) chat_manager.running_chat_object_id2map.pop(self.stream_id, None) - chat_manager.clean_obj(self.session_id, 1000) # To avoid memory leaks + if chat_manager.clean_obj( + self.session_id, 10000 + ): # A hard limit just to avoid memory leaks + logger.warning( + "Detected too many chat objects in session id `%s`! Please check if there are any memory leaks!", + self.session_id, + ) logger.debug("Chat event processing completed") else: @@ -824,7 +830,9 @@ async def _process_chat( f"Because of `{e!s}`, LLM request failed, retrying ({i}/{self.config.llm.max_retries})..." ) ctx = FallbackContext(self.preset, e, self.config, self.context_wrap, i) - await MatcherManager.trigger_event(ctx, ctx.config, (FallbackFailed,)) + await MatcherManager.trigger_event( + ctx, ctx.config, exception_ignored=(FallbackFailed,) + ) if ctx.preset is self.preset: ctx.fail("No preset fallback available, exiting!") self.preset = ctx.preset @@ -892,7 +900,7 @@ class ChatManager: ) running_chat_object_id2map: dict[str, ChatObjectMeta] = field(default_factory=dict) - def clean_obj(self, k: str, maxitems: int = 10): + def clean_obj(self, k: str, maxitems: int = 10) -> bool: """ Clean up running chat objects under the specified key, keeping only the first 10 objects, removing any excess unfinished parts @@ -900,6 +908,9 @@ def clean_obj(self, k: str, maxitems: int = 10): Args: k (tuple[int, bool]): Key value, composed of instance ID and whether it's group chat maxitems (int, optional): Maximum number of objects. Defaults to 10. + + Returns: + bool: Whether the cleanup was successful """ objs = self.running_chat_object[k] if len(objs) > maxitems: @@ -909,6 +920,8 @@ def clean_obj(self, k: str, maxitems: int = 10): for obj in dropped_obj: self.running_chat_object_id2map.pop(obj.stream_id, None) self.running_chat_object[k] = objs + return True + return False def get_all_objs(self) -> list[ChatObjectMeta]: """ diff --git a/src/amrita_core/streaming.py b/src/amrita_core/streaming.py index 54d162e..ad44ecb 100644 --- a/src/amrita_core/streaming.py +++ b/src/amrita_core/streaming.py @@ -116,7 +116,8 @@ async def _wait_for_continue(self, tag: str | None = None) -> bool: await self.__resume_signal return True try: - self.__suspend_signal.set_result(True) + if not self.__suspend_signal.done(): + self.__suspend_signal.set_result(True) self.__resume_signal = asyncio.Future() await self.__resume_signal return True @@ -150,7 +151,6 @@ def resume(self) -> None: """Resume to run when suspend.""" if self.__resume_signal and not self.__resume_signal.done(): self.__resume_signal.set_result(True) - self._suspend_tags = None def queue_closed(self) -> bool: """Check if the response queue is closed.