From 2b0cf1085af263fef92f02bc6b2acd4c1193aa22 Mon Sep 17 00:00:00 2001 From: Frances Liu Date: Thu, 31 Oct 2024 20:35:31 -0700 Subject: [PATCH] Add Writer integration --- libs/community/extended_testing_deps.txt | 3 +- .../langchain_community/chat_models/writer.py | 317 ++++++++++++++++++ .../unit_tests/chat_models/test_writer.py | 303 +++++++++++++++++ 3 files changed, 622 insertions(+), 1 deletion(-) create mode 100644 libs/community/langchain_community/chat_models/writer.py create mode 100644 libs/community/tests/unit_tests/chat_models/test_writer.py diff --git a/libs/community/extended_testing_deps.txt b/libs/community/extended_testing_deps.txt index 56caca04381cf..1214645947abf 100644 --- a/libs/community/extended_testing_deps.txt +++ b/libs/community/extended_testing_deps.txt @@ -95,4 +95,5 @@ xmltodict>=0.13.0,<0.14 nanopq==0.2.1 mlflow[genai]>=2.14.0 databricks-sdk>=0.30.0 -websocket>=0.2.1,<1 \ No newline at end of file +websocket>=0.2.1,<1 +writer-sdk>=1.2.0 diff --git a/libs/community/langchain_community/chat_models/writer.py b/libs/community/langchain_community/chat_models/writer.py new file mode 100644 index 0000000000000..945b9d8b0b6d2 --- /dev/null +++ b/libs/community/langchain_community/chat_models/writer.py @@ -0,0 +1,317 @@ +"""Writer chat wrapper.""" + +from __future__ import annotations + +import logging +from typing import ( + Any, + AsyncIterator, + Callable, + Dict, + Iterator, + List, + Literal, + Mapping, + Optional, + Sequence, + Tuple, + Type, + Union, +) + +from langchain_core.callbacks import ( + AsyncCallbackManagerForLLMRun, + CallbackManagerForLLMRun, +) +from langchain_core.language_models import LanguageModelInput +from langchain_core.language_models.chat_models import ( + BaseChatModel, + agenerate_from_stream, + generate_from_stream, +) +from langchain_core.messages import ( + AIMessage, + AIMessageChunk, + BaseMessage, + ChatMessage, + HumanMessage, + SystemMessage, + ToolMessage, +) +from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult +from langchain_core.runnables import Runnable +from langchain_core.utils.function_calling import convert_to_openai_tool +from pydantic import BaseModel, ConfigDict, Field, SecretStr + +logger = logging.getLogger(__name__) + + +def _convert_message_to_dict(message: BaseMessage) -> dict: + """Convert a LangChain message to a Writer message dict.""" + message_dict = {"role": "", "content": message.content} + + if isinstance(message, ChatMessage): + message_dict["role"] = message.role + elif isinstance(message, HumanMessage): + message_dict["role"] = "user" + elif isinstance(message, AIMessage): + message_dict["role"] = "assistant" + if message.tool_calls: + message_dict["tool_calls"] = [ + { + "id": tool["id"], + "type": "function", + "function": {"name": tool["name"], "arguments": tool["args"]}, + } + for tool in message.tool_calls + ] + elif isinstance(message, SystemMessage): + message_dict["role"] = "system" + elif isinstance(message, ToolMessage): + message_dict["role"] = "tool" + message_dict["tool_call_id"] = message.tool_call_id + else: + raise ValueError(f"Got unknown message type: {type(message)}") + + if message.name: + message_dict["name"] = message.name + + return message_dict + + +def _convert_dict_to_message(response_dict: Dict[str, Any]) -> BaseMessage: + """Convert a Writer message dict to a LangChain message.""" + role = response_dict["role"] + content = response_dict.get("content", "") + + if role == "user": + return HumanMessage(content=content) + elif role == "assistant": + additional_kwargs = {} + if tool_calls := response_dict.get("tool_calls"): + additional_kwargs["tool_calls"] = tool_calls + return AIMessageChunk(content=content, additional_kwargs=additional_kwargs) + elif role == "system": + return SystemMessage(content=content) + elif role == "tool": + return ToolMessage( + content=content, + tool_call_id=response_dict["tool_call_id"], + name=response_dict.get("name"), + ) + else: + return ChatMessage(content=content, role=role) + + +class ChatWriter(BaseChatModel): + """Writer chat model. + + To use, you should have the ``writer-sdk`` Python package installed, and the + environment variable ``WRITER_API_KEY`` set with your API key. + + Example: + .. code-block:: python + + from langchain_community.chat_models import ChatWriter + + chat = ChatWriter(model="palmyra-x-004") + """ + + client: Any = Field(default=None, exclude=True) #: :meta private: + async_client: Any = Field(default=None, exclude=True) #: :meta private: + model_name: str = Field(default="palmyra-x-004", alias="model") + """Model name to use.""" + temperature: float = 0.7 + """What sampling temperature to use.""" + model_kwargs: Dict[str, Any] = Field(default_factory=dict) + """Holds any model parameters valid for `create` call not explicitly specified.""" + writer_api_key: Optional[SecretStr] = Field(default=None, alias="api_key") + """Writer API key.""" + writer_api_base: Optional[str] = Field(default=None, alias="base_url") + """Base URL for API requests.""" + streaming: bool = False + """Whether to stream the results or not.""" + n: int = 1 + """Number of chat completions to generate for each prompt.""" + max_tokens: Optional[int] = None + """Maximum number of tokens to generate.""" + + model_config = ConfigDict(populate_by_name=True) + + @property + def _llm_type(self) -> str: + """Return type of chat model.""" + return "writer-chat" + + @property + def _identifying_params(self) -> Dict[str, Any]: + """Get the identifying parameters.""" + return { + "model_name": self.model_name, + "temperature": self.temperature, + "streaming": self.streaming, + **self.model_kwargs, + } + + def _create_chat_result(self, response: Mapping[str, Any]) -> ChatResult: + generations = [] + for choice in response["choices"]: + message = _convert_dict_to_message(choice["message"]) + gen = ChatGeneration( + message=message, + generation_info=dict(finish_reason=choice.get("finish_reason")), + ) + generations.append(gen) + + token_usage = response.get("usage", {}) + llm_output = { + "token_usage": token_usage, + "model_name": self.model_name, + "system_fingerprint": response.get("system_fingerprint", ""), + } + + return ChatResult(generations=generations, llm_output=llm_output) + + def _convert_messages_to_dicts( + self, messages: List[BaseMessage], stop: Optional[List[str]] = None + ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: + params = { + "model": self.model_name, + "temperature": self.temperature, + "n": self.n, + "stream": self.streaming, + **self.model_kwargs, + } + if stop: + params["stop"] = stop + if self.max_tokens is not None: + params["max_tokens"] = self.max_tokens + + message_dicts = [_convert_message_to_dict(m) for m in messages] + return message_dicts, params + + def _stream( + self, + messages: List[BaseMessage], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> Iterator[ChatGenerationChunk]: + message_dicts, params = self._convert_messages_to_dicts(messages, stop) + params = {**params, **kwargs, "stream": True} + + response = self.client.chat.chat(messages=message_dicts, **params) + + for chunk in response: + delta = chunk["choices"][0].get("delta") + if not delta or not delta.get("content"): + continue + chunk = _convert_dict_to_message( + {"role": "assistant", "content": delta["content"]} + ) + chunk = ChatGenerationChunk(message=chunk) + + if run_manager: + run_manager.on_llm_new_token(chunk.text) + + yield chunk + + async def _astream( + self, + messages: List[BaseMessage], + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> AsyncIterator[ChatGenerationChunk]: + message_dicts, params = self._convert_messages_to_dicts(messages, stop) + params = {**params, **kwargs, "stream": True} + + response = await self.async_client.chat.chat(messages=message_dicts, **params) + + async for chunk in response: + delta = chunk["choices"][0].get("delta") + if not delta or not delta.get("content"): + continue + chunk = _convert_dict_to_message( + {"role": "assistant", "content": delta["content"]} + ) + chunk = ChatGenerationChunk(message=chunk) + + if run_manager: + await run_manager.on_llm_new_token(chunk.text) + + yield chunk + + def _generate( + self, + messages: List[BaseMessage], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> ChatResult: + if self.streaming: + return generate_from_stream( + self._stream(messages, stop, run_manager, **kwargs) + ) + + message_dicts, params = self._convert_messages_to_dicts(messages, stop) + params = {**params, **kwargs} + response = self.client.chat.chat(messages=message_dicts, **params) + return self._create_chat_result(response) + + async def _agenerate( + self, + messages: List[BaseMessage], + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> ChatResult: + if self.streaming: + return await agenerate_from_stream( + self._astream(messages, stop, run_manager, **kwargs) + ) + + message_dicts, params = self._convert_messages_to_dicts(messages, stop) + params = {**params, **kwargs} + response = await self.async_client.chat.chat(messages=message_dicts, **params) + return self._create_chat_result(response) + + @property + def _default_params(self) -> Dict[str, Any]: + """Get the default parameters for calling Writer API.""" + return { + "model": self.model_name, + "temperature": self.temperature, + "stream": self.streaming, + "n": self.n, + "max_tokens": self.max_tokens, + **self.model_kwargs, + } + + def bind_tools( + self, + tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable]], + *, + tool_choice: Optional[Union[str, Literal["auto", "none"]]] = None, + **kwargs: Any, + ) -> Runnable[LanguageModelInput, BaseMessage]: + """Bind tools to the chat model. + + Args: + tools: Tools to bind to the model + tool_choice: Which tool to require ('auto', 'none', or specific tool name) + **kwargs: Additional parameters to pass to the chat model + + Returns: + A runnable that will use the tools + """ + formatted_tools = [convert_to_openai_tool(tool) for tool in tools] + + if tool_choice: + kwargs["tool_choice"] = ( + (tool_choice) + if tool_choice in ("auto", "none") + else {"type": "function", "function": {"name": tool_choice}} + ) + + return super().bind(tools=formatted_tools, **kwargs) diff --git a/libs/community/tests/unit_tests/chat_models/test_writer.py b/libs/community/tests/unit_tests/chat_models/test_writer.py new file mode 100644 index 0000000000000..944a9dfeaba1f --- /dev/null +++ b/libs/community/tests/unit_tests/chat_models/test_writer.py @@ -0,0 +1,303 @@ +"""Unit tests for Writer chat model integration.""" + +import json +from typing import Any, Dict, List +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from langchain_core.callbacks.manager import CallbackManager +from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage +from pydantic import SecretStr + +from langchain_community.chat_models.writer import ChatWriter, _convert_dict_to_message +from tests.unit_tests.callbacks.fake_callback_handler import FakeCallbackHandler + + +class TestChatWriter: + def test_writer_model_param(self) -> None: + """Test different ways to initialize the chat model.""" + test_cases: List[dict] = [ + {"model_name": "palmyra-x-004", "writer_api_key": "test-key"}, + {"model": "palmyra-x-004", "writer_api_key": "test-key"}, + {"model_name": "palmyra-x-004", "writer_api_key": "test-key"}, + { + "model": "palmyra-x-004", + "writer_api_key": "test-key", + "temperature": 0.5, + }, + ] + + for case in test_cases: + chat = ChatWriter(**case) + assert chat.model_name == "palmyra-x-004" + assert chat.writer_api_key + assert chat.writer_api_key.get_secret_value() == "test-key" + assert chat.temperature == (0.5 if "temperature" in case else 0.7) + + def test_convert_dict_to_message_human(self) -> None: + """Test converting a human message dict to a LangChain message.""" + message = {"role": "user", "content": "Hello"} + result = _convert_dict_to_message(message) + assert isinstance(result, HumanMessage) + assert result.content == "Hello" + + def test_convert_dict_to_message_ai(self) -> None: + """Test converting an AI message dict to a LangChain message.""" + message = {"role": "assistant", "content": "Hello"} + result = _convert_dict_to_message(message) + assert isinstance(result, AIMessage) + assert result.content == "Hello" + + def test_convert_dict_to_message_system(self) -> None: + """Test converting a system message dict to a LangChain message.""" + message = {"role": "system", "content": "You are a helpful assistant"} + result = _convert_dict_to_message(message) + assert isinstance(result, SystemMessage) + assert result.content == "You are a helpful assistant" + + def test_convert_dict_to_message_tool_call(self) -> None: + """Test converting a tool call message dict to a LangChain message.""" + content = json.dumps({"result": 42}) + message = { + "role": "tool", + "name": "get_number", + "content": content, + "tool_call_id": "call_abc123", + } + result = _convert_dict_to_message(message) + assert isinstance(result, ToolMessage) + assert result.name == "get_number" + assert result.content == content + + def test_convert_dict_to_message_with_tool_calls(self) -> None: + """Test converting an AIMessage with tool calls.""" + message = { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_abc123", + "type": "function", + "function": { + "name": "get_weather", + "arguments": '{"location": "London"}', + }, + } + ], + } + result = _convert_dict_to_message(message) + assert isinstance(result, AIMessage) + assert result.tool_calls + assert len(result.tool_calls) == 1 + assert result.tool_calls[0]["name"] == "get_weather" + assert result.tool_calls[0]["args"]["location"] == "London" + + @pytest.fixture(autouse=True) + def mock_completion(self) -> Dict[str, Any]: + """Fixture providing a mock API response.""" + return { + "id": "chat-12345", + "object": "chat.completion", + "created": 1699000000, + "model": "palmyra-x-004", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "Hello! How can I help you?", + }, + "finish_reason": "stop", + } + ], + "usage": {"prompt_tokens": 10, "completion_tokens": 8, "total_tokens": 18}, + } + + @pytest.fixture(autouse=True) + def mock_response(self) -> Dict[str, Any]: + response = { + "id": "chat-12345", + "choices": [ + { + "message": { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_abc123", + "type": "function", + "function": { + "name": "GetWeather", + "arguments": '{"location": "London"}', + }, + } + ], + }, + "finish_reason": "tool_calls", + } + ], + } + return response + + @pytest.fixture(autouse=True) + def mock_streaming_chunks(self) -> List[Dict[str, Any]]: + """Fixture providing mock streaming response chunks.""" + return [ + { + "id": "chat-12345", + "object": "chat.completion.chunk", + "created": 1699000000, + "model": "palmyra-x-004", + "choices": [ + { + "index": 0, + "delta": { + "role": "assistant", + "content": "Hello", + }, + "finish_reason": None, + } + ], + }, + { + "id": "chat-12345", + "object": "chat.completion.chunk", + "created": 1699000000, + "model": "palmyra-x-004", + "choices": [ + { + "index": 0, + "delta": { + "content": "!", + }, + "finish_reason": "stop", + } + ], + }, + ] + + def test_sync_completion(self, mock_completion: Dict[str, Any]) -> None: + """Test basic chat completion with mocked response.""" + chat = ChatWriter(api_key=SecretStr("test-key")) + mock_client = MagicMock() + mock_client.chat.chat.return_value = mock_completion + + with patch.object(chat, "client", mock_client): + message = HumanMessage(content="Hi there!") + response = chat.invoke([message]) + assert isinstance(response, AIMessage) + assert response.content == "Hello! How can I help you?" + + async def test_async_completion(self, mock_completion: Dict[str, Any]) -> None: + """Test async chat completion with mocked response.""" + chat = ChatWriter(api_key=SecretStr("test-key")) + mock_client = AsyncMock() + mock_client.chat.chat.return_value = mock_completion + + with patch.object(chat, "async_client", mock_client): + message = HumanMessage(content="Hi there!") + response = await chat.ainvoke([message]) + assert isinstance(response, AIMessage) + assert response.content == "Hello! How can I help you?" + + def test_sync_streaming(self, mock_streaming_chunks: List[Dict[str, Any]]) -> None: + """Test sync streaming with callback handler.""" + callback_handler = FakeCallbackHandler() + callback_manager = CallbackManager([callback_handler]) + + chat = ChatWriter( + streaming=True, + callback_manager=callback_manager, + max_tokens=10, + api_key=SecretStr("test-key"), + ) + + mock_client = MagicMock() + mock_response = MagicMock() + mock_response.__iter__.return_value = mock_streaming_chunks + mock_client.chat.chat.return_value = mock_response + + with patch.object(chat, "client", mock_client): + message = HumanMessage(content="Hi") + response = chat.invoke([message]) + + assert isinstance(response, AIMessage) + assert callback_handler.llm_streams > 0 + assert response.content == "Hello!" + + async def test_async_streaming( + self, mock_streaming_chunks: List[Dict[str, Any]] + ) -> None: + """Test async streaming with callback handler.""" + callback_handler = FakeCallbackHandler() + callback_manager = CallbackManager([callback_handler]) + + chat = ChatWriter( + streaming=True, + callback_manager=callback_manager, + max_tokens=10, + api_key=SecretStr("test-key"), + ) + + mock_client = AsyncMock() + mock_response = AsyncMock() + mock_response.__aiter__.return_value = mock_streaming_chunks + mock_client.chat.chat.return_value = mock_response + + with patch.object(chat, "async_client", mock_client): + message = HumanMessage(content="Hi") + response = await chat.ainvoke([message]) + + assert isinstance(response, AIMessage) + assert callback_handler.llm_streams > 0 + assert response.content == "Hello!" + + def test_sync_tool_calling(self, mock_response: Dict[str, Any]) -> None: + """Test synchronous tool calling functionality.""" + from pydantic import BaseModel, Field + + class GetWeather(BaseModel): + """Get the weather in a location.""" + + location: str = Field(..., description="The location to get weather for") + + mock_client = MagicMock() + mock_client.chat.chat.return_value = mock_response + + chat = ChatWriter(api_key=SecretStr("test-key"), client=mock_client) + + chat_with_tools = chat.bind_tools( + tools=[GetWeather], + tool_choice="GetWeather", + ) + + response = chat_with_tools.invoke("What's the weather in London?") + assert isinstance(response, AIMessage) + assert response.tool_calls + assert response.tool_calls[0]["name"] == "GetWeather" + assert response.tool_calls[0]["args"]["location"] == "London" + + async def test_async_tool_calling(self, mock_response: Dict[str, Any]) -> None: + """Test asynchronous tool calling functionality.""" + from pydantic import BaseModel, Field + + class GetWeather(BaseModel): + """Get the weather in a location.""" + + location: str = Field(..., description="The location to get weather for") + + mock_client = AsyncMock() + mock_client.chat.chat.return_value = mock_response + + chat = ChatWriter(api_key=SecretStr("test-key"), async_client=mock_client) + + chat_with_tools = chat.bind_tools( + tools=[GetWeather], + tool_choice="GetWeather", + ) + + response = await chat_with_tools.ainvoke("What's the weather in London?") + assert isinstance(response, AIMessage) + assert response.tool_calls + assert response.tool_calls[0]["name"] == "GetWeather" + assert response.tool_calls[0]["args"]["location"] == "London"