From 50d6b5105fa588545c4b496b89682867b859f88d Mon Sep 17 00:00:00 2001 From: TomasLiu Date: Mon, 14 Oct 2024 10:46:00 +0800 Subject: [PATCH 01/10] vision is working. --- .../openai_chatgpt_python/extension.py | 86 ++++- .../extension/openai_chatgpt_python/openai.py | 67 ++++ .../extension/vision_tool_python/BUILD.gn | 21 ++ .../extension/vision_tool_python/README.md | 29 ++ .../extension/vision_tool_python/__init__.py | 11 + .../extension/vision_tool_python/addon.py | 22 ++ .../extension/vision_tool_python/extension.py | 294 ++++++++++++++++++ .../extension/vision_tool_python/log.py | 22 ++ .../vision_tool_python/manifest.json | 81 +++++ .../vision_tool_python/property.json | 1 + 10 files changed, 631 insertions(+), 3 deletions(-) create mode 100644 agents/ten_packages/extension/vision_tool_python/BUILD.gn create mode 100644 agents/ten_packages/extension/vision_tool_python/README.md create mode 100644 agents/ten_packages/extension/vision_tool_python/__init__.py create mode 100644 agents/ten_packages/extension/vision_tool_python/addon.py create mode 100644 agents/ten_packages/extension/vision_tool_python/extension.py create mode 100644 agents/ten_packages/extension/vision_tool_python/log.py create mode 100644 agents/ten_packages/extension/vision_tool_python/manifest.json create mode 100644 agents/ten_packages/extension/vision_tool_python/property.json diff --git a/agents/ten_packages/extension/openai_chatgpt_python/extension.py b/agents/ten_packages/extension/openai_chatgpt_python/extension.py index 4a5af8b853..802a28d03a 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/extension.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/extension.py @@ -10,6 +10,9 @@ import random import threading import traceback +import functools + +from typing import List, Callable from .helper import AsyncEventEmitter, AsyncQueue, get_current_time, get_property_bool, get_property_float, get_property_int, get_property_string, parse_sentences, rgb2base64jpeg from .openai import OpenAIChatGPT, OpenAIChatGPTConfig @@ -29,6 +32,7 @@ CMD_IN_ON_USER_JOINED = "on_user_joined" CMD_IN_ON_USER_LEFT = "on_user_left" CMD_OUT_FLUSH = "flush" +CMD_IN_CHAT_COMPLETION = "chat_completion" DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" @@ -52,7 +56,7 @@ TASK_TYPE_CHAT_COMPLETION = "chat_completion" TASK_TYPE_CHAT_COMPLETION_WITH_VISION = "chat_completion_with_vision" - +TASK_TYPE_CHAT_COMPLETION_CMD = "chat_completion_cmd" class OpenAIChatGPTExtension(Extension): memory = [] @@ -193,6 +197,24 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: elif cmd_name == CMD_IN_ON_USER_LEFT: self.users_count -= 1 status_code, detail = StatusCode.OK, "success" + elif cmd_name == CMD_IN_CHAT_COMPLETION: + m_str = cmd.get_property_string("messages") + messages = json.loads(m_str) + stream = False + is_json = False + try: + stream = cmd.get_property_bool("stream") + except: + pass + + try: + is_json = cmd.get_property_bool("json") + except: + pass + + asyncio.run_coroutine_threadsafe(self.queue.put( + [TASK_TYPE_CHAT_COMPLETION_CMD, (messages, stream, is_json, cmd)]), self.loop) + return else: logger.info(f"on_cmd unknown cmd: {cmd_name}") status_code, detail = StatusCode.ERROR, "unknown cmd" @@ -237,11 +259,30 @@ async def _process_queue(self, ten_env: TenEnv): [task_type, message] = await self.queue.get() try: # Create a new task for the new message - self.current_task = asyncio.create_task( - self._run_chatflow(ten_env, task_type, message, self.memory)) + if task_type == TASK_TYPE_CHAT_COMPLETION_CMD: + logger.info("on chat completion cmd") + messages, stream, is_json, cmd = message + logger.info(f"on chat completion cmd with params {messages}, {stream}, {is_json}") + def callback(ten_env: TenEnv, cmd: Cmd, delta: str, is_final: bool, status_code = StatusCode.OK) -> None: + logger.info(f"on callback of chat completion {delta} {is_final} {status_code}") + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("response", delta) + if is_final: + cmd_result.set_is_final(True) # end of streaming return + else: + cmd_result.set_is_final(False) # keep streaming return + ten_env.return_result(cmd_result, cmd) + + self.current_task = asyncio.create_task( + self._run_chat_completion_cmd(ten_env, messages, stream, is_json, functools.partial(callback, ten_env, cmd))) + else: + self.current_task = asyncio.create_task( + self._run_chatflow(ten_env, task_type, message, self.memory)) await self.current_task # Wait for the current task to finish or be cancelled except asyncio.CancelledError: logger.info(f"Task cancelled: {message}") + except: + logger.exception(f"failed to handle queue") async def _flush_queue(self): """Flushes the self.queue and cancels the current task.""" @@ -253,6 +294,45 @@ async def _flush_queue(self): logger.info("Cancelling the current task during flush.") self.current_task.cancel() + async def _run_chat_completion_cmd(self, ten_env: TenEnv, messages: List, stream: bool, is_json: bool, callback: Callable) -> None: + # Create an asyncio.Event to signal when content is finished + content_finished_event = asyncio.Event() + + async def handle_stream_content_update(content: str): + nonlocal callback + callback(content, False) + + async def handle_stream_content_finished(full_content: str): + nonlocal callback + callback("", True) + content_finished_event.set() + + async def handle_content_finished(full_content: str): + nonlocal callback + logger.info(f"handle_content_finished {full_content}") + callback(full_content, True) + content_finished_event.set() + + async def handle_refusal(refusal: str): + nonlocal callback + logger.info(f"handle_refusal {refusal}") + callback(refusal, True, StatusCode.ERROR) + content_finished_event.set() + + listener = AsyncEventEmitter() + if stream: + listener.on("content_update", handle_stream_content_update) + listener.on("content_finished", handle_stream_content_finished) + else: + listener.on("content_finished", handle_content_finished) + listener.on("on_refusal", handle_refusal) + + # Make an async API call to get chat completions + await self.openai_chatgpt.get_chat_completions(messages=messages, stream=stream, is_json=is_json, listener=listener) + + # Wait for the content to be finished + await content_finished_event.wait() + async def _run_chatflow(self, ten_env: TenEnv, task_type: str, input_text: str, memory): """Run the chatflow asynchronously.""" memory_cache = [] diff --git a/agents/ten_packages/extension/openai_chatgpt_python/openai.py b/agents/ten_packages/extension/openai_chatgpt_python/openai.py index 3449126d54..9402a39d8c 100644 --- a/agents/ten_packages/extension/openai_chatgpt_python/openai.py +++ b/agents/ten_packages/extension/openai_chatgpt_python/openai.py @@ -72,6 +72,73 @@ def __init__(self, config: OpenAIChatGPTConfig): self.session.proxies.update(proxies) self.client.session = self.session + async def get_chat_completions(self, messages, tools = None, stream = False, is_json = False, listener = None): + req = { + "model": self.config.model, + "messages": messages, + "tools": tools, + "temperature": self.config.temperature, + "top_p": self.config.top_p, + "presence_penalty": self.config.presence_penalty, + "frequency_penalty": self.config.frequency_penalty, + "max_tokens": self.config.max_tokens, + "seed": self.config.seed, + "stream": stream, + } + if is_json: + req["messages"] = [{"role": "system", "content": "You need to response in json format."}, *messages] + req["response_format"] = { "type": "json_object" } + + # logger.info(f"before request {req}") + try: + response = await self.client.chat.completions.create(**req) + except Exception as e: + raise Exception(f"CreateChatCompletionStream failed, err: {e}") + + full_content = "" + + if stream: + async for chat_completion in response: + choice = chat_completion.choices[0] + delta = choice.delta + + content = delta.content if delta and delta.content else "" + + # Emit content update event (fire-and-forget) + if listener and content: + listener.emit('content_update', content) + + full_content += content + + # Check for tool calls + if delta.tool_calls: + for tool_call in delta.tool_calls: + logger.info(f"tool_call: {tool_call}") + + # Emit tool call event (fire-and-forget) + if listener: + listener.emit('tool_call', tool_call) + else: + choice = response.choices[0] + logger.info(f"on response {response}") + if choice.message.refusal and listener: + listener.emit('on_refusal', choice.message.refusal) + return + + full_content = choice.message.content + + if choice.message.tool_calls: + for tool_call in choice.message.tool_calls: + logger.info(f"tool_call: {tool_call}") + + # Emit tool call event (fire-and-forget) + if listener: + listener.emit('tool_call', tool_call) + + # Emit content finished event after the loop completes + if listener: + listener.emit('content_finished', full_content) + async def get_chat_completions_stream(self, messages, tools = None, listener = None): req = { "model": self.config.model, diff --git a/agents/ten_packages/extension/vision_tool_python/BUILD.gn b/agents/ten_packages/extension/vision_tool_python/BUILD.gn new file mode 100644 index 0000000000..460593d0cc --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/BUILD.gn @@ -0,0 +1,21 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("vision_tool_python") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + ] +} diff --git a/agents/ten_packages/extension/vision_tool_python/README.md b/agents/ten_packages/extension/vision_tool_python/README.md new file mode 100644 index 0000000000..b00a31511a --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/README.md @@ -0,0 +1,29 @@ +# vision_tool_python + + + +## Features + + + +- xxx feature + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + + + +## Development + +### Build + + + +### Unit test + + + +## Misc + + diff --git a/agents/ten_packages/extension/vision_tool_python/__init__.py b/agents/ten_packages/extension/vision_tool_python/__init__.py new file mode 100644 index 0000000000..f8700b0553 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("vision_tool_python extension loaded") diff --git a/agents/ten_packages/extension/vision_tool_python/addon.py b/agents/ten_packages/extension/vision_tool_python/addon.py new file mode 100644 index 0000000000..afd8d21b51 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import VisionToolExtension +from .log import logger + + +@register_addon_as_extension("vision_tool_python") +class VisionToolExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("VisionToolExtensionAddon on_create_instance") + ten_env.on_create_instance_done(VisionToolExtension(name), context) diff --git a/agents/ten_packages/extension/vision_tool_python/extension.py b/agents/ten_packages/extension/vision_tool_python/extension.py new file mode 100644 index 0000000000..4afbbf4c19 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/extension.py @@ -0,0 +1,294 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import json +import functools + +from queue import Queue +from threading import Event, Thread +from typing import Any +from PIL import Image +from io import BytesIO +from base64 import b64encode + +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +from .log import logger + +CMD_TOOL_REGISTER = "tool_register" +CMD_TOOL_CALL = "tool_call" +CMD_PROPERTY_NAME = "name" +CMD_PROPERTY_ARGS = "args" + +CMD_CHAT_COMPLETION = "chat_completion" + +TOOL_REGISTER_PROPERTY_NAME = "name" +TOOL_REGISTER_PROPERTY_DESCRIPTON = "description" +TOOL_REGISTER_PROPERTY_PARAMETERS = "parameters" +TOOL_CALLBACK = "callback" + +# TODO auto register and unregister +SINGLE_FRAME_TOOL_NAME = "query_single_image" +SINGLE_FRAME_TOOL_DESCRIPTION = "Query to the latest frame from camera. The camera is always on, always use latest frame to answer user's question. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?', 'Can you see me?', 'take a look.'" +SINGLE_FRAME_TOOL_PARAMETERS = { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The detail infomation use is interested in. You need to summary the conversation context first and ask for detail information, e.g. We saw a laptop on the desk just now, can you identify what language is the code shown in the laptop screen?" + } + }, + "required": ["query"], +} + +def resize_image_keep_aspect(image, max_size=512): + """ + Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size. + If both dimensions are smaller than max_size, the image is not resized. + + :param image: A PIL Image object + :param max_size: The maximum size for the larger dimension (width or height) + :return: A PIL Image object (resized or original) + """ + # Get current width and height + width, height = image.size + + # If both dimensions are already smaller than max_size, return the original image + if width <= max_size and height <= max_size: + return image + + # Calculate the aspect ratio + aspect_ratio = width / height + + # Determine the new dimensions + if width > height: + new_width = max_size + new_height = int(max_size / aspect_ratio) + else: + new_height = max_size + new_width = int(max_size * aspect_ratio) + + # Resize the image with the new dimensions + resized_image = image.resize((new_width, new_height)) + + return resized_image + +def rgb2base64jpeg(rgb_data, width, height): + # Convert the RGB image to a PIL Image + pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data)) + pil_image = pil_image.convert("RGB") + + # Resize the image while maintaining its aspect ratio + pil_image = resize_image_keep_aspect(pil_image, 320) + + # Save the image to a BytesIO object in JPEG format + buffered = BytesIO() + pil_image.save(buffered, format="JPEG") + # pil_image.save("test.jpg", format="JPEG") + + # Get the byte data of the JPEG image + jpeg_image_data = buffered.getvalue() + + # Convert the JPEG byte data to a Base64 encoded string + base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8") + + # Create the data URL + mime_type = "image/jpeg" + base64_url = f"data:{mime_type};base64,{base64_encoded_image}" + return base64_url + +class VisionToolExtension(Extension): + max_history: int = 1 + history: list = [] + queue: Queue = Queue() + + thread: Thread = None + stopped: bool = False + + ten_env: TenEnv = None + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("VisionToolExtension on_init") + + self.tools = { + SINGLE_FRAME_TOOL_NAME: { + TOOL_REGISTER_PROPERTY_NAME: SINGLE_FRAME_TOOL_NAME, + TOOL_REGISTER_PROPERTY_DESCRIPTON: SINGLE_FRAME_TOOL_DESCRIPTION, + TOOL_REGISTER_PROPERTY_PARAMETERS: SINGLE_FRAME_TOOL_PARAMETERS, + TOOL_CALLBACK: self._ask_to_single_frame + } + } + + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("VisionToolExtension on_start") + + self.ten_env = ten_env + + self.thread = Thread(target=self.loop) + self.thread.start() + + # Register func + for name, tool in self.tools.items(): + c = Cmd.create(CMD_TOOL_REGISTER) + c.set_property_string(TOOL_REGISTER_PROPERTY_NAME, name) + c.set_property_string(TOOL_REGISTER_PROPERTY_DESCRIPTON, tool[TOOL_REGISTER_PROPERTY_DESCRIPTON]) + c.set_property_string(TOOL_REGISTER_PROPERTY_PARAMETERS, json.dumps(tool[TOOL_REGISTER_PROPERTY_PARAMETERS])) + ten_env.send_cmd(c, lambda ten, result: logger.info(f"register done, {result}")) + + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("VisionToolExtension on_stop") + + self.stopped = True + self.queue.put(None) + self.thread.join() + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("VisionToolExtension on_deinit") + ten_env.on_deinit_done() + + def loop(self) -> None: + while not self.stopped: + t = self.queue.get() + if t is None: + break + + try: + # unpack + callback, args, cmd = t + logger.info(f"before callback {args}") + resp = callback(args) + logger.info(f"after callback {resp}") + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("response", json.dumps(resp)) + self.ten_env.return_result(cmd_result, cmd) + except: + logger.exception(f"Failed to fetch from queue") + if cmd: + cmd_result = CmdResult.create(StatusCode.ERROR) + self.ten_env.return_result(cmd_result, cmd) + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + logger.info("on_cmd name {}".format(cmd_name)) + + # FIXME need to handle async + try: + name = cmd.get_property_string(CMD_PROPERTY_NAME) + if name in self.tools: + try: + tool = self.tools[name] + args = cmd.get_property_string(CMD_PROPERTY_ARGS) + arg_dict = json.loads(args) + self.queue.put((tool[TOOL_CALLBACK], arg_dict, cmd)) + # will return result later + return + except: + logger.exception("Failed to callback") + cmd_result = CmdResult.create(StatusCode.ERROR) + ten_env.return_result(cmd_result, cmd) + return + else: + logger.error(f"unknown tool name {name}") + except: + logger.exception("Failed to get tool name") + cmd_result = CmdResult.create(StatusCode.ERROR) + ten_env.return_result(cmd_result, cmd) + return + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + pass + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + self.history.append((video_frame.get_buf(), video_frame.get_width(), video_frame.get_height())) + diff = len(self.history) > self.max_history + if diff > 0: + self.history = self.history[diff:] + + def _get_latest_frame(self, args:dict) -> Any: + return None + + def _get_multi_frame(self, args:dict) -> Any: + return None + + # TODO async + def _ask_to_single_frame(self, args:dict) -> Any: + if "query" not in args: + raise Exception("Failed to get property") + + if not self.history: + raise Exception("Failed to get frames") + + query = args["query"] + buff, width, height = self.history[len(self.history) - 1] + logger.info(f"get frame ok {width} {height} for {query}") + + url = rgb2base64jpeg(buff, width, height) + messages = [{ + "role": "system", + "content": "You need to describe all the object in this image first, and then focus on the user's query. Keep your response short and simple unless the query ask you to." + },{ + "role": "user", + "content": [ + {"type": "text", "text": query}, + {"type": "image_url", "image_url": {"url": url}}, + ], + }] + # logger.debug(f"after prepare message: {messages}") + # Send message + cmd = Cmd.create("chat_completion") + cmd.set_property_string("messages", json.dumps(messages)) + cmd.set_property_bool("stream", False) # this is function call, we need to have complete result + # cmd.set_property_bool("json", True) + + e = Event() + rst = None + failed = True + def on_result(evt:Event, ten_env: TenEnv, result: CmdResult) -> None: + nonlocal rst + nonlocal failed + try: + if result.get_status_code() == StatusCode.OK: + rst = result.get_property_string("response") + # rst = json.loads(resp_str) + failed = False + else: + logger.error(f"Failed to get ok result") + rst = result.get_property_string("reason") + except: + logger.exception(f"Failed to get response") + finally: + evt.set() + + self.ten_env.send_cmd(cmd, functools.partial(on_result, e)) + e.wait() + if failed: + raise Exception("Failed to get resp") + else: + return rst + + def _ask_to_multi_frame(self, args:dict) -> Any: + return None \ No newline at end of file diff --git a/agents/ten_packages/extension/vision_tool_python/log.py b/agents/ten_packages/extension/vision_tool_python/log.py new file mode 100644 index 0000000000..cb7f95807f --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("vision_tool_python") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/extension/vision_tool_python/manifest.json b/agents/ten_packages/extension/vision_tool_python/manifest.json new file mode 100644 index 0000000000..f400730c17 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/manifest.json @@ -0,0 +1,81 @@ +{ + "type": "extension", + "name": "vision_tool_python", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.2" + }, + { + "type": "extension", + "name": "agora_rtc", + "version": "=0.7.0-rc2" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md" + ] + }, + "api": { + "property": { + "history": { + "type": "int64" + }, + "frequency_ms": { + "type": "int64" + } + }, + "cmd_out": [ + { + "name": "tool_register", + "property": { + "name": { + "type": "string" + }, + "description": { + "type": "string" + }, + "parameters": { + "type": "string" + } + }, + "required": [ + "name", + "description", + "parameters" + ], + "result": { + "property": { + "response": { + "type": "string" + } + } + } + } + ], + "cmd_in": [ + { + "name": "tool_call", + "property": { + "name": { + "type": "string" + }, + "args": { + "type": "string" + } + }, + "required": [ + "name" + ] + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/vision_tool_python/property.json b/agents/ten_packages/extension/vision_tool_python/property.json new file mode 100644 index 0000000000..9e26dfeeb6 --- /dev/null +++ b/agents/ten_packages/extension/vision_tool_python/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file From b0eb9bf36f80b0a02b09f9f1593bfdd65324fce2 Mon Sep 17 00:00:00 2001 From: TomasLiu Date: Mon, 14 Oct 2024 22:17:13 +0800 Subject: [PATCH 02/10] add azure vision --- .../extension/azure_vision_python/BUILD.gn | 21 +++ .../extension/azure_vision_python/README.md | 29 +++ .../extension/azure_vision_python/__init__.py | 11 ++ .../extension/azure_vision_python/addon.py | 22 +++ .../azure_vision_python/extension.py | 153 ++++++++++++++++ .../extension/azure_vision_python/log.py | 22 +++ .../azure_vision_python/manifest.json | 23 +++ .../azure_vision_python/property.json | 1 + .../azure_vision_python/requirements.txt | 1 + .../extension/openai_v2v_python/extension.py | 3 + .../extension/vision_tool_python/extension.py | 166 ++++++++++++------ .../vision_tool_python/manifest.json | 3 + 12 files changed, 406 insertions(+), 49 deletions(-) create mode 100644 agents/ten_packages/extension/azure_vision_python/BUILD.gn create mode 100644 agents/ten_packages/extension/azure_vision_python/README.md create mode 100644 agents/ten_packages/extension/azure_vision_python/__init__.py create mode 100644 agents/ten_packages/extension/azure_vision_python/addon.py create mode 100644 agents/ten_packages/extension/azure_vision_python/extension.py create mode 100644 agents/ten_packages/extension/azure_vision_python/log.py create mode 100644 agents/ten_packages/extension/azure_vision_python/manifest.json create mode 100644 agents/ten_packages/extension/azure_vision_python/property.json create mode 100644 agents/ten_packages/extension/azure_vision_python/requirements.txt diff --git a/agents/ten_packages/extension/azure_vision_python/BUILD.gn b/agents/ten_packages/extension/azure_vision_python/BUILD.gn new file mode 100644 index 0000000000..1211cff40d --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/BUILD.gn @@ -0,0 +1,21 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("azure_vision_python") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + ] +} diff --git a/agents/ten_packages/extension/azure_vision_python/README.md b/agents/ten_packages/extension/azure_vision_python/README.md new file mode 100644 index 0000000000..525362b3c1 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/README.md @@ -0,0 +1,29 @@ +# azure_vision_python + + + +## Features + + + +- xxx feature + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + + + +## Development + +### Build + + + +### Unit test + + + +## Misc + + diff --git a/agents/ten_packages/extension/azure_vision_python/__init__.py b/agents/ten_packages/extension/azure_vision_python/__init__.py new file mode 100644 index 0000000000..e742dc3ef1 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("azure_vision_python extension loaded") diff --git a/agents/ten_packages/extension/azure_vision_python/addon.py b/agents/ten_packages/extension/azure_vision_python/addon.py new file mode 100644 index 0000000000..161ce5c358 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import AzureVisionExtension +from .log import logger + + +@register_addon_as_extension("azure_vision_python") +class AzureVisionExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("AzureVisionExtensionAddon on_create_instance") + ten_env.on_create_instance_done(AzureVisionExtension(name), context) diff --git a/agents/ten_packages/extension/azure_vision_python/extension.py b/agents/ten_packages/extension/azure_vision_python/extension.py new file mode 100644 index 0000000000..84eeadce82 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/extension.py @@ -0,0 +1,153 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import json + +from typing import Any +from azure.ai.vision.imageanalysis import ImageAnalysisClient +from azure.ai.vision.imageanalysis.models import VisualFeatures +from azure.core.credentials import AzureKeyCredential + +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +from .log import logger + +PROPERTY_KEY = "key" +PROPERTY_ENDPOINT = "endpoint" + +CMD_IMAGE_ANALYZE = "image_analyze" + +class AzureVisionExtension(Extension): + key: str = "" + endpoint: str = "https://tenagentvision.cognitiveservices.azure.com/" + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_init") + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_start") + + try: + self.key = ten_env.get_property_string(PROPERTY_KEY) + except Exception as err: + logger.error(f"GetProperty optional {PROPERTY_KEY} error: {err}") + return + + try: + self.endpoint = ten_env.get_property_string(PROPERTY_ENDPOINT) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_ENDPOINT} error: {err}") + + ten_env.on_start_done() + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_stop") + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("AzureVisionExtension on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + cmd_name = cmd.get_name() + logger.info("on_cmd name {}".format(cmd_name)) + + if cmd_name == CMD_IMAGE_ANALYZE: + try: + image_data = cmd.get_property_buf("image_data") + resp = self._analyze_image(image_data) + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("response", json.dumps(resp)) + ten_env.return_result(cmd_result, cmd) + return + except: + logger.exception("Failed to handle analyze") + + cmd_result = CmdResult.create(StatusCode.OK) + ten_env.return_result(cmd_result, cmd) + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + pass + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + pass + + def _analyze_image(self, image_data: bytes) -> Any: + client = ImageAnalysisClient( + endpoint=self.endpoint, + credential=AzureKeyCredential(self.key) + ) + + # Get a caption for the image. This will be a synchronously (blocking) call. + result = client.analyze( + image_data=image_data, + visual_features=[VisualFeatures.TAGS, VisualFeatures.CAPTION, VisualFeatures.READ, VisualFeatures.PEOPLE, VisualFeatures.OBJECTS], + gender_neutral_caption=True, + ) + + logger.info(f"before return {result}") + + rst = {} + if result.tags is not None: + tags = [] + for tag in result.tags.list: + tags.append({ + "name": tag.name, + "confidence": tag.confidence + }) + rst["tags"] = tags + + if result.caption is not None: + rst["caption"] = { + "text": result.caption.text, + "confidence": result.caption.confidence + } + + if result.read is not None: + lines = [] + for block in result.read.blocks: + for line in block.lines: + lines.append({ + "text": line.text, + "bounding_box": str(line.bounding_polygon), + }) + rst["read"] = lines + + if result.objects is not None: + objects = [] + for object in result.objects.list: + objects.append({ + "name": object.tags[0].name, + "bounding_box": str(object.bounding_box), + "confidence": object.tags[0].confidence + }) + rst["objects"] = objects + + if result.people is not None: + people = [] + for person in result.people.list: + people.append({ + "bounding_box": str(person.bounding_box), + "confidence": person.confidence + }) + rst["people"] = people + + logger.info(f"after parse {rst}") + + return rst \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/log.py b/agents/ten_packages/extension/azure_vision_python/log.py new file mode 100644 index 0000000000..dd55cbfdd5 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("azure_vision_python") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/extension/azure_vision_python/manifest.json b/agents/ten_packages/extension/azure_vision_python/manifest.json new file mode 100644 index 0000000000..c6e5cf1a84 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/manifest.json @@ -0,0 +1,23 @@ +{ + "type": "extension", + "name": "azure_vision_python", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.2" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md" + ] + }, + "api": {} +} \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/property.json b/agents/ten_packages/extension/azure_vision_python/property.json new file mode 100644 index 0000000000..9e26dfeeb6 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/requirements.txt b/agents/ten_packages/extension/azure_vision_python/requirements.txt new file mode 100644 index 0000000000..830deb56e5 --- /dev/null +++ b/agents/ten_packages/extension/azure_vision_python/requirements.txt @@ -0,0 +1 @@ +azure-ai-vision-imageanalysis \ No newline at end of file diff --git a/agents/ten_packages/extension/openai_v2v_python/extension.py b/agents/ten_packages/extension/openai_v2v_python/extension.py index 57ee39456f..0dd18656a9 100644 --- a/agents/ten_packages/extension/openai_v2v_python/extension.py +++ b/agents/ten_packages/extension/openai_v2v_python/extension.py @@ -172,6 +172,9 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: def on_config_changed(self) -> None: # update session again + if self._update_session: + logger.info("update session after config changed") + self._update_session() return async def _init_connection(self): diff --git a/agents/ten_packages/extension/vision_tool_python/extension.py b/agents/ten_packages/extension/vision_tool_python/extension.py index 4afbbf4c19..21cd865e32 100644 --- a/agents/ten_packages/extension/vision_tool_python/extension.py +++ b/agents/ten_packages/extension/vision_tool_python/extension.py @@ -8,9 +8,10 @@ import json import functools +from datetime import datetime from queue import Queue from threading import Event, Thread -from typing import Any +from typing import Any, List, Tuple, Union from PIL import Image from io import BytesIO from base64 import b64encode @@ -27,6 +28,10 @@ ) from .log import logger +PROPERTY_HISTORY = "history" +PROPERTY_FREQUENCY_MS = "frequency_ms" +PROPERTY_USE_LLM = "use_llm" + CMD_TOOL_REGISTER = "tool_register" CMD_TOOL_CALL = "tool_call" CMD_PROPERTY_NAME = "name" @@ -53,6 +58,8 @@ "required": ["query"], } +SINGLE_FRAME_TOOL_NON_LLM_PARAMETERS = {} + def resize_image_keep_aspect(image, max_size=512): """ Resize an image while maintaining its aspect ratio, ensuring the larger dimension is max_size. @@ -85,7 +92,7 @@ def resize_image_keep_aspect(image, max_size=512): return resized_image -def rgb2base64jpeg(rgb_data, width, height): +def rgb2base64jpeg(rgb_data: bytes, width: int, height: int, raw: bool = False) -> Union[bytes, str]: # Convert the RGB image to a PIL Image pil_image = Image.frombytes("RGBA", (width, height), bytes(rgb_data)) pil_image = pil_image.convert("RGB") @@ -100,6 +107,8 @@ def rgb2base64jpeg(rgb_data, width, height): # Get the byte data of the JPEG image jpeg_image_data = buffered.getvalue() + if raw: + return jpeg_image_data # Convert the JPEG byte data to a Base64 encoded string base64_encoded_image = b64encode(jpeg_image_data).decode("utf-8") @@ -111,8 +120,14 @@ def rgb2base64jpeg(rgb_data, width, height): class VisionToolExtension(Extension): max_history: int = 1 + frequency_ms: int = 60 + use_llm: bool = True + history: list = [] queue: Queue = Queue() + last_capture: datetime = None + llm_tools = {} + tools = {} thread: Thread = None stopped: bool = False @@ -122,12 +137,22 @@ class VisionToolExtension(Extension): def on_init(self, ten_env: TenEnv) -> None: logger.info("VisionToolExtension on_init") + # Change tool! self.tools = { + SINGLE_FRAME_TOOL_NAME: { + TOOL_REGISTER_PROPERTY_NAME: SINGLE_FRAME_TOOL_NAME, + TOOL_REGISTER_PROPERTY_DESCRIPTON: SINGLE_FRAME_TOOL_DESCRIPTION, + TOOL_REGISTER_PROPERTY_PARAMETERS: SINGLE_FRAME_TOOL_NON_LLM_PARAMETERS, + TOOL_CALLBACK: functools.partial(self._ask_to_latest_frames, 1) + } + } + + self.llm_tools = { SINGLE_FRAME_TOOL_NAME: { TOOL_REGISTER_PROPERTY_NAME: SINGLE_FRAME_TOOL_NAME, TOOL_REGISTER_PROPERTY_DESCRIPTON: SINGLE_FRAME_TOOL_DESCRIPTION, TOOL_REGISTER_PROPERTY_PARAMETERS: SINGLE_FRAME_TOOL_PARAMETERS, - TOOL_CALLBACK: self._ask_to_single_frame + TOOL_CALLBACK: functools.partial(self._ask_to_latest_frames, 1) } } @@ -141,13 +166,20 @@ def on_start(self, ten_env: TenEnv) -> None: self.thread = Thread(target=self.loop) self.thread.start() - # Register func - for name, tool in self.tools.items(): - c = Cmd.create(CMD_TOOL_REGISTER) - c.set_property_string(TOOL_REGISTER_PROPERTY_NAME, name) - c.set_property_string(TOOL_REGISTER_PROPERTY_DESCRIPTON, tool[TOOL_REGISTER_PROPERTY_DESCRIPTON]) - c.set_property_string(TOOL_REGISTER_PROPERTY_PARAMETERS, json.dumps(tool[TOOL_REGISTER_PROPERTY_PARAMETERS])) - ten_env.send_cmd(c, lambda ten, result: logger.info(f"register done, {result}")) + try: + self.max_history = ten_env.get_property_string(PROPERTY_HISTORY) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_HISTORY} error: {err}") + + try: + self.frequency_ms = ten_env.get_property_int(PROPERTY_FREQUENCY_MS) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_FREQUENCY_MS} error: {err}") + + try: + self.use_llm = ten_env.get_property_bool(PROPERTY_USE_LLM) + except Exception as err: + logger.info(f"GetProperty optional {PROPERTY_USE_LLM} error: {err}") ten_env.on_start_done() @@ -223,46 +255,54 @@ def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: pass def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: - self.history.append((video_frame.get_buf(), video_frame.get_width(), video_frame.get_height())) - diff = len(self.history) > self.max_history - if diff > 0: - self.history = self.history[diff:] - - def _get_latest_frame(self, args:dict) -> Any: - return None - - def _get_multi_frame(self, args:dict) -> Any: - return None - - # TODO async - def _ask_to_single_frame(self, args:dict) -> Any: - if "query" not in args: - raise Exception("Failed to get property") + if self.last_capture is None: + # Register func after video is captured + tool_targets = self.tools + if self.use_llm: + tool_targets = self.llm_tools + + for name, tool in tool_targets.items(): + c = Cmd.create(CMD_TOOL_REGISTER) + c.set_property_string(TOOL_REGISTER_PROPERTY_NAME, name) + c.set_property_string(TOOL_REGISTER_PROPERTY_DESCRIPTON, tool[TOOL_REGISTER_PROPERTY_DESCRIPTON]) + c.set_property_string(TOOL_REGISTER_PROPERTY_PARAMETERS, json.dumps(tool[TOOL_REGISTER_PROPERTY_PARAMETERS])) + ten_env.send_cmd(c, lambda ten, result: logger.info(f"register done, {result}")) + + now = datetime.now() + if self.frequency_ms and (not self.last_capture or (now - self.last_capture).total_seconds() * 1000 > self.frequency_ms): + self.history.append((now, video_frame.get_buf(), video_frame.get_width(), video_frame.get_height())) + self.last_capture = now + + diff = len(self.history) > self.max_history + if diff > 0: + self.history = self.history[diff:] + + def _get_latest_frames(self, count:int = 3, raw: bool = False) -> Tuple[datetime, List[Union[bytes, str]]]: + start = len(self.history) - count + if start < 0: + start = 0 + result = [] + min_ts = None + for i in range(start, len(self.history)): + ts, buff, width, height = self.history[i] + if not min_ts or ts < min_ts: + min_ts = ts + result.append(rgb2base64jpeg(buff, width, height, raw = raw)) + + return min_ts, result + + def _ask_to_latest_frames(self, count:int, args:dict = {}) -> Any: if not self.history: raise Exception("Failed to get frames") - - query = args["query"] - buff, width, height = self.history[len(self.history) - 1] - logger.info(f"get frame ok {width} {height} for {query}") - - url = rgb2base64jpeg(buff, width, height) - messages = [{ - "role": "system", - "content": "You need to describe all the object in this image first, and then focus on the user's query. Keep your response short and simple unless the query ask you to." - },{ - "role": "user", - "content": [ - {"type": "text", "text": query}, - {"type": "image_url", "image_url": {"url": url}}, - ], - }] - # logger.debug(f"after prepare message: {messages}") - # Send message - cmd = Cmd.create("chat_completion") - cmd.set_property_string("messages", json.dumps(messages)) - cmd.set_property_bool("stream", False) # this is function call, we need to have complete result - # cmd.set_property_bool("json", True) + + if self.use_llm: + min_ts, frames = self._get_latest_frames(count=count) + ts = min_ts.strftime("%Y-%m-%d %H:%M:%S") + cmd = self._chat_completion(args, frames, ts) + else: + _, frames = self._get_latest_frames(count=1, raw=True) + cmd = self._analyze_frame(frames[0]) e = Event() rst = None @@ -289,6 +329,34 @@ def on_result(evt:Event, ten_env: TenEnv, result: CmdResult) -> None: raise Exception("Failed to get resp") else: return rst + + def _analyze_frame(self, frame: bytes) -> Cmd: + cmd = Cmd.create("image_analyze") + cmd.set_property_buf("image_data", frame) + # TODO What to analyze + return cmd + + def _chat_completion(self, args:dict, frames: List[str], ts: datetime) -> Cmd: + if "query" not in args: + raise Exception("Failed to get property") + + query = args["query"] + contents = [{"type": "text", "text": f"This is the image captured within {self.frequency_ms} ms at {ts}. {query}"}] + for f in frames: + contents.append({"type": "image_url", "image_url": {"url": f}}) + + messages = [{ + "role": "system", + "content": "You need to describe all the object in this image first, and then focus on the user's query. Keep your response short and simple unless the query ask you to." + },{ + "role": "user", + "content": contents, + }] - def _ask_to_multi_frame(self, args:dict) -> Any: - return None \ No newline at end of file + # logger.debug(f"after prepare message: {messages}") + # Send message + cmd = Cmd.create("chat_completion") + cmd.set_property_string("messages", json.dumps(messages)) + cmd.set_property_bool("stream", False) # this is function call, we need to have complete result + # cmd.set_property_bool("json", True) + return cmd \ No newline at end of file diff --git a/agents/ten_packages/extension/vision_tool_python/manifest.json b/agents/ten_packages/extension/vision_tool_python/manifest.json index f400730c17..980f8210bc 100644 --- a/agents/ten_packages/extension/vision_tool_python/manifest.json +++ b/agents/ten_packages/extension/vision_tool_python/manifest.json @@ -31,6 +31,9 @@ }, "frequency_ms": { "type": "int64" + }, + "use_llm": { + "type": "bool" } }, "cmd_out": [ From 1e36a3da8422b87079bfb1a1c6aa250cce5917d0 Mon Sep 17 00:00:00 2001 From: TomasLiu Date: Tue, 15 Oct 2024 12:09:40 +0800 Subject: [PATCH 03/10] fix readme and manifest --- .../extension/azure_vision_python/README.md | 47 +++++++++++------ .../azure_vision_python/manifest.json | 31 ++++++++++- .../bingsearch_tool_python/README.md | 27 +++++----- .../bingsearch_tool_python/manifest.json | 52 ++++++++++++++++++- .../extension/vision_tool_python/README.md | 33 +++++++----- .../weatherapi_tool_python/README.md | 20 +++---- 6 files changed, 155 insertions(+), 55 deletions(-) diff --git a/agents/ten_packages/extension/azure_vision_python/README.md b/agents/ten_packages/extension/azure_vision_python/README.md index 525362b3c1..e8cb3ca4b6 100644 --- a/agents/ten_packages/extension/azure_vision_python/README.md +++ b/agents/ten_packages/extension/azure_vision_python/README.md @@ -1,29 +1,44 @@ # azure_vision_python - +This is the extension calling azure ai vision. -## Features - - +The document is as follow: https://learn.microsoft.com/zh-cn/azure/ai-services/computer-vision/overview -- xxx feature +## Properties -## API - -Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). +- key +- endpoint - - -## Development +## Features -### Build +- Only support one frame of image +- No customization for feature +- By default will include `TAGS`, `CAPTION`, `READ`, `PEOPLE`, `OBJECTS` - +## API -### Unit test +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). - +Other extensions can call `analyze_image` cmd and will get all analyze result from result in `response` property, the result will looks like this: + +``` json +{ + "modelVersion": "2023-10-01", + "captionResult": { + "text": "a group of toys on a table", + "confidence": 0.7558467388153076 + }, + "metadata": { + "width": 320, + "height": 240 + }, + "objectsResult": {}, + "readResult": {}, + "peopleResult": {} +} +``` ## Misc - +- Video analyze +- Multi-frame analyze \ No newline at end of file diff --git a/agents/ten_packages/extension/azure_vision_python/manifest.json b/agents/ten_packages/extension/azure_vision_python/manifest.json index c6e5cf1a84..d9448f453d 100644 --- a/agents/ten_packages/extension/azure_vision_python/manifest.json +++ b/agents/ten_packages/extension/azure_vision_python/manifest.json @@ -19,5 +19,34 @@ "README.md" ] }, - "api": {} + "api": { + "property": { + "key": { + "type": "string" + }, + "endpoint": { + "type": "string" + } + } + }, + "cmd_in": [ + { + "name": "analyze_image", + "property": { + "image_data": { + "type": "buf" + } + }, + "required": [ + "image_data" + ], + "result": { + "property": { + "response": { + "type": "string" + } + } + } + } + ] } \ No newline at end of file diff --git a/agents/ten_packages/extension/bingsearch_tool_python/README.md b/agents/ten_packages/extension/bingsearch_tool_python/README.md index 581fdf5ef1..b5b20b98bc 100644 --- a/agents/ten_packages/extension/bingsearch_tool_python/README.md +++ b/agents/ten_packages/extension/bingsearch_tool_python/README.md @@ -1,29 +1,26 @@ # bingsearch_tool_python - +This is tool for bing search, the document link is as follow: https://learn.microsoft.com/en-us/bing/search-apis/bing-web-search/quickstarts/rest/python + +It is built using TEN Tool Call Protocol (Beta). ## Features - +It is the bing search tool that will auto register to any llm extension. + +The tool description is as follow: -- xxx feature +*Use Bing.com to search for latest information. Call this function if you are not sure about the answer.* ## API Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). - - -## Development - -### Build - - - -### Unit test - - +- out: tool_register +- in: tool_call ## Misc - +- use Tool Call Protocol Standard +- support async call +- apply asyncio template diff --git a/agents/ten_packages/extension/bingsearch_tool_python/manifest.json b/agents/ten_packages/extension/bingsearch_tool_python/manifest.json index 3e5a4193eb..47f22fdd55 100644 --- a/agents/ten_packages/extension/bingsearch_tool_python/manifest.json +++ b/agents/ten_packages/extension/bingsearch_tool_python/manifest.json @@ -19,5 +19,55 @@ "README.md" ] }, - "api": {} + "api": { + "property": { + "api_key": { + "type": "string" + } + }, + "cmd_out": [ + { + "name": "tool_register", + "property": { + "name": { + "type": "string" + }, + "description": { + "type": "string" + }, + "parameters": { + "type": "string" + } + }, + "required": [ + "name", + "description", + "parameters" + ], + "result": { + "property": { + "response": { + "type": "string" + } + } + } + } + ], + "cmd_in": [ + { + "name": "tool_call", + "property": { + "name": { + "type": "string" + }, + "args": { + "type": "string" + } + }, + "required": [ + "name" + ] + } + ] + } } \ No newline at end of file diff --git a/agents/ten_packages/extension/vision_tool_python/README.md b/agents/ten_packages/extension/vision_tool_python/README.md index b00a31511a..74278fb921 100644 --- a/agents/ten_packages/extension/vision_tool_python/README.md +++ b/agents/ten_packages/extension/vision_tool_python/README.md @@ -1,29 +1,36 @@ # vision_tool_python - +This is tool for vision ability, currently there are two patterns: +- use triditional model +- use multimodal llm model -## Features +The pattern can be switched by `use_llm` pattern to use different cmd protocol. - +Tool description is as follow: -- xxx feature +*Query to the latest frame from camera. The camera is always on, always use latest frame to answer user's question. Call this whenever you need to understand the input camera image like you have vision capability, for example when user asks 'What can you see?', 'Can you see me?', 'take a look.'* -## API +It is built using TEN Tool Call Protocol (Beta). -Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). +## Features - +The tool can accept video frame from rtc extension. -## Development +The tool will only register itself to llm extension as soon as the video frame is received. -### Build +The tool will cache video frame every `frequency_ms` ms. - +## API -### Unit test +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). - +- out: `tool_register` +- in: `tool_call` +- out(`use_llm=false`): `analyze_image` +- out(`use_llm=true`): `chat_completion` ## Misc - +- Multi-frame support +- Movement detection +- Prompt Engineering \ No newline at end of file diff --git a/agents/ten_packages/extension/weatherapi_tool_python/README.md b/agents/ten_packages/extension/weatherapi_tool_python/README.md index de7c18aa2d..81808f90df 100644 --- a/agents/ten_packages/extension/weatherapi_tool_python/README.md +++ b/agents/ten_packages/extension/weatherapi_tool_python/README.md @@ -1,21 +1,23 @@ # weatherapi_tool_python -This is the tool demo for weather query. +This is the tool for weather query, including current weather, broadcast and history weather check, the document link is as follow: https://www.weatherapi.com/docs/ + +It is built using TEN Tool Call Protocol (Beta). ## Features +For free plan: - Fetch today's weather. -- Search for history weather. +- Search for history weather within 7 days. - Forcast weather in 3 days. -## API - -Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). +You can extend by using other plan in your project. -### Out: +https://www.weatherapi.com/pricing.aspx -- `tool_register`: auto register tool to llm +## API -### In: +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). -- `tool_call`: sync cmd to fetch weather +- out: tool_register +- in: tool_call From 0a9b4260b517b3458df2d1c1bc5af8fc5e158669 Mon Sep 17 00:00:00 2001 From: TomasLiu Date: Wed, 16 Oct 2024 23:09:41 +0800 Subject: [PATCH 04/10] wip --- .../azure_vision_python/manifest.json | 36 ++--- .../bedrock_llm_python/bedrock_llm.py | 28 ++++ .../bedrock_llm_extension.py | 139 +++++++++++++++++- .../extension/vision_tool_python/extension.py | 9 +- 4 files changed, 184 insertions(+), 28 deletions(-) diff --git a/agents/ten_packages/extension/azure_vision_python/manifest.json b/agents/ten_packages/extension/azure_vision_python/manifest.json index d9448f453d..c8b3b8fa47 100644 --- a/agents/ten_packages/extension/azure_vision_python/manifest.json +++ b/agents/ten_packages/extension/azure_vision_python/manifest.json @@ -27,26 +27,26 @@ "endpoint": { "type": "string" } - } - }, - "cmd_in": [ - { - "name": "analyze_image", - "property": { - "image_data": { - "type": "buf" - } - }, - "required": [ - "image_data" - ], - "result": { + }, + "cmd_in": [ + { + "name": "analyze_image", "property": { - "response": { - "type": "string" + "image_data": { + "type": "buf" + } + }, + "required": [ + "image_data" + ], + "result": { + "property": { + "response": { + "type": "string" + } } } } - } - ] + ] + } } \ No newline at end of file diff --git a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py index c833f7b7c5..71dc934c5f 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py +++ b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm.py @@ -1,4 +1,6 @@ import boto3 + +from typing import List, Any from .log import logger class BedrockLLMConfig: @@ -71,5 +73,31 @@ def get_converse_stream(self, messages): try: response = self.client.converse_stream(**bedrock_req_params) return response + except Exception as e: + raise Exception(f"GetConverseStream failed, err: {e}") + + def chat_completion_cmd(self, messages: List[Any], stream: bool, is_json: bool): + bedrock_req_params = { + "modelId": self.config.model, + "messages": messages, + "inferenceConfig": { + "temperature": self.config.temperature, + "maxTokens": self.config.max_tokens, + "topP": self.config.top_p, + # "stopSequences": [], + }, + # "additionalModelRequestFields": additional_model_fields, + } + + logger.info(f"before chat {bedrock_req_params}") + + f = self.client.converse_stream + if not stream: + f = self.client.converse + + try: + response = f(**bedrock_req_params) + logger.info(f"after chat {response}") + return response except Exception as e: raise Exception(f"GetConverseStream failed, err: {e}") \ No newline at end of file diff --git a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py index 5e7918a8d8..cf6e5d52d5 100644 --- a/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py +++ b/agents/ten_packages/extension/bedrock_llm_python/bedrock_llm_extension.py @@ -1,6 +1,14 @@ -from .bedrock_llm import BedrockLLM, BedrockLLMConfig + +import json +import copy + from datetime import datetime from threading import Thread +from queue import Queue +from threading import Thread +from typing import List, Any, AnyStr +from base64 import b64decode + from ten import ( Addon, Extension, @@ -12,6 +20,7 @@ CmdResult, ) from .log import logger +from .bedrock_llm import BedrockLLM, BedrockLLMConfig CMD_IN_FLUSH = "flush" @@ -21,6 +30,8 @@ DATA_OUT_TEXT_DATA_PROPERTY_TEXT = "text" DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT = "end_of_segment" +CMD_IN_CHAT_COMPLETION = "chat_completion" + PROPERTY_REGION = "region" # Optional PROPERTY_ACCESS_KEY = "access_key" # Optional PROPERTY_SECRET_KEY = "secret_key" # Optional @@ -68,10 +79,17 @@ class BedrockLLMExtension(Extension): max_memory_length = 10 outdate_ts = 0 bedrock_llm = None + greeting = "" + + stopped: bool = False + cmd_queue = Queue() + thread: Thread = None + ten_env: TenEnv = None def on_start(self, ten: TenEnv) -> None: logger.info("BedrockLLMExtension on_start") # Prepare configuration + self.ten_env = ten bedrock_llm_config = BedrockLLMConfig.default_config() for optional_str_param in [ @@ -110,7 +128,7 @@ def on_start(self, ten: TenEnv) -> None: ) try: - greeting = ten.get_property_string(PROPERTY_GREETING) + self.greeting = ten.get_property_string(PROPERTY_GREETING) except Exception as err: logger.debug( f"GetProperty optional {PROPERTY_GREETING} failed, err: {err}." @@ -134,24 +152,30 @@ def on_start(self, ten: TenEnv) -> None: except Exception as err: logger.exception(f"newBedrockLLM failed, err: {err}") + self.thread = Thread(target=self.loop) + self.thread.start() + # Send greeting if available - if greeting: + if self.greeting: try: output_data = Data.create("text_data") output_data.set_property_string( - DATA_OUT_TEXT_DATA_PROPERTY_TEXT, greeting + DATA_OUT_TEXT_DATA_PROPERTY_TEXT, self.greeting ) output_data.set_property_bool( DATA_OUT_TEXT_DATA_PROPERTY_TEXT_END_OF_SEGMENT, True ) ten.send_data(output_data) - logger.info(f"greeting [{greeting}] sent") + logger.info(f"greeting [{self.greeting}] sent") except Exception as err: - logger.info(f"greeting [{greeting}] send failed, err: {err}") + logger.info(f"greeting [{self.greeting}] send failed, err: {err}") ten.on_start_done() def on_stop(self, ten: TenEnv) -> None: logger.info("BedrockLLMExtension on_stop") + self.stopped = True + self.cmd_queue.put(None) + self.thread.join() ten.on_stop_done() def on_cmd(self, ten: TenEnv, cmd: Cmd) -> None: @@ -166,6 +190,23 @@ def on_cmd(self, ten: TenEnv, cmd: Cmd) -> None: cmd_out = Cmd.create(CMD_OUT_FLUSH) ten.send_cmd(cmd_out, None) logger.info(f"BedrockLLMExtension on_cmd sent flush") + elif cmd_name == CMD_IN_CHAT_COMPLETION: + m_str = cmd.get_property_string("messages") + messages = json.loads(m_str) + stream = False + is_json = False + try: + stream = cmd.get_property_bool("stream") + except: + pass + + try: + is_json = cmd.get_property_bool("json") + except: + pass + + self.cmd_queue.put((messages, stream, is_json, cmd)) + return else: logger.info(f"BedrockLLMExtension on_cmd unknown cmd: {cmd_name}") cmd_result = CmdResult.create(StatusCode.ERROR) @@ -365,6 +406,92 @@ def converse_stream_worker(start_time, input_text, memory): thread.start() logger.info(f"BedrockLLMExtension on_data end") + def loop(self): + logger.info(f"starting loop {self.stopped}") + while not self.stopped: + c = self.cmd_queue.get() + if c is None: + break + + try: + messages, stream, is_json, cmd = c + try: + messages = self._convert_messages(messages) + logger.info(f"after convert {messages}") + resp = self.bedrock_llm.chat_completion_cmd(messages=messages, stream=stream, is_json=is_json) + if not stream: + output_message = resp['output']['message'] + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("response", output_message) + self.ten_env.return_result(cmd_result, cmd) + else: + stream = resp.get("stream") + status_code = StatusCode.OK + for event in stream: + if "contentBlockDelta" in event: + delta_types = event["contentBlockDelta"]["delta"].keys() + # ignore other types of content: e.g toolUse + if "text" in delta_types: + content = event["contentBlockDelta"]["delta"]["text"] + cmd_result = CmdResult.create(StatusCode.OK) + cmd_result.set_property_string("response", content) + cmd_result.set_is_final(False) + self.ten_env.return_result(cmd_result, cmd) + elif ( + "internalServerException" in event + or "modelStreamErrorException" in event + or "throttlingException" in event + or "validationException" in event + ): + logger.error(f"GetConverseStream Error occured: {event}") + status_code = StatusCode.ERROR + break + else: + # ingore other events + continue + + cmd_result = CmdResult.create(status_code) + cmd_result.set_property_string("response", "") + cmd_result.set_is_final(True) + self.ten_env.return_result(cmd_result, cmd) + except: + logger.exception("Failed to handle queue") + except: + logger.exception("failed") + + def _convert_messages(self, messages: List[Any]): + result = [] + logger.info(f"_convert_messages {messages}") + for message in messages: + parts = [] + #if message["role"] == "system": + # result.append(message) + # continue + content = message["content"] + if type(content) == list: + for part in content: + if part["type"] == "image_url": + # rewrite image + # part["type"] = "image" + origin = part["image_url"]["url"] + del part["image_url"] + partial = str(origin[23:]) + part["image"] = { + "format": "jpeg", + "source": { + "bytes": b64decode(partial.encode('utf-8')) + } + } + del part["type"] + parts.append(part) + elif type(content) == str: + parts.append({"text": content}) + + del message["content"] + message["content"] = parts + result.append(message) + logger.info(f"after _convert_messages {result}") + return result @register_addon_as_extension("bedrock_llm_python") class BedrockLLMExtensionAddon(Addon): diff --git a/agents/ten_packages/extension/vision_tool_python/extension.py b/agents/ten_packages/extension/vision_tool_python/extension.py index 21cd865e32..2507edeb5c 100644 --- a/agents/ten_packages/extension/vision_tool_python/extension.py +++ b/agents/ten_packages/extension/vision_tool_python/extension.py @@ -116,6 +116,7 @@ def rgb2base64jpeg(rgb_data: bytes, width: int, height: int, raw: bool = False) # Create the data URL mime_type = "image/jpeg" base64_url = f"data:{mime_type};base64,{base64_encoded_image}" + return base64_url class VisionToolExtension(Extension): @@ -341,14 +342,14 @@ def _chat_completion(self, args:dict, frames: List[str], ts: datetime) -> Cmd: raise Exception("Failed to get property") query = args["query"] - contents = [{"type": "text", "text": f"This is the image captured within {self.frequency_ms} ms at {ts}. {query}"}] + contents = [ + {"type": "text", "text": "You need to describe all the object in this image first, and then focus on the user's query. Keep your response short and simple unless the query ask you to."}, + {"type": "text", "text": f"This is the image captured within {self.frequency_ms} ms at {ts}. {query}"} + ] for f in frames: contents.append({"type": "image_url", "image_url": {"url": f}}) messages = [{ - "role": "system", - "content": "You need to describe all the object in this image first, and then focus on the user's query. Keep your response short and simple unless the query ask you to." - },{ "role": "user", "content": contents, }] From da7a39f3553fbf96b3fc74f42ab4ccd047a031a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E9=94=9F=E9=94=B4?= Date: Sat, 19 Oct 2024 16:16:47 +0800 Subject: [PATCH 05/10] update firestore tsdb --- agents/manifest-lock.json | 13 + .../extension/tsdb_firestore/BUILD.gn | 21 ++ .../extension/tsdb_firestore/README.md | 29 +++ .../extension/tsdb_firestore/__init__.py | 11 + .../extension/tsdb_firestore/addon.py | 22 ++ .../extension/tsdb_firestore/extension.py | 230 ++++++++++++++++++ .../extension/tsdb_firestore/log.py | 22 ++ .../extension/tsdb_firestore/manifest.json | 49 ++++ .../extension/tsdb_firestore/property.json | 1 + .../extension/tsdb_firestore/requirements.txt | 1 + 10 files changed, 399 insertions(+) create mode 100644 agents/ten_packages/extension/tsdb_firestore/BUILD.gn create mode 100644 agents/ten_packages/extension/tsdb_firestore/README.md create mode 100644 agents/ten_packages/extension/tsdb_firestore/__init__.py create mode 100644 agents/ten_packages/extension/tsdb_firestore/addon.py create mode 100644 agents/ten_packages/extension/tsdb_firestore/extension.py create mode 100644 agents/ten_packages/extension/tsdb_firestore/log.py create mode 100644 agents/ten_packages/extension/tsdb_firestore/manifest.json create mode 100644 agents/ten_packages/extension/tsdb_firestore/property.json create mode 100644 agents/ten_packages/extension/tsdb_firestore/requirements.txt diff --git a/agents/manifest-lock.json b/agents/manifest-lock.json index 67fe30ef22..2c1c704deb 100644 --- a/agents/manifest-lock.json +++ b/agents/manifest-lock.json @@ -1,6 +1,19 @@ { "version": 1, "packages": [ + { + "type": "extension", + "name": "default_extension_python", + "version": "0.2.0", + "hash": "e9a45737eb95cd9deca485f6d4274b46533a8346d59edf1e1b7c8cd29754a15b", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python" + } + ], + "supports": [] + }, { "type": "system", "name": "ten_runtime_go", diff --git a/agents/ten_packages/extension/tsdb_firestore/BUILD.gn b/agents/ten_packages/extension/tsdb_firestore/BUILD.gn new file mode 100644 index 0000000000..66830a25b5 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/BUILD.gn @@ -0,0 +1,21 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2022-11. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import("//build/feature/ten_package.gni") + +ten_package("tsdb_firestore") { + package_kind = "extension" + + resources = [ + "__init__.py", + "addon.py", + "extension.py", + "log.py", + "manifest.json", + "property.json", + ] +} diff --git a/agents/ten_packages/extension/tsdb_firestore/README.md b/agents/ten_packages/extension/tsdb_firestore/README.md new file mode 100644 index 0000000000..9fd6d38472 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/README.md @@ -0,0 +1,29 @@ +# tsdb_firestore + + + +## Features + + + +- xxx feature + +## API + +Refer to `api` definition in [manifest.json] and default values in [property.json](property.json). + + + +## Development + +### Build + + + +### Unit test + + + +## Misc + + diff --git a/agents/ten_packages/extension/tsdb_firestore/__init__.py b/agents/ten_packages/extension/tsdb_firestore/__init__.py new file mode 100644 index 0000000000..0f29620335 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/__init__.py @@ -0,0 +1,11 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from . import addon +from .log import logger + +logger.info("tsdb_firestore extension loaded") diff --git a/agents/ten_packages/extension/tsdb_firestore/addon.py b/agents/ten_packages/extension/tsdb_firestore/addon.py new file mode 100644 index 0000000000..b264634f7e --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/addon.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +from ten import ( + Addon, + register_addon_as_extension, + TenEnv, +) +from .extension import TSDBFirestoreExtension +from .log import logger + + +@register_addon_as_extension("tsdb_firestore") +class TSDBFirestoreExtensionAddon(Addon): + + def on_create_instance(self, ten_env: TenEnv, name: str, context) -> None: + logger.info("TSDBFirestoreExtensionAddon on_create_instance") + ten_env.on_create_instance_done(TSDBFirestoreExtension(name), context) diff --git a/agents/ten_packages/extension/tsdb_firestore/extension.py b/agents/ten_packages/extension/tsdb_firestore/extension.py new file mode 100644 index 0000000000..1ec042be9c --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/extension.py @@ -0,0 +1,230 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# + +from ten import ( + AudioFrame, + VideoFrame, + Extension, + TenEnv, + Cmd, + StatusCode, + CmdResult, + Data, +) +import firebase_admin +from firebase_admin import credentials +from firebase_admin import firestore +import datetime +import queue +import threading +import json +from .log import logger +from typing import List, Any + +DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" +DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" +DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID = "stream_id" +PROPERTY_CREDENTIALS_PATH = "credentials_path" +PROPERTY_CHANNEL_NAME = "channel_name" +PROPERTY_COLLECTION_NAME = "collection_name" +PROPERTY_TTL = "ttl" + +RETRIEVE_CMD = "retrieve" +DOC_EXPIRE_PATH = "expireAt" +DOC_CONTENTS_PATH = "contents" + +def get_current_time(): + # Get the current time + start_time = datetime.datetime.now() + # Get the number of microseconds since the Unix epoch + unix_microseconds = int(start_time.timestamp() * 1_000_000) + return unix_microseconds + +def order_by_ts(contents: List[str]) -> List[str]: + tmp = [] + for c in contents: + tmp.append(json.loads(c)) + sorted_contents = sorted(tmp, key=lambda x: x["ts"]) + res = [] + for sc in sorted_contents: + res.append(json.dumps({"id": sc["id"], "input": sc["input"]})) + return res + +class TSDBFirestoreExtension(Extension): + def __init__(self, name: str): + super().__init__(name) + self.stopped = False + self.thread = None + self.queue = queue.Queue() + self.credentials_path = "" + self.channel_name = "" + self.collection_name = "" + self.client = None + self.document_ref = None + + def on_init(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_init") + ten_env.on_init_done() + + def on_start(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_start") + + try: + self.credentials_path = ten_env.get_property_string(PROPERTY_CREDENTIALS_PATH) + except Exception as err: + logger.info(f"GetProperty required {PROPERTY_CREDENTIALS_PATH} failed, err: {err}") + + try: + self.channel_name = ten_env.get_property_string(PROPERTY_CHANNEL_NAME) + except Exception as err: + logger.info(f"GetProperty required {PROPERTY_CHANNEL_NAME} failed, err: {err}") + + try: + self.collection_name = ten_env.get_property_string(PROPERTY_COLLECTION_NAME) + except Exception as err: + logger.info(f"GetProperty required {PROPERTY_COLLECTION_NAME} failed, err: {err}") + + try: + self.ttl = ten_env.get_property_int(PROPERTY_TTL) + except Exception as err: + logger.info(f"GetProperty required {PROPERTY_TTL} failed, err: {err}") + + # start firestore db + cred = credentials.Certificate(self.credentials_path) + firebase_admin.initialize_app(cred) + self.client = firestore.client() + + self.document_ref = self.client.collection(self.collection_name).document(self.channel_name) + # update ttl + expiration_time = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=self.ttl) + self.document_ref.update( + { + DOC_EXPIRE_PATH: expiration_time + } + ) + + self.thread = threading.Thread(target=self.async_handle, args=[ten_env]) + self.thread.start() + + ten_env.on_start_done() + + def async_handle(self, ten_env: TenEnv) -> None: + while not self.stopped: + try: + value = self.queue.get() + if value is None: + logger.info("exit handle loop") + break + ts, input, id = value + msg = {"id": id, "input": input, "ts": ts} + self.insert(ten_env, json.dumps(msg)) + except Exception as e: + logger.exception("Failed to store chat contents") + + def on_stop(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_stop") + self.stopped = True + while not self.queue.empty(): + self.queue.get() + self.queue.put(None) + if self.thread is not None: + self.thread.join() + self.thread = None + + ten_env.on_stop_done() + + def on_deinit(self, ten_env: TenEnv) -> None: + logger.info("TSDBFirestoreExtension on_deinit") + ten_env.on_deinit_done() + + def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: + try: + cmd_name = cmd.get_name() + logger.info("on_cmd name {}".format(cmd_name)) + if cmd_name == RETRIEVE_CMD: + self.retrieve(ten_env, cmd) + else: + logger.info("unknown cmd name {}".format(cmd_name)) + cmd_result = CmdResult.create(StatusCode.ERROR) + ten_env.return_result(cmd_result, cmd) + except Exception as e: + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + + def retrieve(self, ten_env: TenEnv, cmd: Cmd): + doc = self.document_ref.get() + if doc.exists: + doc_dict = doc.to_dict() + if DOC_CONTENTS_PATH in doc_dict: + contents = doc_dict[DOC_CONTENTS_PATH] + ret = CmdResult.create(StatusCode.OK) + ret.set_property_from_json("response", order_by_ts(contents)) + ten_env.return_result(ret, cmd) + else: + logger.info(f"no contents for the channel {self.channel_name}") + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + else: + logger.info(f"no corresponding document found for the channel {self.channel_name}") + ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) + + def insert(self, ten_env: TenEnv, content: str): + self.document_ref.update({ + DOC_CONTENTS_PATH: firestore.ArrayUnion([content]) + }) + logger.info(f"append {content} to firestore document {self.channel_name}") + + def on_data(self, ten_env: TenEnv, data: Data) -> None: + logger.info(f"TSDBFirestoreExtension on_data") + + # Assume 'data' is an object from which we can get properties + try: + is_final = data.get_property_bool(DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL) + if not is_final: + logger.info("ignore non-final input") + return + except Exception as err: + logger.info( + f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL} failed, err: {err}" + ) + return + + # Get input text + try: + input_text = data.get_property_string(DATA_IN_TEXT_DATA_PROPERTY_TEXT) + if not input_text: + logger.info("ignore empty text") + return + logger.info(f"OnData input text: [{input_text}]") + except Exception as err: + logger.info( + f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_TEXT} failed, err: {err}" + ) + return + + # Get input text + try: + stream_id = data.get_property_int(DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID) + if not stream_id: + logger.warning("ignore empty stream_id") + return + except Exception as err: + logger.info( + f"OnData GetProperty {DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID} failed, err: {err}" + ) + return + + ts = get_current_time + self.queue.put((ts, input_text, stream_id)) + + + + def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: + pass + + def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None: + pass diff --git a/agents/ten_packages/extension/tsdb_firestore/log.py b/agents/ten_packages/extension/tsdb_firestore/log.py new file mode 100644 index 0000000000..aa14bacd07 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/log.py @@ -0,0 +1,22 @@ +# +# +# Agora Real Time Engagement +# Created by Wei Hu in 2024-08. +# Copyright (c) 2024 Agora IO. All rights reserved. +# +# +import logging + +logger = logging.getLogger("tsdb_firestore") +logger.setLevel(logging.INFO) + +formatter_str = ( + "%(asctime)s - %(name)s - %(levelname)s - %(process)d - " + "[%(filename)s:%(lineno)d] - %(message)s" +) +formatter = logging.Formatter(formatter_str) + +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) + +logger.addHandler(console_handler) diff --git a/agents/ten_packages/extension/tsdb_firestore/manifest.json b/agents/ten_packages/extension/tsdb_firestore/manifest.json new file mode 100644 index 0000000000..0a678926d5 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/manifest.json @@ -0,0 +1,49 @@ +{ + "type": "extension", + "name": "tsdb_firestore", + "version": "0.1.0", + "dependencies": [ + { + "type": "system", + "name": "ten_runtime_python", + "version": "0.2" + } + ], + "package": { + "include": [ + "manifest.json", + "property.json", + "BUILD.gn", + "**.tent", + "**.py", + "README.md" + ] + }, + "api": { + "data_in": [ + { + "name": "text_data", + "property": { + "text": { + "type": "string" + } + } + } + ], + "cmd_in": [ + { + "name": "retrieve", + "result": { + "property": { + "response": { + "type": "array", + "items": { + "type": "string" + } + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/agents/ten_packages/extension/tsdb_firestore/property.json b/agents/ten_packages/extension/tsdb_firestore/property.json new file mode 100644 index 0000000000..9e26dfeeb6 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/property.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/agents/ten_packages/extension/tsdb_firestore/requirements.txt b/agents/ten_packages/extension/tsdb_firestore/requirements.txt new file mode 100644 index 0000000000..4720fc6ff6 --- /dev/null +++ b/agents/ten_packages/extension/tsdb_firestore/requirements.txt @@ -0,0 +1 @@ +firebase-admin \ No newline at end of file From 14a34e3ecce562e0c9fb72eb801cf967ff3e2202 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E9=94=9F=E9=94=B4?= Date: Sat, 19 Oct 2024 17:02:22 +0800 Subject: [PATCH 06/10] fix --- .../extension/tsdb_firestore/extension.py | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/agents/ten_packages/extension/tsdb_firestore/extension.py b/agents/ten_packages/extension/tsdb_firestore/extension.py index 1ec042be9c..8404819589 100644 --- a/agents/ten_packages/extension/tsdb_firestore/extension.py +++ b/agents/ten_packages/extension/tsdb_firestore/extension.py @@ -29,16 +29,18 @@ DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" DATA_IN_TEXT_DATA_PROPERTY_STREAM_ID = "stream_id" + PROPERTY_CREDENTIALS_PATH = "credentials_path" PROPERTY_CHANNEL_NAME = "channel_name" PROPERTY_COLLECTION_NAME = "collection_name" PROPERTY_TTL = "ttl" RETRIEVE_CMD = "retrieve" +CMD_OUT_PROPERTY_RESPONSE = "response" DOC_EXPIRE_PATH = "expireAt" DOC_CONTENTS_PATH = "contents" -def get_current_time(): +def get_current_time(tz=""): # Get the current time start_time = datetime.datetime.now() # Get the number of microseconds since the Unix epoch @@ -102,11 +104,20 @@ def on_start(self, ten_env: TenEnv) -> None: self.document_ref = self.client.collection(self.collection_name).document(self.channel_name) # update ttl expiration_time = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=self.ttl) - self.document_ref.update( - { - DOC_EXPIRE_PATH: expiration_time - } - ) + exists = self.document_ref.get().exists + if exists: + self.document_ref.update( + { + DOC_EXPIRE_PATH: expiration_time + } + ) + else: + # not exists yet, set to create one + self.document_ref.set( + { + DOC_EXPIRE_PATH: expiration_time + } + ) self.thread = threading.Thread(target=self.async_handle, args=[ten_env]) self.thread.start() @@ -163,7 +174,7 @@ def retrieve(self, ten_env: TenEnv, cmd: Cmd): if DOC_CONTENTS_PATH in doc_dict: contents = doc_dict[DOC_CONTENTS_PATH] ret = CmdResult.create(StatusCode.OK) - ret.set_property_from_json("response", order_by_ts(contents)) + ret.set_property_from_json(CMD_OUT_PROPERTY_RESPONSE, order_by_ts(contents)) ten_env.return_result(ret, cmd) else: logger.info(f"no contents for the channel {self.channel_name}") @@ -220,8 +231,6 @@ def on_data(self, ten_env: TenEnv, data: Data) -> None: ts = get_current_time self.queue.put((ts, input_text, stream_id)) - - def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None: pass From 4d00bdd1001d819b25fb7391ce7db2afc1d308f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E9=94=9F=E9=94=B4?= Date: Sat, 19 Oct 2024 17:38:49 +0800 Subject: [PATCH 07/10] add firestore txn --- .../extension/tsdb_firestore/extension.py | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/agents/ten_packages/extension/tsdb_firestore/extension.py b/agents/ten_packages/extension/tsdb_firestore/extension.py index 8404819589..05cb929d5c 100644 --- a/agents/ten_packages/extension/tsdb_firestore/extension.py +++ b/agents/ten_packages/extension/tsdb_firestore/extension.py @@ -24,7 +24,7 @@ import threading import json from .log import logger -from typing import List, Any +from typing import List DATA_IN_TEXT_DATA_PROPERTY_IS_FINAL = "is_final" DATA_IN_TEXT_DATA_PROPERTY_TEXT = "text" @@ -40,7 +40,7 @@ DOC_EXPIRE_PATH = "expireAt" DOC_CONTENTS_PATH = "contents" -def get_current_time(tz=""): +def get_current_time(): # Get the current time start_time = datetime.datetime.now() # Get the number of microseconds since the Unix epoch @@ -56,7 +56,18 @@ def order_by_ts(contents: List[str]) -> List[str]: for sc in sorted_contents: res.append(json.dumps({"id": sc["id"], "input": sc["input"]})) return res - + +@firestore.transactional +def update_in_transaction(transaction, doc_ref, content): + transaction.update(doc_ref, content) + +@firestore.transactional +def read_in_transaction(transaction, doc_ref): + doc = doc_ref.get(transaction=transaction) + if doc.exists: + return doc.to_dict() + return None + class TSDBFirestoreExtension(Extension): def __init__(self, name: str): super().__init__(name) @@ -168,9 +179,8 @@ def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None: def retrieve(self, ten_env: TenEnv, cmd: Cmd): - doc = self.document_ref.get() - if doc.exists: - doc_dict = doc.to_dict() + doc_dict = read_in_transaction(self.client.transaction(), self.document_ref) + if doc_dict is not None: if DOC_CONTENTS_PATH in doc_dict: contents = doc_dict[DOC_CONTENTS_PATH] ret = CmdResult.create(StatusCode.OK) @@ -184,9 +194,13 @@ def retrieve(self, ten_env: TenEnv, cmd: Cmd): ten_env.return_result(CmdResult.create(StatusCode.ERROR), cmd) def insert(self, ten_env: TenEnv, content: str): - self.document_ref.update({ - DOC_CONTENTS_PATH: firestore.ArrayUnion([content]) - }) + update_in_transaction( + self.client.transaction(), + self.document_ref, + { + DOC_CONTENTS_PATH: firestore.ArrayUnion([content]) + } + ) logger.info(f"append {content} to firestore document {self.channel_name}") def on_data(self, ten_env: TenEnv, data: Data) -> None: From 7c910a40a55f4e4c9c7c768308f85e61090a43f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E9=94=9F=E9=94=B4?= Date: Sat, 19 Oct 2024 18:14:48 +0800 Subject: [PATCH 08/10] abstract path name --- .../ten_packages/extension/tsdb_firestore/extension.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/agents/ten_packages/extension/tsdb_firestore/extension.py b/agents/ten_packages/extension/tsdb_firestore/extension.py index 05cb929d5c..da82bcb570 100644 --- a/agents/ten_packages/extension/tsdb_firestore/extension.py +++ b/agents/ten_packages/extension/tsdb_firestore/extension.py @@ -39,6 +39,9 @@ CMD_OUT_PROPERTY_RESPONSE = "response" DOC_EXPIRE_PATH = "expireAt" DOC_CONTENTS_PATH = "contents" +CONTENT_ID_PATH = "id" +CONTENT_TS_PATH = "ts" +CONTENT_INPUT_PATH = "input" def get_current_time(): # Get the current time @@ -51,10 +54,10 @@ def order_by_ts(contents: List[str]) -> List[str]: tmp = [] for c in contents: tmp.append(json.loads(c)) - sorted_contents = sorted(tmp, key=lambda x: x["ts"]) + sorted_contents = sorted(tmp, key=lambda x: x[CONTENT_TS_PATH]) res = [] for sc in sorted_contents: - res.append(json.dumps({"id": sc["id"], "input": sc["input"]})) + res.append(json.dumps({CONTENT_ID_PATH: sc[CONTENT_ID_PATH], CONTENT_INPUT_PATH: sc[CONTENT_INPUT_PATH]})) return res @firestore.transactional @@ -143,7 +146,7 @@ def async_handle(self, ten_env: TenEnv) -> None: logger.info("exit handle loop") break ts, input, id = value - msg = {"id": id, "input": input, "ts": ts} + msg = {g} self.insert(ten_env, json.dumps(msg)) except Exception as e: logger.exception("Failed to store chat contents") From 76e5dedd7efe225b07b19b15e31fa7c9b5d451a5 Mon Sep 17 00:00:00 2001 From: TomasLiu Date: Fri, 25 Oct 2024 09:36:23 +0800 Subject: [PATCH 09/10] update graph --- .env.example | 4 + agents/manifest-lock.json | 13 - agents/property.json | 319 ++++++++++++++++++++++++ demo/src/app/api/agents/start/graph.tsx | 2 +- demo/src/common/constant.ts | 4 + 5 files changed, 328 insertions(+), 14 deletions(-) diff --git a/.env.example b/.env.example index 70a544aeab..f78c214190 100644 --- a/.env.example +++ b/.env.example @@ -67,6 +67,10 @@ ELEVENLABS_TTS_KEY= # Gemini API key GEMINI_API_KEY= +# Extension: azure_vision_python +# Azure vision key +AZURE_VISION_KEY= + # Extension: litellm # Using Environment Variables, refer to https://docs.litellm.ai/docs/providers # For example: diff --git a/agents/manifest-lock.json b/agents/manifest-lock.json index 2c1c704deb..67fe30ef22 100644 --- a/agents/manifest-lock.json +++ b/agents/manifest-lock.json @@ -1,19 +1,6 @@ { "version": 1, "packages": [ - { - "type": "extension", - "name": "default_extension_python", - "version": "0.2.0", - "hash": "e9a45737eb95cd9deca485f6d4274b46533a8346d59edf1e1b7c8cd29754a15b", - "dependencies": [ - { - "type": "system", - "name": "ten_runtime_python" - } - ], - "supports": [] - }, { "type": "system", "name": "ten_runtime_go", diff --git a/agents/property.json b/agents/property.json index 12c36d2991..ac7e525ac0 100644 --- a/agents/property.json +++ b/agents/property.json @@ -2659,6 +2659,325 @@ ] } ] + }, + { + "name": "va.openai.v2v.vision", + "auto_start": false, + "nodes": [ + { + "type": "extension", + "extension_group": "rtc", + "addon": "agora_rtc", + "name": "agora_rtc", + "property": { + "app_id": "${env:AGORA_APP_ID}", + "token": "", + "channel": "ten_agent_test", + "stream_id": 1234, + "remote_stream_id": 123, + "subscribe_audio": true, + "subscribe_video": true, + "publish_audio": true, + "publish_data": true, + "subscribe_audio_sample_rate": 24000 + } + }, + { + "type": "extension", + "extension_group": "llm", + "addon": "openai_v2v_python", + "name": "openai_v2v_python", + "property": { + "api_key": "${env:OPENAI_REALTIME_API_KEY}", + "temperature": 0.9, + "model": "gpt-4o-realtime-preview", + "max_tokens": 2048, + "voice": "alloy", + "language": "en-US", + "server_vad": true, + "dump": true, + "history": 10 + } + }, + { + "type": "extension", + "extension_group": "transcriber", + "addon": "message_collector", + "name": "message_collector" + }, + { + "type": "extension", + "extension_group": "tools", + "addon": "weatherapi_tool_python", + "name": "weatherapi_tool_python", + "property": { + "api_key": "${env:WEATHERAPI_API_KEY}" + } + }, + { + "type": "extension", + "extension_group": "tools", + "addon": "bingsearch_tool_python", + "name": "bingsearch_tool_python", + "property": { + "api_key": "${env:BING_API_KEY}" + } + }, + { + "type": "extension", + "extension_group": "tools", + "addon": "vision_tool_python", + "name": "vision_tool_python", + "property": { + "frequency_ms": 1000, + "use_llm": false + } + }, + { + "type": "extension", + "extension_group": "azure_vision", + "addon": "azure_vision_python", + "name": "azure_vision_python", + "property": { + "key": "${env:AZURE_VISION_KEY}", + "endpoint": "https://tenagentvision.cognitiveservices.azure.com/" + } + } + ], + "connections": [ + { + "extension_group": "rtc", + "extension": "agora_rtc", + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + } + ], + "video_frame": [ + { + "name": "video_frame", + "dest": [ + { + "extension_group": "tools", + "extension": "vision_tool_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "vision_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + }, + { + "name": "image_analyze", + "dest": [ + { + "extension_group": "azure_vision", + "extension": "azure_vision_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + } + ] + }, + { + "extension_group": "tools", + "extension": "bingsearch_tool_python", + "cmd": [ + { + "name": "tool_register", + "dest": [ + { + "extension_group": "llm", + "extension": "openai_v2v_python" + } + ] + } + ] + }, + { + "extension_group": "llm", + "extension": "openai_v2v_python", + "audio_frame": [ + { + "name": "pcm_frame", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ], + "data": [ + { + "name": "text_data", + "dest": [ + { + "extension_group": "transcriber", + "extension": "message_collector" + } + ] + } + ], + "cmd": [ + { + "name": "flush", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + }, + { + "name": "tool_call_get_current_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_get_past_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_get_future_weather", + "dest": [ + { + "extension_group": "tools", + "extension": "weatherapi_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_bing_search", + "dest": [ + { + "extension_group": "tools", + "extension": "bingsearch_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + }, + { + "name": "tool_call_query_single_image", + "dest": [ + { + "extension_group": "tools", + "extension": "vision_tool_python", + "msg_conversion": { + "type": "per_property", + "keep_original": true, + "rules": [ + { + "path": "_ten.name", + "conversion_mode": "fixed_value", + "value": "tool_call" + } + ] + } + } + ] + } + ] + }, + { + "extension_group": "transcriber", + "extension": "message_collector", + "data": [ + { + "name": "data", + "dest": [ + { + "extension_group": "rtc", + "extension": "agora_rtc" + } + ] + } + ] + } + ] } ] } diff --git a/demo/src/app/api/agents/start/graph.tsx b/demo/src/app/api/agents/start/graph.tsx index f01d6d95b9..beb5694aeb 100644 --- a/demo/src/app/api/agents/start/graph.tsx +++ b/demo/src/app/api/agents/start/graph.tsx @@ -105,7 +105,7 @@ export const getGraphProperties = ( "azure_synthesis_voice_name": voiceNameMap[language]["azure"][voiceType] } } - } else if (graphName == "va.openai.v2v") { + } else if (graphName == "va.openai.v2v" || graphName == "va.openai.v2v.vision") { return { "openai_v2v_python": { "model": "gpt-4o-realtime-preview", diff --git a/demo/src/common/constant.ts b/demo/src/common/constant.ts index 00fe177d2c..bbee7e82a9 100644 --- a/demo/src/common/constant.ts +++ b/demo/src/common/constant.ts @@ -48,6 +48,10 @@ export const GRAPH_OPTIONS: GraphOptionItem[] = [ { label: "Voice Agent with OpenAI Realtime API (Beta)", value: "va.openai.v2v" + }, + { + label: "Voice Agent with OpenAI Realtime API (Beta) with Vision", + value: "va.openai.v2v.vision" } ] From 1169b4dfd1196dc82915fae9cf5792eb26364e88 Mon Sep 17 00:00:00 2001 From: TomasLiu Date: Fri, 25 Oct 2024 10:09:16 +0800 Subject: [PATCH 10/10] fix --- agents/property.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/agents/property.json b/agents/property.json index ac7e525ac0..aae833f8d8 100644 --- a/agents/property.json +++ b/agents/property.json @@ -2679,7 +2679,8 @@ "subscribe_video": true, "publish_audio": true, "publish_data": true, - "subscribe_audio_sample_rate": 24000 + "subscribe_audio_sample_rate": 24000, + "subscribe_video_pix_fmt": 4 } }, {