diff --git a/slack_sdk/web/async_chat_stream.py b/slack_sdk/web/async_chat_stream.py
index 4661f19dd..9785d123e 100644
--- a/slack_sdk/web/async_chat_stream.py
+++ b/slack_sdk/web/async_chat_stream.py
@@ -10,10 +10,11 @@
 import json
 import logging
-from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 import slack_sdk.errors as e
 from slack_sdk.models.blocks.blocks import Block
+from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
 from slack_sdk.models.metadata import Metadata
 from slack_sdk.web.async_slack_response import AsyncSlackResponse
 
@@ -75,7 +76,8 @@ def __init__(
     async def append(
         self,
         *,
-        markdown_text: str,
+        markdown_text: Optional[str] = None,
+        chunks: Optional[Sequence[Chunk]] = None,
         **kwargs,
     ) -> Optional[AsyncSlackResponse]:
         """Append to the stream.
@@ -84,6 +86,7 @@ async def append(
         is stopped this method cannot be called.
 
         Args:
+            chunks: An array of streaming chunks that can contain either markdown text or task updates.
             markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters.
                 This text is what will be appended to the message received so far.
             **kwargs: Additional arguments passed to the underlying API calls.
@@ -111,9 +114,10 @@ async def append(
             raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
         if kwargs.get("token"):
             self._token = kwargs.pop("token")
-        self._buffer += markdown_text
-        if len(self._buffer) >= self._buffer_size:
-            return await self._flush_buffer(**kwargs)
+        if markdown_text is not None:
+            self._buffer += markdown_text
+        if len(self._buffer) >= self._buffer_size or chunks is not None:
+            return await self._flush_buffer(chunks=chunks, **kwargs)
         details = {
             "buffer_length": len(self._buffer),
             "buffer_size": self._buffer_size,
@@ -129,6 +133,7 @@ async def stop(
         self,
         *,
         markdown_text: Optional[str] = None,
+        chunks: Optional[Sequence[Chunk]] = None,
         blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
         metadata: Optional[Union[Dict, Metadata]] = None,
         **kwargs,
@@ -137,6 +142,7 @@ async def stop(
 
         Args:
             blocks: A list of blocks that will be rendered at the bottom of the finalized message.
+            chunks: An array of streaming chunks that can contain either markdown text or task updates.
             markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters.
                 This text is what will be appended to the message received so far.
             metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
@@ -177,26 +183,36 @@ async def stop(
                 raise e.SlackRequestError("Failed to stop stream: stream not started")
             self._stream_ts = str(response["ts"])
             self._state = "in_progress"
+        flushings: List[Chunk] = []
+        if len(self._buffer) != 0:
+            flushings.append(MarkdownTextChunk(text=self._buffer))
+        if chunks is not None:
+            flushings.extend(chunks)
         response = await self._client.chat_stopStream(
             token=self._token,
             channel=self._stream_args["channel"],
             ts=self._stream_ts,
             blocks=blocks,
-            markdown_text=self._buffer,
+            chunks=flushings,
             metadata=metadata,
             **kwargs,
         )
         self._state = "completed"
         return response
 
-    async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
-        """Flush the internal buffer by making appropriate API calls."""
+    async def _flush_buffer(self, chunks: Optional[Sequence[Chunk]] = None, **kwargs) -> AsyncSlackResponse:
+        """Flush the internal buffer with chunks by making appropriate API calls."""
+        flushings: List[Chunk] = []
+        if len(self._buffer) != 0:
+            flushings.append(MarkdownTextChunk(text=self._buffer))
+        if chunks is not None:
+            flushings.extend(chunks)
         if not self._stream_ts:
             response = await self._client.chat_startStream(
                 **self._stream_args,
                 token=self._token,
                 **kwargs,
-                markdown_text=self._buffer,
+                chunks=flushings,
             )
             self._stream_ts = response.get("ts")
             self._state = "in_progress"
@@ -206,7 +222,7 @@ async def _flush_buffer(self, **kwargs) -> AsyncSlackResponse:
                 channel=self._stream_args["channel"],
                 ts=self._stream_ts,
                 **kwargs,
-                markdown_text=self._buffer,
+                chunks=flushings,
             )
         self._buffer = ""
         return response
diff --git a/slack_sdk/web/async_client.py b/slack_sdk/web/async_client.py
index 0a9f702b9..5aaa2c610 100644
--- a/slack_sdk/web/async_client.py
+++ b/slack_sdk/web/async_client.py
@@ -2631,7 +2631,7 @@ async def chat_appendStream(
         *,
         channel: str,
         ts: str,
-        markdown_text: str,
+        markdown_text: Optional[str] = None,
         chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
         **kwargs,
     ) -> AsyncSlackResponse:
diff --git a/slack_sdk/web/chat_stream.py b/slack_sdk/web/chat_stream.py
index 1a379c9cb..acdac728a 100644
--- a/slack_sdk/web/chat_stream.py
+++ b/slack_sdk/web/chat_stream.py
@@ -1,9 +1,10 @@
 import json
 import logging
-from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 import slack_sdk.errors as e
 from slack_sdk.models.blocks.blocks import Block
+from slack_sdk.models.messages.chunk import Chunk, MarkdownTextChunk
 from slack_sdk.models.metadata import Metadata
 from slack_sdk.web.slack_response import SlackResponse
 
@@ -65,7 +66,8 @@ def __init__(
     def append(
         self,
         *,
-        markdown_text: str,
+        markdown_text: Optional[str] = None,
+        chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
         **kwargs,
     ) -> Optional[SlackResponse]:
         """Append to the stream.
@@ -74,6 +76,7 @@ def append(
         is stopped this method cannot be called.
 
         Args:
+            chunks: An array of streaming chunks that can contain either markdown text or task updates.
             markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters.
                 This text is what will be appended to the message received so far.
             **kwargs: Additional arguments passed to the underlying API calls.
@@ -101,9 +104,10 @@ def append(
             raise e.SlackRequestError(f"Cannot append to stream: stream state is {self._state}")
         if kwargs.get("token"):
             self._token = kwargs.pop("token")
-        self._buffer += markdown_text
-        if len(self._buffer) >= self._buffer_size:
-            return self._flush_buffer(**kwargs)
+        if markdown_text is not None:
+            self._buffer += markdown_text
+        if len(self._buffer) >= self._buffer_size or chunks is not None:
+            return self._flush_buffer(chunks=chunks, **kwargs)
         details = {
             "buffer_length": len(self._buffer),
             "buffer_size": self._buffer_size,
@@ -119,6 +123,7 @@ def stop(
         self,
         *,
         markdown_text: Optional[str] = None,
+        chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
         blocks: Optional[Union[str, Sequence[Union[Dict, Block]]]] = None,
         metadata: Optional[Union[Dict, Metadata]] = None,
         **kwargs,
@@ -127,6 +132,7 @@ def stop(
 
         Args:
             blocks: A list of blocks that will be rendered at the bottom of the finalized message.
+            chunks: An array of streaming chunks that can contain either markdown text or task updates.
             markdown_text: Accepts message text formatted in markdown. Limit this field to 12,000 characters.
                 This text is what will be appended to the message received so far.
             metadata: JSON object with event_type and event_payload fields, presented as a URL-encoded string. Metadata you
@@ -167,26 +173,36 @@ def stop(
                 raise e.SlackRequestError("Failed to stop stream: stream not started")
             self._stream_ts = str(response["ts"])
             self._state = "in_progress"
+        flushings: List[Union[Dict, Chunk]] = []
+        if len(self._buffer) != 0:
+            flushings.append(MarkdownTextChunk(text=self._buffer))
+        if chunks is not None:
+            flushings.extend(chunks)
         response = self._client.chat_stopStream(
             token=self._token,
             channel=self._stream_args["channel"],
             ts=self._stream_ts,
             blocks=blocks,
-            markdown_text=self._buffer,
+            chunks=flushings,
             metadata=metadata,
             **kwargs,
         )
         self._state = "completed"
         return response
 
-    def _flush_buffer(self, **kwargs) -> SlackResponse:
-        """Flush the internal buffer by making appropriate API calls."""
+    def _flush_buffer(self, chunks: Optional[Sequence[Union[Dict, Chunk]]] = None, **kwargs) -> SlackResponse:
+        """Flush the internal buffer with chunks by making appropriate API calls."""
+        flushings: List[Union[Dict, Chunk]] = []
+        if len(self._buffer) != 0:
+            flushings.append(MarkdownTextChunk(text=self._buffer))
+        if chunks is not None:
+            flushings.extend(chunks)
         if not self._stream_ts:
             response = self._client.chat_startStream(
                 **self._stream_args,
                 token=self._token,
                 **kwargs,
-                markdown_text=self._buffer,
+                chunks=flushings,
             )
             self._stream_ts = response.get("ts")
             self._state = "in_progress"
@@ -196,7 +212,7 @@ def _flush_buffer(self, **kwargs) -> SlackResponse:
                 channel=self._stream_args["channel"],
                 ts=self._stream_ts,
                 **kwargs,
-                markdown_text=self._buffer,
+                chunks=flushings,
             )
         self._buffer = ""
         return response
diff --git a/slack_sdk/web/client.py b/slack_sdk/web/client.py
index 1a70681a4..392a261ad 100644
--- a/slack_sdk/web/client.py
+++ b/slack_sdk/web/client.py
@@ -2621,7 +2621,7 @@ def chat_appendStream(
         *,
         channel: str,
         ts: str,
-        markdown_text: str,
+        markdown_text: Optional[str] = None,
         chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
         **kwargs,
     ) -> SlackResponse:
diff --git a/slack_sdk/web/legacy_client.py b/slack_sdk/web/legacy_client.py
index f11bbc495..7bb0609c5 100644
--- a/slack_sdk/web/legacy_client.py
+++ b/slack_sdk/web/legacy_client.py
@@ -2632,7 +2632,7 @@ def chat_appendStream(
         *,
         channel: str,
         ts: str,
-        markdown_text: str,
+        markdown_text: Optional[str] = None,
         chunks: Optional[Sequence[Union[Dict, Chunk]]] = None,
         **kwargs,
     ) -> Union[Future, SlackResponse]:
diff --git a/tests/slack_sdk/web/test_chat_stream.py b/tests/slack_sdk/web/test_chat_stream.py
index 75c13c8c2..a6d846769 100644
--- a/tests/slack_sdk/web/test_chat_stream.py
+++ b/tests/slack_sdk/web/test_chat_stream.py
@@ -7,6 +7,7 @@
 from slack_sdk.models.blocks.basic_components import FeedbackButtonObject
 from slack_sdk.models.blocks.block_elements import FeedbackButtonsElement, IconButtonElement
 from slack_sdk.models.blocks.blocks import ContextActionsBlock
+from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk
 from tests.mock_web_api_server import cleanup_mock_web_api_server, setup_mock_web_api_server
 from tests.slack_sdk.web.mock_web_api_handler import MockHandler
 
@@ -105,7 +106,10 @@ def test_streams_a_short_message(self):
             stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
             self.assertEqual(stop_request.get("channel"), "C0123456789")
             self.assertEqual(stop_request.get("ts"), "123.123")
-            self.assertEqual(stop_request.get("markdown_text"), "nice!")
+            self.assertEqual(
+                json.dumps(stop_request.get("chunks")),
+                '[{"text": "nice!", "type": "markdown_text"}]',
+            )
 
     def test_streams_a_long_message(self):
         streamer = self.client.chat_stream(
@@ -146,13 +150,19 @@ def test_streams_a_long_message(self):
             start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
             self.assertEqual(start_request.get("channel"), "C0123456789")
             self.assertEqual(start_request.get("thread_ts"), "123.000")
-            self.assertEqual(start_request.get("markdown_text"), "**this messag")
+            self.assertEqual(
+                json.dumps(start_request.get("chunks")),
+                '[{"text": "**this messag", "type": "markdown_text"}]',
+            )
             self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
             self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
 
             append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
             self.assertEqual(append_request.get("channel"), "C0123456789")
-            self.assertEqual(append_request.get("markdown_text"), "e is bold!")
+            self.assertEqual(
+                json.dumps(append_request.get("chunks")),
+                '[{"text": "e is bold!", "type": "markdown_text"}]',
+            )
             self.assertEqual(append_request.get("token"), "xoxb-chat_stream_test_token1")
             self.assertEqual(append_request.get("ts"), "123.123")
 
@@ -162,10 +172,74 @@ def test_streams_a_long_message(self):
                 '[{"elements": [{"negative_button": {"text": {"emoji": true, "text": "bad", "type": "plain_text"}, "value": "-1"}, "positive_button": {"text": {"emoji": true, "text": "good", "type": "plain_text"}, "value": "+1"}, "type": "feedback_buttons"}, {"icon": "trash", "text": {"emoji": true, "text": "delete", "type": "plain_text"}, "type": "icon_button"}], "type": "context_actions"}]',
             )
             self.assertEqual(stop_request.get("channel"), "C0123456789")
-            self.assertEqual(stop_request.get("markdown_text"), "**")
+            self.assertEqual(
+                json.dumps(stop_request.get("chunks")),
+                '[{"text": "**", "type": "markdown_text"}]',
+            )
             self.assertEqual(stop_request.get("token"), "xoxb-chat_stream_test_token2")
             self.assertEqual(stop_request.get("ts"), "123.123")
 
+    def test_streams_a_chunk_message(self):
+        streamer = self.client.chat_stream(
+            channel="C0123456789",
+            recipient_team_id="T0123456789",
+            recipient_user_id="U0123456789",
+            thread_ts="123.000",
+        )
+        streamer.append(markdown_text="**this is ")
+        streamer.append(markdown_text="buffered**")
+        streamer.append(
+            chunks=[
+                TaskUpdateChunk(
+                    id="001",
+                    title="Counting...",
+                    status="pending",
+                ),
+            ],
+        )
+        streamer.append(
+            chunks=[
+                MarkdownTextChunk(text="**this is unbuffered**"),
+            ],
+        )
+        streamer.append(markdown_text="\n")
+        streamer.stop(
+            chunks=[
+                MarkdownTextChunk(text=":space_invader:"),
+            ],
+        )
+
+        self.assertEqual(self.received_requests.get("/chat.startStream", 0), 1)
+        self.assertEqual(self.received_requests.get("/chat.appendStream", 0), 1)
+        self.assertEqual(self.received_requests.get("/chat.stopStream", 0), 1)
+
+        if hasattr(self.thread.server, "chat_stream_requests"):
+            start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
+            self.assertEqual(start_request.get("channel"), "C0123456789")
+            self.assertEqual(start_request.get("thread_ts"), "123.000")
+            self.assertEqual(
+                json.dumps(start_request.get("chunks")),
+                '[{"text": "**this is buffered**", "type": "markdown_text"}, {"id": "001", "status": "pending", "title": "Counting...", "type": "task_update"}]',
+            )
+            self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
+            self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
+
+            append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
+            self.assertEqual(append_request.get("channel"), "C0123456789")
+            self.assertEqual(append_request.get("ts"), "123.123")
+            self.assertEqual(
+                json.dumps(append_request.get("chunks")),
+                '[{"text": "**this is unbuffered**", "type": "markdown_text"}]',
+            )
+
+            stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
+            self.assertEqual(stop_request.get("channel"), "C0123456789")
+            self.assertEqual(stop_request.get("ts"), "123.123")
+            self.assertEqual(
+                json.dumps(stop_request.get("chunks")),
+                '[{"text": "\\n", "type": "markdown_text"}, {"text": ":space_invader:", "type": "markdown_text"}]',
+            )
+
     def test_streams_errors_when_appending_to_an_unstarted_stream(self):
         streamer = self.client.chat_stream(
             channel="C0123456789",
diff --git a/tests/slack_sdk_async/web/test_async_chat_stream.py b/tests/slack_sdk_async/web/test_async_chat_stream.py
index 212fee1e2..2a4f5b931 100644
--- a/tests/slack_sdk_async/web/test_async_chat_stream.py
+++ b/tests/slack_sdk_async/web/test_async_chat_stream.py
@@ -6,6 +6,7 @@
 from slack_sdk.models.blocks.basic_components import FeedbackButtonObject
 from slack_sdk.models.blocks.block_elements import FeedbackButtonsElement, IconButtonElement
 from slack_sdk.models.blocks.blocks import ContextActionsBlock
+from slack_sdk.models.messages.chunk import MarkdownTextChunk, TaskUpdateChunk
 from slack_sdk.web.async_client import AsyncWebClient
 from tests.mock_web_api_server import cleanup_mock_web_api_server, setup_mock_web_api_server
 from tests.slack_sdk.web.mock_web_api_handler import MockHandler
@@ -107,7 +108,10 @@ async def test_streams_a_short_message(self):
             stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
             self.assertEqual(stop_request.get("channel"), "C0123456789")
             self.assertEqual(stop_request.get("ts"), "123.123")
-            self.assertEqual(stop_request.get("markdown_text"), "nice!")
+            self.assertEqual(
+                json.dumps(stop_request.get("chunks")),
+                '[{"text": "nice!", "type": "markdown_text"}]',
+            )
 
     @async_test
     async def test_streams_a_long_message(self):
@@ -149,13 +153,19 @@ async def test_streams_a_long_message(self):
             start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
             self.assertEqual(start_request.get("channel"), "C0123456789")
             self.assertEqual(start_request.get("thread_ts"), "123.000")
-            self.assertEqual(start_request.get("markdown_text"), "**this messag")
+            self.assertEqual(
+                json.dumps(start_request.get("chunks")),
+                '[{"text": "**this messag", "type": "markdown_text"}]',
+            )
             self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
             self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
 
             append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
             self.assertEqual(append_request.get("channel"), "C0123456789")
-            self.assertEqual(append_request.get("markdown_text"), "e is bold!")
+            self.assertEqual(
+                json.dumps(append_request.get("chunks")),
+                '[{"text": "e is bold!", "type": "markdown_text"}]',
+            )
             self.assertEqual(append_request.get("token"), "xoxb-chat_stream_test_token1")
             self.assertEqual(append_request.get("ts"), "123.123")
 
@@ -165,10 +175,75 @@ async def test_streams_a_long_message(self):
                 '[{"elements": [{"negative_button": {"text": {"emoji": true, "text": "bad", "type": "plain_text"}, "value": "-1"}, "positive_button": {"text": {"emoji": true, "text": "good", "type": "plain_text"}, "value": "+1"}, "type": "feedback_buttons"}, {"icon": "trash", "text": {"emoji": true, "text": "delete", "type": "plain_text"}, "type": "icon_button"}], "type": "context_actions"}]',
             )
             self.assertEqual(stop_request.get("channel"), "C0123456789")
-            self.assertEqual(stop_request.get("markdown_text"), "**")
+            self.assertEqual(
+                json.dumps(stop_request.get("chunks")),
+                '[{"text": "**", "type": "markdown_text"}]',
+            )
             self.assertEqual(stop_request.get("token"), "xoxb-chat_stream_test_token2")
             self.assertEqual(stop_request.get("ts"), "123.123")
 
+    @async_test
+    async def test_streams_a_chunk_message(self):
+        streamer = await self.client.chat_stream(
+            channel="C0123456789",
+            recipient_team_id="T0123456789",
+            recipient_user_id="U0123456789",
+            thread_ts="123.000",
+        )
+        await streamer.append(markdown_text="**this is ")
+        await streamer.append(markdown_text="buffered**")
+        await streamer.append(
+            chunks=[
+                TaskUpdateChunk(
+                    id="001",
+                    title="Counting...",
+                    status="pending",
+                ),
+            ],
+        )
+        await streamer.append(
+            chunks=[
+                MarkdownTextChunk(text="**this is unbuffered**"),
+            ],
+        )
+        await streamer.append(markdown_text="\n")
+        await streamer.stop(
+            chunks=[
+                MarkdownTextChunk(text=":space_invader:"),
+            ],
+        )
+
+        self.assertEqual(self.received_requests.get("/chat.startStream", 0), 1)
+        self.assertEqual(self.received_requests.get("/chat.appendStream", 0), 1)
+        self.assertEqual(self.received_requests.get("/chat.stopStream", 0), 1)
+
+        if hasattr(self.thread.server, "chat_stream_requests"):
+            start_request = self.thread.server.chat_stream_requests.get("/chat.startStream", {})
+            self.assertEqual(start_request.get("channel"), "C0123456789")
+            self.assertEqual(start_request.get("thread_ts"), "123.000")
+            self.assertEqual(
+                json.dumps(start_request.get("chunks")),
+                '[{"text": "**this is buffered**", "type": "markdown_text"}, {"id": "001", "status": "pending", "title": "Counting...", "type": "task_update"}]',
+            )
+            self.assertEqual(start_request.get("recipient_team_id"), "T0123456789")
+            self.assertEqual(start_request.get("recipient_user_id"), "U0123456789")
+
+            append_request = self.thread.server.chat_stream_requests.get("/chat.appendStream", {})
+            self.assertEqual(append_request.get("channel"), "C0123456789")
+            self.assertEqual(append_request.get("ts"), "123.123")
+            self.assertEqual(
+                json.dumps(append_request.get("chunks")),
+                '[{"text": "**this is unbuffered**", "type": "markdown_text"}]',
+            )
+
+            stop_request = self.thread.server.chat_stream_requests.get("/chat.stopStream", {})
+            self.assertEqual(stop_request.get("channel"), "C0123456789")
+            self.assertEqual(stop_request.get("ts"), "123.123")
+            self.assertEqual(
+                json.dumps(stop_request.get("chunks")),
+                '[{"text": "\\n", "type": "markdown_text"}, {"text": ":space_invader:", "type": "markdown_text"}]',
+            )
+
     @async_test
     async def test_streams_errors_when_appending_to_an_unstarted_stream(self):
         streamer = await self.client.chat_stream(