Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "amrita_core"
version = "0.8.4"
version = "0.8.4.1"
description = "High performance, lightweight agent framework."
readme = "README.md"
requires-python = ">=3.10,<3.15"
Expand Down
19 changes: 16 additions & 3 deletions src/amrita_core/chatmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,13 @@ async def _entry(self) -> None:
self._is_done = True
self.end_at = datetime.now(utc)
chat_manager.running_chat_object_id2map.pop(self.stream_id, None)
chat_manager.clean_obj(self.session_id, 1000) # To avoid memory leaks
if chat_manager.clean_obj(
self.session_id, 10000
): # A hard limit just to avoid memory leaks
logger.warning(
"Detected too many chat objects in session id `%s`! Please check if there are any memory leaks!",
self.session_id,
)
logger.debug("Chat event processing completed")

else:
Expand Down Expand Up @@ -824,7 +830,9 @@ async def _process_chat(
f"Because of `{e!s}`, LLM request failed, retrying ({i}/{self.config.llm.max_retries})..."
)
ctx = FallbackContext(self.preset, e, self.config, self.context_wrap, i)
await MatcherManager.trigger_event(ctx, ctx.config, (FallbackFailed,))
await MatcherManager.trigger_event(
ctx, ctx.config, exception_ignored=(FallbackFailed,)
)
if ctx.preset is self.preset:
ctx.fail("No preset fallback available, exiting!")
self.preset = ctx.preset
Expand Down Expand Up @@ -892,14 +900,17 @@ class ChatManager:
)
running_chat_object_id2map: dict[str, ChatObjectMeta] = field(default_factory=dict)

def clean_obj(self, k: str, maxitems: int = 10):
def clean_obj(self, k: str, maxitems: int = 10) -> bool:
"""
Clean up running chat objects under the specified key, keeping only the first
``maxitems`` objects and removing any excess unfinished parts

Args:
k (str): Session ID key identifying the running chat objects to clean up
maxitems (int, optional): Maximum number of objects to keep. Defaults to 10.

Returns:
bool: True if the object count exceeded ``maxitems`` and excess objects were dropped, False otherwise
"""
objs = self.running_chat_object[k]
if len(objs) > maxitems:
Expand All @@ -909,6 +920,8 @@ def clean_obj(self, k: str, maxitems: int = 10):
for obj in dropped_obj:
self.running_chat_object_id2map.pop(obj.stream_id, None)
self.running_chat_object[k] = objs
return True
return False

def get_all_objs(self) -> list[ChatObjectMeta]:
"""
Expand Down
4 changes: 2 additions & 2 deletions src/amrita_core/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ async def _wait_for_continue(self, tag: str | None = None) -> bool:
await self.__resume_signal
return True
try:
self.__suspend_signal.set_result(True)
if not self.__suspend_signal.done():
self.__suspend_signal.set_result(True)
self.__resume_signal = asyncio.Future()
await self.__resume_signal
return True
Expand Down Expand Up @@ -150,7 +151,6 @@ def resume(self) -> None:
"""Resume to run when suspend."""
if self.__resume_signal and not self.__resume_signal.done():
self.__resume_signal.set_result(True)
self._suspend_tags = None

def queue_closed(self) -> bool:
"""Check if the response queue is closed.
Expand Down
Loading