diff --git a/dev/microsoft-agents-testing/microsoft_agents/testing/core/fluent/activity.py b/dev/microsoft-agents-testing/microsoft_agents/testing/core/fluent/activity.py deleted file mode 100644 index 6d967a5b..00000000 --- a/dev/microsoft-agents-testing/microsoft_agents/testing/core/fluent/activity.py +++ /dev/null @@ -1,404 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -"""Activity-specific fluent utilities. - -This module contains a specialized assertion class (ActivityExpect) for -Activity objects. The implementation is commented out pending finalization -of the API design. -""" - -from __future__ import annotations - -from microsoft_agents.activity import Activity, ActivityTypes - -from typing import Iterable, Self - -# BUG: Duplicate import of Activity — the first import on the line above -# already imports Activity from microsoft_agents.activity. -from microsoft_agents.activity import Activity, ActivityTypes # TODO: Duplicate import of Activity - -from .expect import Expect -from .model_template import ModelTemplate - -# TODO: ActivityExpect is commented out - determine if it should be removed or completed - -# class ActivityExpect(Expect): -# """ -# Specialized Expect class for asserting on Activity objects. - -# Provides convenience methods for common Activity assertions. - -# Usage: -# # Assert all activities are messages -# ActivityExpect(responses).are_messages() - -# # Assert conversation was started -# ActivityExpect(responses).starts_conversation() - -# # Assert text contains value -# ActivityExpect(responses).has_text_containing("hello") -# """ - -# def __init__(self, items: Iterable[Activity]) -> None: -# """Initialize ActivityExpect with Activity objects. - -# :param items: An iterable of Activity instances. -# """ -# super().__init__(items) - -# # ========================================================================= -# # Type Assertions -# # ========================================================================= - -# def are_messages(self) -> Self: -# """Assert that all activities are of type 'message'. - -# :raises AssertionError: If any activity is not a message. -# :return: Self for chaining. -# """ -# return self.that(type=ActivityTypes.message) - -# def are_typing(self) -> Self: -# """Assert that all activities are of type 'typing'. - -# :raises AssertionError: If any activity is not typing. -# :return: Self for chaining. -# """ -# return self.that(type=ActivityTypes.typing) - -# def are_events(self) -> Self: -# """Assert that all activities are of type 'event'. - -# :raises AssertionError: If any activity is not an event. -# :return: Self for chaining. -# """ -# return self.that(type=ActivityTypes.event) - -# def has_type(self, activity_type: str) -> Self: -# """Assert that all activities have the specified type. - -# :param activity_type: The expected activity type. -# :raises AssertionError: If any activity doesn't match the type. -# :return: Self for chaining. -# """ -# return self.that(type=activity_type) - -# def has_any_type(self, activity_type: str) -> Self: -# """Assert that at least one activity has the specified type. - -# :param activity_type: The expected activity type. -# :raises AssertionError: If no activity matches the type. -# :return: Self for chaining. -# """ -# return self.that_for_any(type=activity_type) - -# # ========================================================================= -# # Conversation Flow Assertions -# # ========================================================================= - -# def starts_conversation(self) -> Self: -# """Assert that the activities include a conversation start. - -# Checks for conversationUpdate with membersAdded. - -# :raises AssertionError: If no conversation start activity found. -# :return: Self for chaining. -# """ -# def is_conversation_start(activity: Activity) -> bool: -# if activity.type != ActivityTypes.conversation_update: -# return False -# return bool(activity.members_added and len(activity.members_added) > 0) - -# return self.that_for_any(is_conversation_start) - -# def ends_conversation(self) -> Self: -# """Assert that the activities include a conversation end. - -# Checks for endOfConversation activity type. - -# :raises AssertionError: If no conversation end activity found. -# :return: Self for chaining. -# """ -# return self.that_for_any(type=ActivityTypes.end_of_conversation) - -# def has_members_added(self) -> Self: -# """Assert that at least one activity has members added. - -# :raises AssertionError: If no activity has members added. -# :return: Self for chaining. -# """ -# def has_members(activity: Activity) -> bool: -# return bool(activity.members_added and len(activity.members_added) > 0) - -# return self.that_for_any(has_members) - -# def has_members_removed(self) -> Self: -# """Assert that at least one activity has members removed. - -# :raises AssertionError: If no activity has members removed. -# :return: Self for chaining. -# """ -# def has_removed(activity: Activity) -> bool: -# return bool(activity.members_removed and len(activity.members_removed) > 0) - -# return self.that_for_any(has_removed) - -# # ========================================================================= -# # Text Assertions -# # ========================================================================= - -# def has_text(self, text: str) -> Self: -# """Assert that all activities have the exact text. - -# :param text: The expected text. -# :raises AssertionError: If any activity doesn't have the exact text. -# :return: Self for chaining. -# """ -# return self.that(text=text) - -# def has_any_text(self, text: str) -> Self: -# """Assert that at least one activity has the exact text. - -# :param text: The expected text. -# :raises AssertionError: If no activity has the exact text. -# :return: Self for chaining. -# """ -# return self.that_for_any(text=text) - -# def has_text_containing(self, substring: str) -> Self: -# """Assert that all activities have text containing the substring. - -# :param substring: The substring to search for. -# :raises AssertionError: If any activity doesn't contain the substring. -# :return: Self for chaining. -# """ -# def contains_text(activity: Activity) -> bool: -# return activity.text is not None and substring in activity.text - -# return self.that(contains_text) - -# def has_any_text_containing(self, substring: str) -> Self: -# """Assert that at least one activity has text containing the substring. - -# :param substring: The substring to search for. -# :raises AssertionError: If no activity contains the substring. -# :return: Self for chaining. -# """ -# def contains_text(activity: Activity) -> bool: -# return activity.text is not None and substring in activity.text - -# return self.that_for_any(contains_text) - -# def has_text_matching(self, pattern: str) -> Self: -# """Assert that all activities have text matching the regex pattern. - -# :param pattern: The regex pattern to match. -# :raises AssertionError: If any activity doesn't match the pattern. -# :return: Self for chaining. -# """ -# import re -# regex = re.compile(pattern) - -# def matches_pattern(activity: Activity) -> bool: -# return activity.text is not None and regex.search(activity.text) is not None - -# return self.that(matches_pattern) - -# def has_any_text_matching(self, pattern: str) -> Self: -# """Assert that at least one activity has text matching the regex pattern. - -# :param pattern: The regex pattern to match. -# :raises AssertionError: If no activity matches the pattern. -# :return: Self for chaining. -# """ -# import re -# regex = re.compile(pattern) - -# def matches_pattern(activity: Activity) -> bool: -# return activity.text is not None and regex.search(activity.text) is not None - -# return self.that_for_any(matches_pattern) - -# # ========================================================================= -# # Attachment Assertions -# # ========================================================================= - -# def has_attachments(self) -> Self: -# """Assert that all activities have at least one attachment. - -# :raises AssertionError: If any activity has no attachments. -# :return: Self for chaining. -# """ -# def has_attach(activity: Activity) -> bool: -# return bool(activity.attachments and len(activity.attachments) > 0) - -# return self.that(has_attach) - -# def has_any_attachments(self) -> Self: -# """Assert that at least one activity has attachments. - -# :raises AssertionError: If no activity has attachments. -# :return: Self for chaining. -# """ -# def has_attach(activity: Activity) -> bool: -# return bool(activity.attachments and len(activity.attachments) > 0) - -# return self.that_for_any(has_attach) - -# def has_attachment_of_type(self, content_type: str) -> Self: -# """Assert that at least one activity has an attachment of the specified type. - -# :param content_type: The attachment content type (e.g., 'image/png'). -# :raises AssertionError: If no matching attachment found. -# :return: Self for chaining. -# """ -# def has_type(activity: Activity) -> bool: -# if not activity.attachments: -# return False -# return any(a.content_type == content_type for a in activity.attachments) - -# return self.that_for_any(has_type) - -# def has_adaptive_card(self) -> Self: -# """Assert that at least one activity has an Adaptive Card attachment. - -# :raises AssertionError: If no Adaptive Card found. -# :return: Self for chaining. -# """ -# return self.has_attachment_of_type("application/vnd.microsoft.card.adaptive") - -# def has_hero_card(self) -> Self: -# """Assert that at least one activity has a Hero Card attachment. - -# :raises AssertionError: If no Hero Card found. -# :return: Self for chaining. -# """ -# return self.has_attachment_of_type("application/vnd.microsoft.card.hero") - -# def has_thumbnail_card(self) -> Self: -# """Assert that at least one activity has a Thumbnail Card attachment. - -# :raises AssertionError: If no Thumbnail Card found. -# :return: Self for chaining. -# """ -# return self.has_attachment_of_type("application/vnd.microsoft.card.thumbnail") - -# # ========================================================================= -# # Suggested Actions Assertions -# # ========================================================================= - -# def has_suggested_actions(self) -> Self: -# """Assert that at least one activity has suggested actions. - -# :raises AssertionError: If no activity has suggested actions. -# :return: Self for chaining. -# """ -# def has_actions(activity: Activity) -> bool: -# return bool( -# activity.suggested_actions -# and activity.suggested_actions.actions -# and len(activity.suggested_actions.actions) > 0 -# ) - -# return self.that_for_any(has_actions) - -# def has_suggested_action_titled(self, title: str) -> Self: -# """Assert that at least one activity has a suggested action with the given title. - -# :param title: The expected action title. -# :raises AssertionError: If no matching suggested action found. -# :return: Self for chaining. -# """ -# def has_action_title(activity: Activity) -> bool: -# if not activity.suggested_actions or not activity.suggested_actions.actions: -# return False -# return any(a.title == title for a in activity.suggested_actions.actions) - -# return self.that_for_any(has_action_title) - -# # ========================================================================= -# # Channel/Conversation Assertions -# # ========================================================================= - -# def from_channel(self, channel_id: str) -> Self: -# """Assert that all activities are from the specified channel. - -# :param channel_id: The expected channel ID. -# :raises AssertionError: If any activity is from a different channel. -# :return: Self for chaining. -# """ -# return self.that(channel_id=channel_id) - -# def in_conversation(self, conversation_id: str) -> Self: -# """Assert that all activities are in the specified conversation. - -# :param conversation_id: The expected conversation ID. -# :raises AssertionError: If any activity is in a different conversation. -# :return: Self for chaining. -# """ -# def in_conv(activity: Activity) -> bool: -# return activity.conversation is not None and activity.conversation.id == conversation_id - -# return self.that(in_conv) - -# def from_user(self, user_id: str) -> Self: -# """Assert that all activities are from the specified user. - -# :param user_id: The expected user ID. -# :raises AssertionError: If any activity is from a different user. -# :return: Self for chaining. -# """ -# def from_usr(activity: Activity) -> bool: -# return activity.from_property is not None and activity.from_property.id == user_id - -# return self.that(from_usr) - -# def to_recipient(self, recipient_id: str) -> Self: -# """Assert that all activities are addressed to the specified recipient. - -# :param recipient_id: The expected recipient ID. -# :raises AssertionError: If any activity is to a different recipient. -# :return: Self for chaining. -# """ -# def to_recip(activity: Activity) -> bool: -# return activity.recipient is not None and activity.recipient.id == recipient_id - -# return self.that(to_recip) - -# # ========================================================================= -# # Value/Entity Assertions -# # ========================================================================= - -# def has_value(self) -> Self: -# """Assert that all activities have a value set. - -# :raises AssertionError: If any activity has no value. -# :return: Self for chaining. -# """ -# def has_val(activity: Activity) -> bool: -# return activity.value is not None - -# return self.that(has_val) - -# def has_entities(self) -> Self: -# """Assert that at least one activity has entities. - -# :raises AssertionError: If no activity has entities. -# :return: Self for chaining. -# """ -# def has_ent(activity: Activity) -> bool: -# return bool(activity.entities and len(activity.entities) > 0) - -# return self.that_for_any(has_ent) - -# def has_semantic_action(self) -> Self: -# """Assert that at least one activity has a semantic action. - -# :raises AssertionError: If no activity has a semantic action. -# :return: Self for chaining. -# """ -# def has_action(activity: Activity) -> bool: -# return activity.semantic_action is not None - -# return self.that_for_any(has_action) \ No newline at end of file diff --git a/dev/microsoft-agents-testing/microsoft_agents/testing/core/utils.py b/dev/microsoft-agents-testing/microsoft_agents/testing/core/utils.py index a704737c..f8223976 100644 --- a/dev/microsoft-agents-testing/microsoft_agents/testing/core/utils.py +++ b/dev/microsoft-agents-testing/microsoft_agents/testing/core/utils.py @@ -90,4 +90,4 @@ def generate_token_from_config(sdk_config: dict, connection_name: str = "SERVICE if not client_id or not client_secret or not tenant_id: raise ValueError("Incorrect configuration provided for token generation.") - return generate_token(client_id, client_secret, tenant_id) + return generate_token(client_id, client_secret, tenant_id) \ No newline at end of file diff --git a/dev/microsoft-agents-testing/microsoft_agents/testing/utils.py b/dev/microsoft-agents-testing/microsoft_agents/testing/utils.py index 4d984d6c..f89965dd 100644 --- a/dev/microsoft-agents-testing/microsoft_agents/testing/utils.py +++ b/dev/microsoft-agents-testing/microsoft_agents/testing/utils.py @@ -102,4 +102,4 @@ def resolve_scenario(scenario_or_str: Scenario | str ) -> Scenario: else: return scenario_registry.get(scenario_or_str) else: - raise TypeError("Input must be a Scenario instance or a string key.") \ No newline at end of file + raise TypeError("Input must be a Scenario instance or a string key.") diff --git a/dev/tests/sdk/test_streaming_response.py b/dev/tests/sdk/test_streaming_response.py new file mode 100644 index 00000000..d73ad022 --- /dev/null +++ b/dev/tests/sdk/test_streaming_response.py @@ -0,0 +1,122 @@ +import pytest +import asyncio + +from microsoft_agents.activity import ( + Activity, + ActivityTypes, + Channels, + Entity +) + +from microsoft_agents.hosting.core import ( + TurnContext, + TurnState, +) + +from microsoft_agents.testing import ( + AgentClient, + AgentEnvironment, + AiohttpScenario, +) + +FULL_TEXT = "This is a streaming response." +CHUNKS = FULL_TEXT.split() + +def get_streaminfo(activity: Activity) -> Entity: + for entity in activity.entities: + if isinstance(entity, dict) and entity.get("type") == "streaminfo": + return Entity.model_validate(entity) + elif isinstance(entity, Entity) and entity.type == "streaminfo": + return entity + raise ValueError("No streaminfo entity found") + +async def init_agent(env: AgentEnvironment): + + app = env.agent_application + + @app.message("/stream") + async def stream_handler(context: TurnContext, state: TurnState): + + assert context.streaming_response is not None + + context.streaming_response.queue_informative_update("Starting stream...") + + for chunk in CHUNKS: + await asyncio.sleep(1.0) # Simulate delay between chunks + context.streaming_response.queue_text_chunk(chunk) + + await asyncio.sleep(1.0) + + await context.streaming_response.end_stream() + +_SCENARIO = AiohttpScenario(init_agent=init_agent, use_jwt_middleware=False) + +@pytest.mark.asyncio +@pytest.mark.agent_test(_SCENARIO) +async def test_basic_streaming_response_non_streaming_channel(agent_client: AgentClient): + + expected_len = len(FULL_TEXT.split()) + + agent_client.template = agent_client.template.with_updates(channel_id=Channels.emulator) + + # give enough time for all the activities to send + await agent_client.send("/stream", wait=expected_len * 2.0) + + stream_activities = agent_client.select().where( + entities=lambda x: any(e["type"] == "streaminfo" for e in x) + ).get() + + assert len(stream_activities) == 1 + + final_streaminfo = get_streaminfo(stream_activities[0]) + + assert final_streaminfo.stream_sequence == 1 + assert final_streaminfo.stream_type == "final" + assert stream_activities[0].text == FULL_TEXT.replace(" ", "") + + + +@pytest.mark.asyncio +@pytest.mark.agent_test(_SCENARIO) +async def test_basic_streaming_response_streaming_channel(agent_client: AgentClient): + + expected_len = len(FULL_TEXT.split()) + + agent_client.template = agent_client.template.with_updates(channel_id=Channels.webchat) + + # give enough time for all the activities to send + await agent_client.send("/stream", wait=expected_len * 2.0) + + stream_activities = agent_client.select().where( + entities=lambda x: any(e["type"] == "streaminfo" for e in x) + ).get() + + assert len(stream_activities) == len(CHUNKS) + 2 + + informative = stream_activities[0] + informative_streaminfo = get_streaminfo(informative) + + assert informative_streaminfo.stream_type == "informative" + assert informative_streaminfo.stream_sequence == 1 + assert informative.text == "Starting stream..." + assert informative.type == ActivityTypes.typing + + t = "" + for i, chunk in enumerate(CHUNKS): + t += chunk + + j = i + 1 + + streaminfo = get_streaminfo(stream_activities[j]) + + assert stream_activities[j].text == t + assert stream_activities[j].type == ActivityTypes.typing + assert streaminfo.stream_type == "streaming" + assert streaminfo.stream_sequence == j + 1 + + final_streaminfo = get_streaminfo(stream_activities[-1]) + + assert final_streaminfo.stream_sequence == len(stream_activities) + assert final_streaminfo.stream_type == "final" + assert stream_activities[-1].text == FULL_TEXT.replace(" ", "") + diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/__init__.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/__init__.py deleted file mode 100644 index 8216be63..00000000 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -from .streaming import ( - Citation, - CitationUtil, - StreamingResponse, -) - -__all__ = [ - "Citation", - "CitationUtil", - "StreamingResponse", -] diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/__init__.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/__init__.py deleted file mode 100644 index 4cd61f38..00000000 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -from .citation import Citation -from .citation_util import CitationUtil -from .streaming_response import StreamingResponse - -__all__ = [ - "Citation", - "CitationUtil", - "StreamingResponse", -] diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/citation.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/citation.py deleted file mode 100644 index f643639a..00000000 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/citation.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -from typing import Optional -from dataclasses import dataclass - - -@dataclass -class Citation: - """Citations returned by the model.""" - - content: str - """The content of the citation.""" - - title: Optional[str] = None - """The title of the citation.""" - - url: Optional[str] = None - """The URL of the citation.""" - - filepath: Optional[str] = None - """The filepath of the document.""" diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/citation_util.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/citation_util.py deleted file mode 100644 index 1ec923dc..00000000 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/citation_util.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -import re -from typing import List, Optional - -from microsoft_agents.activity import ClientCitation - - -class CitationUtil: - """Utility functions for manipulating text and citations.""" - - @staticmethod - def snippet(text: str, max_length: int) -> str: - """ - Clips the text to a maximum length in case it exceeds the limit. - - Args: - text: The text to clip. - max_length: The maximum length of the text to return, cutting off the last whole word. - - Returns: - The modified text - """ - if len(text) <= max_length: - return text - - snippet = text[:max_length] - snippet = snippet[: min(len(snippet), snippet.rfind(" "))] - snippet += "..." - return snippet - - @staticmethod - def format_citations_response(text: str) -> str: - """ - Convert citation tags `[doc(s)n]` to `[n]` where n is a number. - - Args: - text: The text to format. - - Returns: - The formatted text. - """ - return re.sub(r"\[docs?(\d+)\]", r"[\1]", text, flags=re.IGNORECASE) - - @staticmethod - def get_used_citations( - text: str, citations: List[ClientCitation] - ) -> Optional[List[ClientCitation]]: - """ - Get the citations used in the text. This will remove any citations that are - included in the citations array from the response but not referenced in the text. - - Args: - text: The text to search for citation references, i.e. [1], [2], etc. - citations: The list of citations to search for. - - Returns: - The list of citations used in the text. - """ - regex = re.compile(r"\[(\d+)\]", re.IGNORECASE) - matches = regex.findall(text) - - if not matches: - return None - - # Remove duplicates - filtered_matches = set(matches) - - # Add citations - used_citations = [] - for match in filtered_matches: - citation_ref = f"[{match}]" - found = next( - ( - citation - for citation in citations - if f"[{citation.position}]" == citation_ref - ), - None, - ) - if found: - used_citations.append(found) - - return used_citations if used_citations else None diff --git a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/streaming_response.py b/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/streaming_response.py deleted file mode 100644 index 05986cb1..00000000 --- a/libraries/microsoft-agents-hosting-aiohttp/microsoft_agents/hosting/aiohttp/app/streaming/streaming_response.py +++ /dev/null @@ -1,416 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -import asyncio -import logging -from typing import List, Optional, Callable, Literal - -from microsoft_agents.activity import ( - Activity, - Entity, - Attachment, - Channels, - ClientCitation, - DeliveryModes, - SensitivityUsageInfo, -) - -from microsoft_agents.hosting.core import error_resources -from microsoft_agents.hosting.core.turn_context import TurnContext - -from .citation import Citation -from .citation_util import CitationUtil - -logger = logging.getLogger(__name__) - - -class StreamingResponse: - """ - A helper class for streaming responses to the client. - - This class is used to send a series of updates to the client in a single response. - The expected sequence of calls is: - - `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. - - Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. - """ - - def __init__(self, context: "TurnContext"): - """ - Creates a new StreamingResponse instance. - - Args: - context: Context for the current turn of conversation with the user. - """ - self._context = context - self._sequence_number = 1 - self._stream_id: Optional[str] = None - self._message = "" - self._attachments: Optional[List[Attachment]] = None - self._ended = False - self._cancelled = False - - # Queue for outgoing activities - self._queue: List[Callable[[], Activity]] = [] - self._queue_sync: Optional[asyncio.Task] = None - self._chunk_queued = False - - # Powered by AI feature flags - self._enable_feedback_loop = False - self._feedback_loop_type: Optional[Literal["default", "custom"]] = None - self._enable_generated_by_ai_label = False - self._citations: Optional[List[ClientCitation]] = [] - self._sensitivity_label: Optional[SensitivityUsageInfo] = None - - # Channel information - self._is_streaming_channel: bool = False - self._channel_id: Channels = None - self._interval: float = 0.1 # Default interval for sending updates - self._set_defaults(context) - - @property - def stream_id(self) -> Optional[str]: - """ - Gets the stream ID of the current response. - Assigned after the initial update is sent. - """ - return self._stream_id - - @property - def citations(self) -> Optional[List[ClientCitation]]: - """Gets the citations of the current response.""" - return self._citations - - @property - def updates_sent(self) -> int: - """Gets the number of updates sent for the stream.""" - return self._sequence_number - 1 - - def queue_informative_update(self, text: str) -> None: - """ - Queues an informative update to be sent to the client. - - Args: - text: Text of the update to send. - """ - if not self._is_streaming_channel: - return - - if self._ended: - raise RuntimeError(str(error_resources.StreamAlreadyEnded)) - - # Queue a typing activity - def create_activity(): - activity = Activity( - type="typing", - text=text, - entities=[ - Entity( - type="streaminfo", - stream_type="informative", - stream_sequence=self._sequence_number, - ) - ], - ) - self._sequence_number += 1 - return activity - - self._queue_activity(create_activity) - - def queue_text_chunk( - self, text: str, citations: Optional[List[Citation]] = None - ) -> None: - """ - Queues a chunk of partial message text to be sent to the client. - - The text will be sent as quickly as possible to the client. - Chunks may be combined before delivery to the client. - - Args: - text: Partial text of the message to send. - citations: Citations to be included in the message. - """ - if self._cancelled: - return - if self._ended: - raise RuntimeError(str(error_resources.StreamAlreadyEnded)) - - # Update full message text - self._message += text - - # If there are citations, modify the content so that the sources are numbers instead of [doc1], [doc2], etc. - self._message = CitationUtil.format_citations_response(self._message) - - # Queue the next chunk - self._queue_next_chunk() - - async def end_stream(self) -> None: - """ - Ends the stream by sending the final message to the client. - """ - if self._ended: - raise RuntimeError(str(error_resources.StreamAlreadyEnded)) - - # Queue final message - self._ended = True - self._queue_next_chunk() - - # Wait for the queue to drain - await self.wait_for_queue() - - def set_attachments(self, attachments: List[Attachment]) -> None: - """ - Sets the attachments to attach to the final chunk. - - Args: - attachments: List of attachments. - """ - self._attachments = attachments - - def set_sensitivity_label(self, sensitivity_label: SensitivityUsageInfo) -> None: - """ - Sets the sensitivity label to attach to the final chunk. - - Args: - sensitivity_label: The sensitivity label. - """ - self._sensitivity_label = sensitivity_label - - def set_citations(self, citations: List[Citation]) -> None: - """ - Sets the citations for the full message. - - Args: - citations: Citations to be included in the message. - """ - if citations: - if not self._citations: - self._citations = [] - - curr_pos = len(self._citations) - - for citation in citations: - client_citation = ClientCitation( - type="Claim", - position=curr_pos + 1, - appearance={ - "type": "DigitalDocument", - "name": citation.title or f"Document #{curr_pos + 1}", - "abstract": CitationUtil.snippet(citation.content, 477), - }, - ) - curr_pos += 1 - self._citations.append(client_citation) - - def set_feedback_loop(self, enable_feedback_loop: bool) -> None: - """ - Sets the Feedback Loop in Teams that allows a user to - give thumbs up or down to a response. - Default is False. - - Args: - enable_feedback_loop: If true, the feedback loop is enabled. - """ - self._enable_feedback_loop = enable_feedback_loop - - def set_feedback_loop_type( - self, feedback_loop_type: Literal["default", "custom"] - ) -> None: - """ - Sets the type of UI to use for the feedback loop. - - Args: - feedback_loop_type: The type of the feedback loop. - """ - self._feedback_loop_type = feedback_loop_type - - def set_generated_by_ai_label(self, enable_generated_by_ai_label: bool) -> None: - """ - Sets the Generated by AI label in Teams. - Default is False. - - Args: - enable_generated_by_ai_label: If true, the label is added. - """ - self._enable_generated_by_ai_label = enable_generated_by_ai_label - - def get_message(self) -> str: - """ - Returns the most recently streamed message. - """ - return self._message - - async def wait_for_queue(self) -> None: - """ - Waits for the outgoing activity queue to be empty. - """ - if self._queue_sync: - await self._queue_sync - - def _set_defaults(self, context: "TurnContext"): - if Channels.ms_teams == context.activity.channel_id.channel: - if context.activity.is_agentic_request(): - # Agentic requests do not support streaming responses at this time. - # TODO : Enable streaming for agentic requests when supported. - self._is_streaming_channel = False - else: - self._is_streaming_channel = True - self._interval = 1.0 - elif Channels.direct_line == context.activity.channel_id.channel: - self._is_streaming_channel = True - self._interval = 0.5 - elif context.activity.delivery_mode == DeliveryModes.stream: - self._is_streaming_channel = True - self._interval = 0.1 - - self._channel_id = context.activity.channel_id - - def _queue_next_chunk(self) -> None: - """ - Queues the next chunk of text to be sent to the client. - """ - # Are we already waiting to send a chunk? - if self._chunk_queued: - return - - # Queue a chunk of text to be sent - self._chunk_queued = True - - def create_activity(): - self._chunk_queued = False - if self._ended: - # Send final message - activity = Activity( - type="message", - text=self._message or "end stream response", - attachments=self._attachments or [], - entities=[ - Entity( - type="streaminfo", - stream_id=self._stream_id, - stream_type="final", - stream_sequence=self._sequence_number, - ) - ], - ) - elif self._is_streaming_channel: - # Send typing activity - activity = Activity( - type="typing", - text=self._message, - entities=[ - Entity( - type="streaminfo", - stream_type="streaming", - stream_sequence=self._sequence_number, - ) - ], - ) - else: - return - self._sequence_number += 1 - return activity - - self._queue_activity(create_activity) - - def _queue_activity(self, factory: Callable[[], Activity]) -> None: - """ - Queues an activity to be sent to the client. - """ - self._queue.append(factory) - - # If there's no sync in progress, start one - if not self._queue_sync: - self._queue_sync = asyncio.create_task(self._drain_queue()) - - async def _drain_queue(self) -> None: - """ - Sends any queued activities to the client until the queue is empty. - """ - try: - logger.debug(f"Draining queue with {len(self._queue)} activities.") - while self._queue: - factory = self._queue.pop(0) - activity = factory() - if activity: - await self._send_activity(activity) - except Exception as err: - if ( - "403" in str(err) - and self._context.activity.channel_id == Channels.ms_teams - ): - logger.warning("Teams channel stopped the stream.") - self._cancelled = True - else: - logger.error( - f"Error occurred when sending activity while streaming: {err}" - ) - raise - finally: - self._queue_sync = None - - async def _send_activity(self, activity: Activity) -> None: - """ - Sends an activity to the client and saves the stream ID returned. - - Args: - activity: The activity to send. - """ - - streaminfo_entity = None - - if not activity.entities: - streaminfo_entity = Entity(type="streaminfo") - activity.entities = [streaminfo_entity] - else: - for entity in activity.entities: - if hasattr(entity, "type") and entity.type == "streaminfo": - streaminfo_entity = entity - break - - if not streaminfo_entity: - # If no streaminfo entity exists, create one - streaminfo_entity = Entity(type="streaminfo") - activity.entities.append(streaminfo_entity) - - # Set activity ID to the assigned stream ID - if self._stream_id: - activity.id = self._stream_id - streaminfo_entity.stream_id = self._stream_id - - if self._citations and len(self._citations) > 0 and not self._ended: - # Filter out the citations unused in content. - curr_citations = CitationUtil.get_used_citations( - self._message, self._citations - ) - if curr_citations: - activity.entities.append( - Entity( - type="https://schema.org/Message", - schema_type="Message", - context="https://schema.org", - id="", - citation=curr_citations, - ) - ) - - # Add in Powered by AI feature flags - if self._ended: - if self._enable_feedback_loop and self._feedback_loop_type: - # Add feedback loop to streaminfo entity - streaminfo_entity.feedback_loop = {"type": self._feedback_loop_type} - else: - # Add feedback loop enabled to streaminfo entity - streaminfo_entity.feedback_loop_enabled = self._enable_feedback_loop - # Add in Generated by AI - if self._enable_generated_by_ai_label: - activity.add_ai_metadata(self._citations, self._sensitivity_label) - - # Send activity - response = await self._context.send_activity(activity) - await asyncio.sleep(self._interval) - - # Save assigned stream ID - if not self._stream_id and response: - self._stream_id = response.id diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py index 2d5b0fbf..cde42b72 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/app/streaming/streaming_response.py @@ -1,9 +1,11 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import uuid import asyncio import logging from typing import List, Optional, Callable, Literal, TYPE_CHECKING +from dataclasses import dataclass from microsoft_agents.activity import ( Activity, @@ -15,8 +17,7 @@ SensitivityUsageInfo, ) -if TYPE_CHECKING: - from microsoft_agents.hosting.core.turn_context import TurnContext +from microsoft_agents.hosting.core.errors import error_resources from .citation import Citation from .citation_util import CitationUtil @@ -47,58 +48,38 @@ def __init__(self, context: "TurnContext"): self._sequence_number = 1 self._stream_id: Optional[str] = None self._message = "" - self._attachments: Optional[List[Attachment]] = None - self._ended = False - self._cancelled = False - - # Queue for outgoing activities - self._queue: List[Callable[[], Activity]] = [] + self._queue: List[Callable[[], Activity | None]] = [] self._queue_sync: Optional[asyncio.Task] = None self._chunk_queued = False - - # Powered by AI feature flags + self._ended = False + self._cancelled = False + self._is_streaming_channel = False + self._interval = 0.1 + self._attachments: Optional[List[Attachment]] = None + self._citations: Optional[List[ClientCitation]] = None + self._sensitivity_label: Optional[SensitivityUsageInfo] = None self._enable_feedback_loop = False self._feedback_loop_type: Optional[Literal["default", "custom"]] = None self._enable_generated_by_ai_label = False - self._citations: Optional[List[ClientCitation]] = [] - self._sensitivity_label: Optional[SensitivityUsageInfo] = None - # Channel information - self._is_streaming_channel: bool = False - self._channel_id: Channels = None - self._interval: float = 0.1 # Default interval for sending updates + # Set defaults based on channel self._set_defaults(context) - @property - def stream_id(self) -> Optional[str]: - """ - Gets the stream ID of the current response. - Assigned after the initial update is sent. - """ - return self._stream_id - - @property - def citations(self) -> Optional[List[ClientCitation]]: - """Gets the citations of the current response.""" - return self._citations - - @property - def updates_sent(self) -> int: - """Gets the number of updates sent for the stream.""" - return self._sequence_number - 1 - def queue_informative_update(self, text: str) -> None: """ Queues an informative update to be sent to the client. + Informative updates do not contain the message content that the user will + read but rather an indication that the agent is processing the request. + Args: - text: Text of the update to send. + text: The informative text to send to the client. """ - if not self._is_streaming_channel: + if self._cancelled or not self._is_streaming_channel: return if self._ended: - raise RuntimeError("The stream has already ended.") + raise RuntimeError(str(error_resources.StreamAlreadyEnded)) # Queue a typing activity def create_activity(): @@ -134,7 +115,7 @@ def queue_text_chunk( if self._cancelled: return if self._ended: - raise RuntimeError("The stream has already ended.") + raise RuntimeError(str(error_resources.StreamAlreadyEnded)) # Update full message text self._message += text @@ -150,7 +131,7 @@ async def end_stream(self) -> None: Ends the stream by sending the final message to the client. """ if self._ended: - raise RuntimeError("The stream has already ended.") + raise RuntimeError(str(error_resources.StreamAlreadyEnded)) # Queue final message self._ended = True @@ -249,17 +230,29 @@ async def wait_for_queue(self) -> None: await self._queue_sync def _set_defaults(self, context: "TurnContext"): - if Channels.ms_teams == context.activity.channel_id.channel: - self._is_streaming_channel = True - self._interval = 1.0 - elif Channels.direct_line == context.activity.channel_id.channel: + + channel = ( + context.activity.channel_id.channel if context.activity.channel_id else None + ) + + if channel == Channels.ms_teams: + if context.activity.is_agentic_request(): + # Agentic requests do not support streaming responses at this time. + # TODO : Enable streaming for agentic requests when supported. + self._is_streaming_channel = False + else: + self._is_streaming_channel = True + self._interval = 1.0 + elif channel in [Channels.webchat, Channels.direct_line]: self._is_streaming_channel = True self._interval = 0.5 + self._stream_id = str(uuid.uuid4()) elif context.activity.delivery_mode == DeliveryModes.stream: self._is_streaming_channel = True self._interval = 0.1 - - self._channel_id = context.activity.channel_id + self._stream_id = str(uuid.uuid4()) + else: + self._is_streaming_channel = False def _queue_next_chunk(self) -> None: """ @@ -272,7 +265,7 @@ def _queue_next_chunk(self) -> None: # Queue a chunk of text to be sent self._chunk_queued = True - def create_activity(): + def create_activity() -> Activity | None: self._chunk_queued = False if self._ended: # Send final message @@ -283,7 +276,6 @@ def create_activity(): entities=[ Entity( type="streaminfo", - stream_id=self._stream_id, stream_type="final", stream_sequence=self._sequence_number, ) @@ -309,7 +301,7 @@ def create_activity(): self._queue_activity(create_activity) - def _queue_activity(self, factory: Callable[[], Activity]) -> None: + def _queue_activity(self, factory: Callable[[], Activity | None]) -> None: """ Queues an activity to be sent to the client. """ @@ -339,7 +331,7 @@ async def _drain_queue(self) -> None: self._cancelled = True else: logger.error( - f"Error occurred when sending activity while streaming: {err}" + f"Error occurred when sending activity while streaming: {type(err).__name__}" ) raise finally: @@ -374,7 +366,7 @@ async def _send_activity(self, activity: Activity) -> None: activity.id = self._stream_id streaminfo_entity.stream_id = self._stream_id - if self._citations and len(self._citations) > 0 and not self._ended: + if self._citations and not self._ended: # Filter out the citations unused in content. curr_citations = CitationUtil.get_used_citations( self._message, self._citations diff --git a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/turn_context.py b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/turn_context.py index a3320b4b..56165e93 100644 --- a/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/turn_context.py +++ b/libraries/microsoft-agents-hosting-core/microsoft_agents/hosting/core/turn_context.py @@ -142,15 +142,9 @@ def streaming_response(self): """ # Use lazy import to avoid circular dependency if not hasattr(self, "_streaming_response"): - try: - from microsoft_agents.hosting.aiohttp.app.streaming import ( - StreamingResponse, - ) + from microsoft_agents.hosting.core.app.streaming import StreamingResponse - self._streaming_response = StreamingResponse(self) - except ImportError: - # If the hosting library isn't available, return None - self._streaming_response = None + self._streaming_response = StreamingResponse(self) return self._streaming_response @property diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/__init__.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/__init__.py deleted file mode 100644 index 8216be63..00000000 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -from .streaming import ( - Citation, - CitationUtil, - StreamingResponse, -) - -__all__ = [ - "Citation", - "CitationUtil", - "StreamingResponse", -] diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/__init__.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/__init__.py deleted file mode 100644 index 4cd61f38..00000000 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -from .citation import Citation -from .citation_util import CitationUtil -from .streaming_response import StreamingResponse - -__all__ = [ - "Citation", - "CitationUtil", - "StreamingResponse", -] diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/citation.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/citation.py deleted file mode 100644 index f643639a..00000000 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/citation.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -from typing import Optional -from dataclasses import dataclass - - -@dataclass -class Citation: - """Citations returned by the model.""" - - content: str - """The content of the citation.""" - - title: Optional[str] = None - """The title of the citation.""" - - url: Optional[str] = None - """The URL of the citation.""" - - filepath: Optional[str] = None - """The filepath of the document.""" diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/citation_util.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/citation_util.py deleted file mode 100644 index 1ec923dc..00000000 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/citation_util.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -import re -from typing import List, Optional - -from microsoft_agents.activity import ClientCitation - - -class CitationUtil: - """Utility functions for manipulating text and citations.""" - - @staticmethod - def snippet(text: str, max_length: int) -> str: - """ - Clips the text to a maximum length in case it exceeds the limit. - - Args: - text: The text to clip. - max_length: The maximum length of the text to return, cutting off the last whole word. - - Returns: - The modified text - """ - if len(text) <= max_length: - return text - - snippet = text[:max_length] - snippet = snippet[: min(len(snippet), snippet.rfind(" "))] - snippet += "..." - return snippet - - @staticmethod - def format_citations_response(text: str) -> str: - """ - Convert citation tags `[doc(s)n]` to `[n]` where n is a number. - - Args: - text: The text to format. - - Returns: - The formatted text. - """ - return re.sub(r"\[docs?(\d+)\]", r"[\1]", text, flags=re.IGNORECASE) - - @staticmethod - def get_used_citations( - text: str, citations: List[ClientCitation] - ) -> Optional[List[ClientCitation]]: - """ - Get the citations used in the text. This will remove any citations that are - included in the citations array from the response but not referenced in the text. - - Args: - text: The text to search for citation references, i.e. [1], [2], etc. - citations: The list of citations to search for. - - Returns: - The list of citations used in the text. - """ - regex = re.compile(r"\[(\d+)\]", re.IGNORECASE) - matches = regex.findall(text) - - if not matches: - return None - - # Remove duplicates - filtered_matches = set(matches) - - # Add citations - used_citations = [] - for match in filtered_matches: - citation_ref = f"[{match}]" - found = next( - ( - citation - for citation in citations - if f"[{citation.position}]" == citation_ref - ), - None, - ) - if found: - used_citations.append(found) - - return used_citations if used_citations else None diff --git a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/streaming_response.py b/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/streaming_response.py deleted file mode 100644 index 7d837dfe..00000000 --- a/libraries/microsoft-agents-hosting-fastapi/microsoft_agents/hosting/fastapi/app/streaming/streaming_response.py +++ /dev/null @@ -1,392 +0,0 @@ -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. - -import asyncio -import logging -from typing import List, Optional, Callable, Literal, TYPE_CHECKING -from dataclasses import dataclass - -from microsoft_agents.activity import ( - Activity, - Entity, - Attachment, - Channels, - ClientCitation, - DeliveryModes, - SensitivityUsageInfo, -) - -from microsoft_agents.hosting.core import error_resources -from microsoft_agents.hosting.core.turn_context import TurnContext - -from .citation import Citation -from .citation_util import CitationUtil - -logger = logging.getLogger(__name__) - - -class StreamingResponse: - """ - A helper class for streaming responses to the client. - - This class is used to send a series of updates to the client in a single response. - The expected sequence of calls is: - - `queue_informative_update()`, `queue_text_chunk()`, `queue_text_chunk()`, ..., `end_stream()`. - - Once `end_stream()` is called, the stream is considered ended and no further updates can be sent. - """ - - def __init__(self, context: "TurnContext"): - """ - Creates a new StreamingResponse instance. - - Args: - context: Context for the current turn of conversation with the user. - """ - self._context = context - self._sequence_number = 1 - self._stream_id: Optional[str] = None - self._message = "" - self._queue: List[Callable[[], Activity]] = [] - self._queue_sync: Optional[asyncio.Task] = None - self._chunk_queued = False - self._ended = False - self._cancelled = False - self._is_streaming_channel = False - self._interval = 0.1 - self._channel_id: Optional[str] = None - self._attachments: Optional[List[Attachment]] = None - self._citations: Optional[List[ClientCitation]] = None - self._sensitivity_label: Optional[SensitivityUsageInfo] = None - self._enable_feedback_loop = False - self._feedback_loop_type: Optional[Literal["default", "custom"]] = None - self._enable_generated_by_ai_label = False - - # Set defaults based on channel - self._set_defaults(context) - - def queue_informative_update(self, text: str) -> None: - """ - Queues an informative update to be sent to the client. - - Informative updates do not contain the message content that the user will - read but rather an indication that the agent is processing the request. - - Args: - text: The informative text to send to the client. - """ - if self._cancelled: - return - - if self._ended: - raise RuntimeError(str(error_resources.StreamAlreadyEnded)) - - # Queue a typing activity - def create_activity(): - activity = Activity( - type="typing", - text=text, - entities=[ - Entity( - type="streaminfo", - stream_type="informative", - stream_sequence=self._sequence_number, - ) - ], - ) - self._sequence_number += 1 - return activity - - self._queue_activity(create_activity) - - def queue_text_chunk( - self, text: str, citations: Optional[List[Citation]] = None - ) -> None: - """ - Queues a chunk of partial message text to be sent to the client. - - The text will be sent as quickly as possible to the client. - Chunks may be combined before delivery to the client. - - Args: - text: Partial text of the message to send. - citations: Citations to be included in the message. - """ - if self._cancelled: - return - if self._ended: - raise RuntimeError(str(error_resources.StreamAlreadyEnded)) - - # Update full message text - self._message += text - - # If there are citations, modify the content so that the sources are numbers instead of [doc1], [doc2], etc. - self._message = CitationUtil.format_citations_response(self._message) - - # Queue the next chunk - self._queue_next_chunk() - - async def end_stream(self) -> None: - """ - Ends the stream by sending the final message to the client. - """ - if self._ended: - raise RuntimeError(str(error_resources.StreamAlreadyEnded)) - - # Queue final message - self._ended = True - self._queue_next_chunk() - - # Wait for the queue to drain - await self.wait_for_queue() - - def set_attachments(self, attachments: List[Attachment]) -> None: - """ - Sets the attachments to attach to the final chunk. - - Args: - attachments: List of attachments. - """ - self._attachments = attachments - - def set_sensitivity_label(self, sensitivity_label: SensitivityUsageInfo) -> None: - """ - Sets the sensitivity label to attach to the final chunk. - - Args: - sensitivity_label: The sensitivity label. - """ - self._sensitivity_label = sensitivity_label - - def set_citations(self, citations: List[Citation]) -> None: - """ - Sets the citations for the full message. - - Args: - citations: Citations to be included in the message. - """ - if citations: - if not self._citations: - self._citations = [] - - curr_pos = len(self._citations) - - for citation in citations: - client_citation = ClientCitation( - type="Claim", - position=curr_pos + 1, - appearance={ - "type": "DigitalDocument", - "name": citation.title or f"Document #{curr_pos + 1}", - "abstract": CitationUtil.snippet(citation.content, 477), - }, - ) - curr_pos += 1 - self._citations.append(client_citation) - - def set_feedback_loop(self, enable_feedback_loop: bool) -> None: - """ - Sets the Feedback Loop in Teams that allows a user to - give thumbs up or down to a response. - Default is False. - - Args: - enable_feedback_loop: If true, the feedback loop is enabled. - """ - self._enable_feedback_loop = enable_feedback_loop - - def set_feedback_loop_type( - self, feedback_loop_type: Literal["default", "custom"] - ) -> None: - """ - Sets the type of UI to use for the feedback loop. - - Args: - feedback_loop_type: The type of the feedback loop. - """ - self._feedback_loop_type = feedback_loop_type - - def set_generated_by_ai_label(self, enable_generated_by_ai_label: bool) -> None: - """ - Sets the Generated by AI label in Teams. - Default is False. - - Args: - enable_generated_by_ai_label: If true, the label is added. - """ - self._enable_generated_by_ai_label = enable_generated_by_ai_label - - def get_message(self) -> str: - """ - Returns the most recently streamed message. - """ - return self._message - - async def wait_for_queue(self) -> None: - """ - Waits for the outgoing activity queue to be empty. - """ - if self._queue_sync: - await self._queue_sync - - def _set_defaults(self, context: "TurnContext"): - if context.activity.channel_id == Channels.ms_teams: - self._is_streaming_channel = True - self._interval = 1.0 - elif context.activity.channel_id == Channels.direct_line: - self._is_streaming_channel = True - self._interval = 0.5 - elif context.activity.delivery_mode == DeliveryModes.stream: - self._is_streaming_channel = True - self._interval = 0.1 - - self._channel_id = context.activity.channel_id - - def _queue_next_chunk(self) -> None: - """ - Queues the next chunk of text to be sent to the client. - """ - # Are we already waiting to send a chunk? - if self._chunk_queued: - return - - # Queue a chunk of text to be sent - self._chunk_queued = True - - def create_activity(): - self._chunk_queued = False - if self._ended: - # Send final message - activity = Activity( - type="message", - text=self._message or "end stream response", - attachments=self._attachments or [], - entities=[ - Entity( - type="streaminfo", - stream_type="final", - stream_sequence=self._sequence_number, - ) - ], - ) - elif self._is_streaming_channel: - # Send typing activity - activity = Activity( - type="typing", - text=self._message, - entities=[ - Entity( - type="streaminfo", - stream_type="streaming", - stream_sequence=self._sequence_number, - ) - ], - ) - else: - return - self._sequence_number += 1 - return activity - - self._queue_activity(create_activity) - - def _queue_activity(self, factory: Callable[[], Activity]) -> None: - """ - Queues an activity to be sent to the client. - """ - self._queue.append(factory) - - # If there's no sync in progress, start one - if not self._queue_sync: - self._queue_sync = asyncio.create_task(self._drain_queue()) - - async def _drain_queue(self) -> None: - """ - Sends any queued activities to the client until the queue is empty. - """ - try: - logger.debug(f"Draining queue with {len(self._queue)} activities.") - while self._queue: - factory = self._queue.pop(0) - activity = factory() - if activity: - await self._send_activity(activity) - except Exception as err: - if ( - "403" in str(err) - and self._context.activity.channel_id == Channels.ms_teams - ): - logger.warning("Teams channel stopped the stream.") - self._cancelled = True - else: - logger.error( - f"Error occurred when sending activity while streaming: {type(err).__name__}" - ) - raise - finally: - self._queue_sync = None - - async def _send_activity(self, activity: Activity) -> None: - """ - Sends an activity to the client and saves the stream ID returned. - - Args: - activity: The activity to send. - """ - - streaminfo_entity = None - - if not activity.entities: - streaminfo_entity = Entity(type="streaminfo") - activity.entities = [streaminfo_entity] - else: - for entity in activity.entities: - if hasattr(entity, "type") and entity.type == "streaminfo": - streaminfo_entity = entity - break - - if not streaminfo_entity: - # If no streaminfo entity exists, create one - streaminfo_entity = Entity(type="streaminfo") - activity.entities.append(streaminfo_entity) - - # Set activity ID to the assigned stream ID - if self._stream_id: - activity.id = self._stream_id - streaminfo_entity.stream_id = self._stream_id - - if self._citations and not self._ended: - # Filter out the citations unused in content. - curr_citations = CitationUtil.get_used_citations( - self._message, self._citations - ) - if curr_citations: - activity.entities.append( - Entity( - type="https://schema.org/Message", - schema_type="Message", - context="https://schema.org", - id="", - citation=curr_citations, - ) - ) - - # Add in Powered by AI feature flags - if self._ended: - if self._enable_feedback_loop and self._feedback_loop_type: - # Add feedback loop to streaminfo entity - streaminfo_entity.feedback_loop = {"type": self._feedback_loop_type} - else: - # Add feedback loop enabled to streaminfo entity - streaminfo_entity.feedback_loop_enabled = self._enable_feedback_loop - # Add in Generated by AI - if self._enable_generated_by_ai_label: - activity.add_ai_metadata(self._citations, self._sensitivity_label) - - # Send activity - response = await self._context.send_activity(activity) - await asyncio.sleep(self._interval) - - # Save assigned stream ID - if not self._stream_id and response: - self._stream_id = response.id diff --git a/tests/hosting_core/app/streaming/__init__.py b/tests/hosting_core/app/streaming/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/hosting_core/app/streaming/test_streaming_response.py b/tests/hosting_core/app/streaming/test_streaming_response.py new file mode 100644 index 00000000..7c3630a5 --- /dev/null +++ b/tests/hosting_core/app/streaming/test_streaming_response.py @@ -0,0 +1,521 @@ +""" +Copyright (c) Microsoft Corporation. All rights reserved. +Licensed under the MIT License. +""" + +import asyncio + +import pytest + +from microsoft_agents.activity import ( + Activity, + ChannelId, + Channels, + DeliveryModes, + ResourceResponse, +) +from microsoft_agents.hosting.core.app.streaming.citation import Citation +from microsoft_agents.hosting.core.app.streaming.streaming_response import ( + StreamingResponse, +) +from microsoft_agents.hosting.core.turn_context import TurnContext + +STREAMING_CHANNELS = [Channels.webchat, Channels.ms_teams, Channels.direct_line] +NON_STREAMING_CHANNELS = [Channels.test, Channels.slack, Channels.email] + + +@pytest.fixture(name="non_streaming_channel", params=NON_STREAMING_CHANNELS) +def fixture_non_streaming_channel(request) -> ChannelId: + return ChannelId(channel=request.param) + + +@pytest.fixture(name="streaming_channel", params=STREAMING_CHANNELS) +def fixture_streaming_channel(request) -> ChannelId: + return ChannelId(channel=request.param) + + +def _create_turn_context( + mocker, + *, + channel_id: ChannelId | Channels = Channels.webchat, + delivery_mode: str | None = DeliveryModes.stream, + return_value=None, +): + if isinstance(channel_id, Channels): + channel_id = ChannelId(channel=channel_id) + + context = mocker.MagicMock(spec=TurnContext) + activity = mocker.MagicMock(spec=Activity) + activity.channel_id = channel_id + activity.delivery_mode = delivery_mode + activity.is_agentic_request.return_value = False + context.activity = activity + if isinstance(return_value, list) and len(return_value) > 0: + context.send_activity = mocker.AsyncMock(side_effect=return_value) + else: + context.send_activity = mocker.AsyncMock(return_value=return_value) + return context + + +@pytest.mark.asyncio +async def test_queue_informative_update_is_ignored_for_non_streaming_channel(mocker): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.normal, + channel_id=Channels.slack, + ) + response = StreamingResponse(context) + + response.queue_informative_update("working") + await response.wait_for_queue() + + context.send_activity.assert_not_called() + + +@pytest.mark.asyncio +async def test_queue_text_chunk_and_end_stream_send_streaming_then_final_message( + mocker, +): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=ResourceResponse(id="stream-1"), + ) + response = StreamingResponse(context) + + response.queue_text_chunk("Hello [doc1]") + await response.end_stream() + + assert context.send_activity.await_count == 1 + + final_activity = context.send_activity.await_args_list[0].args[0] + + assert final_activity.type == "message" + assert final_activity.id != "" + assert final_activity.text == "Hello [1]" + assert final_activity.entities[0].stream_type == "final" + + +@pytest.mark.asyncio +async def test_multiple_queued_text_chunks_are_coalesced_into_one_final_activity( + mocker, +): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=ResourceResponse(id="stream-1"), + ) + response = StreamingResponse(context) + + response.queue_text_chunk("Hello") + response.queue_text_chunk(" ") + response.queue_text_chunk("world") + await response.end_stream() + + assert context.send_activity.await_count == 1 + final_activity = context.send_activity.await_args_list[0].args[0] + assert final_activity.text == "Hello world" + assert final_activity.entities[0].stream_type == "final" + + +@pytest.mark.asyncio +async def test_set_citations_only_sends_final_when_end_stream_happens_before_drain( + mocker, +): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=[ResourceResponse(id="stream-2")], + ) + response = StreamingResponse(context) + response.set_citations( + [ + Citation(content="Document one content", title="Doc One"), + Citation(content="Document two content", title="Doc Two"), + ] + ) + + response.queue_text_chunk("Answer with citation [1].") + await response.end_stream() + + assert context.send_activity.await_count == 1 + final_activity = context.send_activity.await_args_list[0].args[0] + + citation_entities = [ + entity + for entity in final_activity.entities + if getattr(entity, "schema_type", None) == "Message" + ] + assert len(citation_entities) == 0 + + +@pytest.mark.asyncio +async def test_set_citations_adds_only_used_citations_when_streaming_activity_is_sent( + mocker, +): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=[ResourceResponse(id="stream-2"), ResourceResponse(id="stream-2")], + ) + response = StreamingResponse(context) + response.set_citations( + [ + Citation(content="Document one content", title="Doc One"), + Citation(content="Document two content", title="Doc Two"), + ] + ) + + response.queue_text_chunk("Answer with citation [1].") + await response.wait_for_queue() + await response.end_stream() + + assert context.send_activity.await_count == 2 + streaming_activity = context.send_activity.await_args_list[0].args[0] + + citation_entities = [ + entity + for entity in streaming_activity.entities + if getattr(entity, "schema_type", None) == "Message" + ] + assert len(citation_entities) == 1 + assert len(citation_entities[0].citation) == 1 + assert citation_entities[0].citation[0].position == 1 + + +@pytest.mark.asyncio +async def test_end_stream_cannot_be_called_twice(mocker): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=[ResourceResponse(id="stream-3")], + ) + response = StreamingResponse(context) + + response.queue_text_chunk("Done") + await response.end_stream() + + with pytest.raises(RuntimeError, match="already ended"): + await response.end_stream() + + +@pytest.mark.asyncio +async def test_teams_403_marks_stream_as_cancelled_and_future_chunks_are_ignored( + mocker, +): + + context = _create_turn_context( + mocker, + channel_id=Channels.ms_teams, + return_value=[ + RuntimeError("403 Forbidden: Stream cancelled by user"), + ResourceResponse(id="stream-4"), + ], + ) + + response = StreamingResponse(context) + + response.queue_text_chunk("first") + await response.wait_for_queue() + + response.queue_text_chunk(" second") + await response.wait_for_queue() + + assert context.send_activity.await_count == 1 + assert response.get_message() == "first" + + +@pytest.mark.asyncio +async def test_feedback_loop_type_added_to_final_streaminfo_entity(mocker): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=[ResourceResponse(id="stream-4")], + ) + response = StreamingResponse(context) + response.set_feedback_loop(True) + response.set_feedback_loop_type("custom") + + response.queue_text_chunk("feedback") + await response.end_stream() + + final_activity = context.send_activity.await_args_list[-1].args[0] + stream_info = next( + entity for entity in final_activity.entities if entity.type == "streaminfo" + ) + + assert stream_info.feedback_loop == {"type": "custom"} + + +@pytest.mark.asyncio +async def test_generated_by_ai_label_adds_ai_entity_on_final_message(mocker): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=[ResourceResponse(id="stream-5")], + ) + response = StreamingResponse(context) + response.set_citations([Citation(content="Document one content", title="Doc One")]) + response.set_generated_by_ai_label(True) + + response.queue_text_chunk("See [1]") + await response.end_stream() + + final_activity = context.send_activity.await_args_list[-1].args[0] + ai_entities = [ + entity + for entity in final_activity.entities + if "AIGeneratedContent" in (getattr(entity, "additional_type", None) or []) + ] + + assert len(ai_entities) == 1 + + +@pytest.mark.asyncio +async def test_streaming_operations_with_sleeps_send_informative_and_text_updates( + mocker, +): + context = _create_turn_context( + mocker, + channel_id=Channels.test, + delivery_mode=DeliveryModes.stream, + return_value=[ + ResourceResponse(id="stream-10"), + ResourceResponse(id="stream-10"), + ResourceResponse(id="stream-10"), + ResourceResponse(id="stream-10"), + ], + ) + response = StreamingResponse(context) + + response.queue_informative_update("Searching documents") + await asyncio.sleep(0.2) + + response.queue_text_chunk("Hello") + await asyncio.sleep(0.2) + + response.queue_text_chunk(" world") + await asyncio.sleep(0.2) + + await response.end_stream() + + assert context.send_activity.await_count == 4 + + sent_activities = [call.args[0] for call in context.send_activity.await_args_list] + stream_types = [ + next( + entity for entity in activity.entities if entity.type == "streaminfo" + ).stream_type + for activity in sent_activities + ] + sent_types = [activity.type for activity in sent_activities] + + assert sent_types == ["typing", "typing", "typing", "message"] + assert stream_types == ["informative", "streaming", "streaming", "final"] + assert sent_activities[-1].text == "Hello world" + + +@pytest.mark.asyncio +async def test_streaming_loop_with_sleep_emits_informative_and_streaming_updates( + mocker, +): + context = _create_turn_context( + mocker, + channel_id=Channels.test, + delivery_mode=DeliveryModes.stream, + return_value=[ + ResourceResponse(id="stream-11"), + ResourceResponse(id="stream-11"), + ResourceResponse(id="stream-11"), + ResourceResponse(id="stream-11"), + ResourceResponse(id="stream-11"), + ResourceResponse(id="stream-11"), + ResourceResponse(id="stream-11"), + ], + ) + response = StreamingResponse(context) + + updates = [ + ("Thinking", "Alpha "), + ("Drafting", "Beta "), + ("Finalizing", "Gamma"), + ] + + for informative_text, chunk in updates: + response.queue_informative_update(informative_text) + response.queue_text_chunk(chunk) + await asyncio.sleep(0.3) + + await response.end_stream() + + sent_activities = [call.args[0] for call in context.send_activity.await_args_list] + stream_types = [ + next( + entity for entity in activity.entities if entity.type == "streaminfo" + ).stream_type + for activity in sent_activities + ] + + assert stream_types.count("informative") == 3 + assert stream_types.count("streaming") == 3 + assert stream_types[-1] == "final" + assert sent_activities[-1].type == "message" + assert sent_activities[-1].text == "Alpha Beta Gamma" + + +class TestStreamingResponseNonStreamingChannel: + + @pytest.mark.asyncio + async def test_queue_text_chunk_and_end_stream_send_final_message( + self, mocker, non_streaming_channel + ): + context = _create_turn_context(mocker, channel_id=non_streaming_channel) + response = StreamingResponse(context) + + response.queue_text_chunk("Hello") + await response.end_stream() + + assert context.send_activity.await_count == 1 + final_activity = context.send_activity.await_args_list[0].args[0] + + assert final_activity.type == "message" + assert final_activity.text == "Hello" + assert final_activity.entities[0].stream_type == "final" + + +@pytest.mark.asyncio +async def test_wait_for_queue_is_noop_when_nothing_was_queued(mocker): + context = _create_turn_context(mocker, delivery_mode=DeliveryModes.stream) + response = StreamingResponse(context) + + await response.wait_for_queue() + + context.send_activity.assert_not_called() + + +@pytest.mark.asyncio +async def test_end_stream_without_chunks_sends_default_final_text(mocker): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=ResourceResponse(id="stream-1"), + ) + response = StreamingResponse(context) + + await response.end_stream() + + context.send_activity.assert_awaited_once() + sent = context.send_activity.await_args.args[0] + assert sent.type == "message" + assert sent.text == "end stream response" + + +@pytest.mark.asyncio +async def test_non_streaming_channel_buffers_text_and_only_sends_on_end(mocker): + context = _create_turn_context( + mocker, + channel_id=Channels.emulator, # non-streaming branch + delivery_mode=DeliveryModes.normal, + return_value=ResourceResponse(id="final-1"), + ) + response = StreamingResponse(context) + + response.queue_text_chunk("Hello") + response.queue_text_chunk(" world") + + await asyncio.sleep(1) + + # Should not send partial updates on non-streaming channels + context.send_activity.assert_not_called() + + await response.end_stream() + + context.send_activity.assert_awaited_once() + sent = context.send_activity.await_args.args[0] + assert sent.type == "message" + assert sent.text == "Hello world" + + +@pytest.mark.asyncio +async def test_queue_informative_update_is_noop_on_non_streaming_channel(mocker): + context = _create_turn_context( + mocker, + channel_id=Channels.emulator, # non-streaming + delivery_mode=None, + ) + response = StreamingResponse(context) + + response.queue_informative_update("Working...") + await response.wait_for_queue() + + context.send_activity.assert_not_called() + + +@pytest.mark.asyncio +async def test_public_methods_raise_after_end_stream(mocker): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=ResourceResponse(id="stream-2"), + ) + response = StreamingResponse(context) + + response.queue_text_chunk("done") + await response.end_stream() + + with pytest.raises(RuntimeError): + response.queue_text_chunk("extra") + + with pytest.raises(RuntimeError): + response.queue_informative_update("extra") + + with pytest.raises(RuntimeError): + await response.end_stream() + + +@pytest.mark.asyncio +async def test_queue_text_chunk_citations_argument_is_currently_ignored(mocker): + context = _create_turn_context( + mocker, + delivery_mode=DeliveryModes.stream, + return_value=ResourceResponse(id="stream-3"), + ) + response = StreamingResponse(context) + + response.queue_text_chunk( + "Answer with [doc1]", + citations=[Citation(title="Doc 1", content="Citation content")], + ) + await response.wait_for_queue() + + sent = context.send_activity.await_args.args[0] + entity_types = [getattr(e, "type", None) for e in (sent.entities or [])] + + # streaminfo should exist; schema.org citation entity should not + assert "streaminfo" in entity_types + assert "https://schema.org/Message" not in entity_types + + +@pytest.mark.asyncio +async def test_feedback_loop_type_without_enable_does_not_emit_feedback_loop_object( + mocker, +): + context = _create_turn_context( + mocker, + channel_id=Channels.ms_teams, + return_value=ResourceResponse(id="teams-1"), + ) + response = StreamingResponse(context) + response._interval = 0 + + response.set_feedback_loop_type("custom") + response.queue_text_chunk("hello") + await response.end_stream() + + sent = context.send_activity.await_args_list[-1].args[0] + streaminfo = next( + e for e in sent.entities if getattr(e, "type", None) == "streaminfo" + ) + + assert not hasattr(streaminfo, "feedback_loop") + assert getattr(streaminfo, "feedback_loop_enabled", None) is False