diff --git a/requirements.txt b/requirements.txt index b579af8..48eaa9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,4 @@ systembridgeshared==4.0.7 typer==0.12.5 uvicorn[standard]==0.30.6 zeroconf==0.133.0 +monitorcontrol==3.1.0 diff --git a/systembridgebackend/const.py b/systembridgebackend/const.py index 3612635..4ddcd78 100644 --- a/systembridgebackend/const.py +++ b/systembridgebackend/const.py @@ -85,6 +85,8 @@ TYPE_SETTINGS_RESULT = "SETTINGS_RESULT" TYPE_UNREGISTER_DATA_LISTENER = "UNREGISTER_DATA_LISTENER" TYPE_UPDATE_SETTINGS = "UPDATE_SETTINGS" +TYPE_DISPLAY_UPDATE_SETTING = "DISPLAY_UPDATE_SETTING" +TYPE_DISPLAY_SETTING_UPDATED = "DISPLAY_SETTING_UPDATED" # Event Subtypes SUBTYPE_BAD_DIRECTORY = "BAD_DIRECTORY" @@ -108,4 +110,5 @@ SUBTYPE_MISSING_TITLE = "MISSING_TITLE" SUBTYPE_MISSING_TOKEN = "MISSING_TOKEN" SUBTYPE_MISSING_VALUE = "MISSING_VALUE" +SUBTYPE_OPERATION_FAILED = "OPERATION_FAILED" SUBTYPE_UNKNOWN_EVENT = "UNKNOWN_EVENT" diff --git a/systembridgebackend/handlers/data.py b/systembridgebackend/handlers/data.py index f5c9af3..42264d9 100644 --- a/systembridgebackend/handlers/data.py +++ b/systembridgebackend/handlers/data.py @@ -1,6 +1,7 @@ """Data.""" from collections.abc import Awaitable, Callable +from queue import Queue from typing import Any from systembridgemodels.modules import ModulesData @@ -11,45 +12,80 @@ class DataUpdate(Base): - """Data Update.""" + """ + The update threads scheduler, managing data update thread and media update thread. + Also holds a reference to the collected data. + """ def __init__( self, - updated_callback: Callable[[str], Awaitable[None]], + updated_callback: Callable[[str], Awaitable[None]] ) -> None: - """Initialise.""" + """ + Initialize a new instance of DataUpdate. + + :param updated_callback: The callback to be invoked when data is updated. + """ super().__init__() self._updated_callback = updated_callback self.update_data_thread: DataUpdateThread | None = None self.update_media_thread: MediaUpdateThread | None = None + self.update_data_queue: Queue[dict] = Queue() + self.update_media_queue: Queue[dict] = Queue() self.data = ModulesData() + """Data collected by the system bridge""" async def _data_updated_callback( self, name: str, data: Any, ) -> None: - """Update the data with the given name and value, and invoke the updated callback.""" + """ + Update collected data of given module, then invoke :field:`_updated_callback`. + + :param name: module name triggering the update. should be any field names of :class:`ModulesData`. + :param data: The dataclass object to be assigned to given module. + """ setattr(self.data, name, data) await self._updated_callback(name) - def request_update_data(self) -> None: - """Request update data.""" + def request_update_data(self, **kwargs) -> None: + """ + Trigger data update by enqueueing `kwargs` to :field:`update_data_queue`, + these args will be passed to `DataUpdateThread.update`, + and then to `ModulesUpdate.update_data`. + + will start :field:`update_data_thread` if necessary. + + :param kwargs: The parameters to be passed into `ModulesUpdate.update_data`. + """ + if self.update_data_thread is not None and self.update_data_thread.is_alive(): - self._logger.info("Update data thread already running") + self._logger.warning("Force update data with params: %s", kwargs) + self.update_data_queue.put_nowait(kwargs) return self._logger.info("Starting update data thread..") - self.update_data_thread = DataUpdateThread(self._data_updated_callback) + self.update_data_thread = DataUpdateThread(self._data_updated_callback, self.update_data_queue) self.update_data_thread.start() + if kwargs: + self.update_data_queue.put_nowait(kwargs) def request_update_media_data(self) -> None: - """Request update media data.""" + """ + Trigger media update by enqueueing `kwargs` to :field:`update_media_queue`, + these args will be passed to `MediaUpdateThread.update`, + and then to `Media.update_media_info`. + + will start :field:`update_media_thread` if necessary. + + :param kwargs: The parameters to be passed into `Media.update_media_info`. + """ if self.update_media_thread is not None and self.update_media_thread.is_alive(): self._logger.info("Update media thread already running") return self._logger.info("Starting update media thread..") - self.update_media_thread = MediaUpdateThread(self._data_updated_callback) + self.update_media_thread = MediaUpdateThread(self._data_updated_callback, self.update_media_queue) self.update_media_thread.start() diff --git a/systembridgebackend/handlers/display.py b/systembridgebackend/handlers/display.py new file mode 100644 index 0000000..7cc0fd2 --- /dev/null +++ b/systembridgebackend/handlers/display.py @@ -0,0 +1,99 @@ +"""display control DDC/CI handlers.""" +from monitorcontrol.monitorcontrol import get_monitors + +from systembridgemodels.modules.displays import InputSource, PowerMode + +from ..modules.displays import vcpcode_volume + + +def set_brightness( + monitor_id: int, + brightness: int, +) -> None: + """ + Set the brightness of a monitor. + + Args: + monitor_id (int): The ID of the monitor to set the brightness for. + brightness (int): The new brightness level, on a scale from 0 to 100. + + Raises: + ValueError: If the brightness value is not in the range [0, 100]. + """ + monitors = get_monitors() + with monitors[monitor_id] as monitor: + monitor.set_luminance(brightness) + + +def set_contrast( + monitor_id: int, + contrast: int, +) -> None: + """ + Set the contrast of a monitor. + + Args: + monitor_id (int): The ID of the monitor to set the contrast for. + contrast (int): The new contrast level, on a scale from 0 to 100. + + Raises: + ValueError: If the contrast value is not in the range [0, 100]. + """ + monitors = get_monitors() + with monitors[monitor_id] as monitor: + monitor.set_contrast(contrast) + + +def set_volume( + monitor_id: int, + volume: int, +) -> None: + """ + Set the volume of a monitor. + + Args: + monitor_id (int): The ID of the monitor to set the volume for. + volume (int): The new volume level, on a scale from 0 to 100. + + Raises: + ValueError: If the volume value is not in the range [0, 100]. + """ + monitors = get_monitors() + with monitors[monitor_id] as monitor: + monitor._set_vcp_feature(vcpcode_volume, volume) # pylint: disable=protected-access + + +def set_power_state( + monitor_id: int, + power_state: PowerMode | int | str, +) -> None: + """ + Set the power state of a monitor. + + Args: + monitor_id (int): The ID of the monitor to set the power state for. + power_state (int | str | PowerMode): The new power state, can be an integer, + a string representing the power mode, or a `PowerMode` enum value. + """ + monitors = get_monitors() + with monitors[monitor_id] as monitor: + monitor.set_power_mode(power_state) + + +def set_input_source( + monitor_id: int, + input_source: InputSource | int | str, +) -> None: + """ + Set the input source of a monitor. + + Args: + monitor_id: The ID of the monitor to set the input source for. + input_source: The new input source, which can be an integer, string, or `InputSource` enum value. + + Raises: + ValueError: If the input source is not recognized by the monitor. + """ + monitors = get_monitors() + with monitors[monitor_id] as monitor: + monitor.set_input_source(input_source) diff --git a/systembridgebackend/handlers/threads/__init__.py b/systembridgebackend/handlers/threads/__init__.py index d1778bb..c501ee6 100644 --- a/systembridgebackend/handlers/threads/__init__.py +++ b/systembridgebackend/handlers/threads/__init__.py @@ -23,12 +23,18 @@ def run(self) -> None: """Run.""" raise NotImplementedError - def join( + def interrupt( self, timeout: float | None = None, ) -> None: - """Join.""" - self._logger.info("Stopping thread") + """ + Interrupt thread running by setting the `self.stopping` flag. + Child classes should check `self.stopping` in its `run()` implementation + to support this feature. + + Should be called instead of `BaseThread.join()`. + """ + self._logger.info("Interrupting %s", self.__class__.__name__) self.stopping = True loop = asyncio.get_event_loop() asyncio.tasks.all_tasks(loop).clear() diff --git a/systembridgebackend/handlers/threads/data.py b/systembridgebackend/handlers/threads/data.py index 5c44ac6..e89a470 100644 --- a/systembridgebackend/handlers/threads/data.py +++ b/systembridgebackend/handlers/threads/data.py @@ -1,6 +1,7 @@ """Data update thread handler.""" from collections.abc import Awaitable, Callable +from queue import Queue from typing import Any, Final, override from ...modules import ModulesUpdate @@ -15,15 +16,16 @@ class DataUpdateThread(UpdateThread): def __init__( self, updated_callback: Callable[[str, Any], Awaitable[None]], + update_queue: Queue[str], ) -> None: """Initialise.""" - super().__init__(UPDATE_INTERVAL) + super().__init__(UPDATE_INTERVAL, update_queue) self._update_cls = ModulesUpdate(updated_callback) @override - async def update(self) -> None: + async def update(self, modules=None) -> None: """Update.""" if self.stopping: return - await self._update_cls.update_data() + await self._update_cls.update_data(modules=modules) diff --git a/systembridgebackend/handlers/threads/media.py b/systembridgebackend/handlers/threads/media.py index fd44379..afdb709 100644 --- a/systembridgebackend/handlers/threads/media.py +++ b/systembridgebackend/handlers/threads/media.py @@ -3,6 +3,7 @@ from collections.abc import Awaitable, Callable import datetime import platform +from queue import Queue from typing import Final, override from systembridgemodels.modules.media import Media as MediaInfo @@ -18,9 +19,10 @@ class MediaUpdateThread(UpdateThread): def __init__( self, updated_callback: Callable[[str, MediaInfo], Awaitable[None]], + update_queue: Queue[str], ) -> None: """Initialise.""" - super().__init__(UPDATE_INTERVAL) + super().__init__(UPDATE_INTERVAL, update_queue) self._updated_callback = updated_callback if platform.system() != "Windows": diff --git a/systembridgebackend/handlers/threads/update.py b/systembridgebackend/handlers/threads/update.py index 32b7412..c74847d 100644 --- a/systembridgebackend/handlers/threads/update.py +++ b/systembridgebackend/handlers/threads/update.py @@ -2,6 +2,7 @@ import asyncio from datetime import datetime, timedelta +from queue import Queue, Empty import threading import time from typing import override @@ -15,40 +16,55 @@ class UpdateThread(BaseThread): def __init__( self, interval: int, + update_queue: Queue[dict], ) -> None: """Initialise.""" super().__init__() self.interval = interval self.next_run: datetime = datetime.now() self._thread: threading.Thread | None = None + self.update_queue = update_queue - def _run(self) -> None: - """Automatically update the schedule.""" - while not self.stopping: - # Wait for the next run - if self.next_run > datetime.now(): - interval = self.next_run.timestamp() - datetime.now().timestamp() - self._logger.info( - "Waiting for next update in %s seconds", round(interval, 2) - ) - time.sleep(interval) - - if self.stopping: - return - - # Update the next run before running the update - self.update_next_run() - - # Run the update - try: - asyncio.new_event_loop().run_until_complete(self.update()) - except Exception as exception: # pylint: disable=broad-except - self._logger.exception(exception) - - if self.stopping: - return - - self._logger.info("Update finished, next run will be at: %s", self.next_run) + @override + def run(self) -> None: + """ + Run update loop. Trigger update on interval or when a signal from `update_queue` is received. + """ + try: + while not self.stopping: + update_triggered_by_signal = False + update_params = {} + + # Wait for the next run + if (sleep_time := (self.next_run - datetime.now()).total_seconds()) > 0: + self._logger.info("Waiting for next update in %.2f seconds", sleep_time) + try: + update_params = self.update_queue.get(block=True, timeout=sleep_time) + self._logger.debug("Update triggered by force update, with params: %s", update_params) + # Update is triggered by signal from another thread + update_triggered_by_signal = True + except Empty: + self._logger.debug("Update triggered by time interval") + + if self.stopping: + return + + if not update_triggered_by_signal: + # Update the next run before running the update + self.update_next_run() + + # Run the update + try: + asyncio.new_event_loop().run_until_complete(self.update(**update_params)) + except Exception as exception: # pylint: disable=broad-except + self._logger.exception(exception) + + if self.stopping: + return + + self._logger.info("Update finished, next run will be at: %s", self.next_run) + finally: + self._logger.info("%s Stopped", self.__class__.__name__) def _update_interval( self, @@ -61,38 +77,13 @@ def _update_interval( self.interval = interval self._logger.info("Updated update interval to: %s", self.interval) - @override - def run(self) -> None: - """Run.""" - # Start the automatic update in a separate thread - self._thread = threading.Thread(target=self._run) - self._thread.start() - self._logger.info("Started update thread") - - def join(self, timeout=8) -> None: - """Stop the automatic update.""" - self.stopping = True - - # Stop the automatic update thread if it is running - if self._thread and self._thread.is_alive(): - self._thread.join(timeout=4) - self._logger.info("Stopped update thread") - - super().join(timeout) - async def update(self) -> None: - """Update.""" + """The actual data/media update function, should be implemented in subclasses.""" raise NotImplementedError def update_next_run(self) -> None: - """Update next run.""" + """Update `self.next_run` to be the current time plus the update interval.""" if self.stopping: return - - # Log how long the update took - time_taken = datetime.now() - self.next_run - self._logger.info("Update took %s seconds", round(time_taken.seconds, 2)) - - # Update the next run time to be the current time plus the interval self.next_run = datetime.now() + timedelta(seconds=self.interval) self._logger.info("Scheduled next update for: %s", self.next_run) diff --git a/systembridgebackend/modules/__init__.py b/systembridgebackend/modules/__init__.py index d0f3494..add44ab 100644 --- a/systembridgebackend/modules/__init__.py +++ b/systembridgebackend/modules/__init__.py @@ -3,6 +3,8 @@ from asyncio import Task from collections.abc import Awaitable, Callable from dataclasses import dataclass +from queue import Queue +import time from typing import Any from systembridgeshared.base import Base @@ -52,7 +54,7 @@ def __init__( super().__init__() self._updated_callback = updated_callback - self._classes: list[ModuleClass] = [ + self._classes: dict[str, ModuleClass] = dict((m.name, m) for m in [ ModuleClass(name="system", cls=SystemUpdate()), ModuleClass(name="battery", cls=BatteryUpdate()), ModuleClass(name="cpu", cls=CPUUpdate()), @@ -62,13 +64,14 @@ def __init__( ModuleClass(name="memory", cls=MemoryUpdate()), ModuleClass(name="networks", cls=NetworksUpdate()), ModuleClass(name="processes", cls=ProcessesUpdate()), - ] + ]) self.tasks: dict[str, Task] = {} async def update_module(self, module_class: ModuleClass) -> None: """Update Module.""" - self._logger.info("Request update module: %s", module_class.name) + time_update_start = time.perf_counter() + self._logger.debug("Start update module: %s", module_class.name) try: module_data = await module_class.cls.update_all_data() @@ -79,16 +82,26 @@ async def update_module(self, module_class: ModuleClass) -> None: module_class.name, exc_info=exception, ) + self._logger.info( + "Module updated: %s time=%0.3fs", + module_class.name, + time.perf_counter() - time_update_start, + ) - async def update_data(self) -> None: + async def update_data(self, modules: list[str] | None = None) -> None: """Update Data.""" - self._logger.info("Update data") + self._logger.info("Update data, modules=%s", modules or 'ALL') sensors_update = SensorsUpdate() sensors_data = await sensors_update.update_all_data() await self._updated_callback("sensors", sensors_data) - for module_class in self._classes: + if modules: + classes = (self._classes[cls] for cls in modules) + else: + classes = self._classes.values() + + for module_class in classes: # If the class has a sensors attribute, set it if hasattr(module_class.cls, "sensors"): module_class.cls.sensors = sensors_data @@ -98,6 +111,7 @@ async def update_data(self) -> None: module_class.name in self.tasks and not self.tasks[module_class.name].done() ): + self._logger.debug("Skip already running task %s", module_class.name) continue # Start the task diff --git a/systembridgebackend/modules/displays.py b/systembridgebackend/modules/displays.py index 4f868ea..a0b0d32 100644 --- a/systembridgebackend/modules/displays.py +++ b/systembridgebackend/modules/displays.py @@ -1,7 +1,10 @@ """Displays.""" -from typing import override +from typing import NamedTuple, override +from monitorcontrol.monitorcontrol import get_monitors as vcp_get_monitors +from monitorcontrol.vcp import VCPCode +from monitorcontrol.vcp.vcp_abc import VCPError from screeninfo import ScreenInfoError, get_monitors from systembridgemodels.modules.displays import Display @@ -10,6 +13,41 @@ from .base import ModuleUpdateBase +class CustomVCPCode(VCPCode): + """ + Subclass `VCPCode` to allow for custom VCP code definitions. + + Args: + definition (dict): A dictionary containing the following keys: + - name (str): The name of the VCP code. + - value (int): The value of the VCP code. usually a hexadecimal value. + - type (str): The type of the VCP code. Can be "rw" (read-write) or "ro" (read-only). + - function (str): The function of the VCP code. Can be "c" (Continuous), or "nc" (Non-continuous). + """ + def __init__(self, definition: dict): # pylint: disable=super-init-not-called + self.definition = definition + + +class VCPMonitorInfo(NamedTuple): + """ + A named tuple to hold monitor information. + """ + vcp_supported: bool + brightness: int = None + contrast: int = None + volume: int = None + power_state: int = None + input_source: int = None + + +vcpcode_volume = CustomVCPCode({ + "name": "audio speaker volume", + "value": 0x62, + "type": "rw", + "function": "c", +}) + + class DisplaysUpdate(ModuleUpdateBase): """Displays Update.""" @@ -17,6 +55,7 @@ def __init__(self) -> None: """Initialise.""" super().__init__() self.sensors: Sensors | None = None + self.vcp_monitor_blacklist = set() def _get_pixel_clock( self, @@ -142,14 +181,46 @@ def sensors_resolution_vertical( return int(sensor.value) if sensor.value is not None else None return None + def sensors_vcp_info( + self, + index: int, + name: str, + ) -> VCPMonitorInfo: + """Get VCP info for a specific monitor.""" + if name not in self.vcp_monitor_blacklist: + vcp_monitors = vcp_get_monitors() + try: + def permissive(lambda_func): + try: return lambda_func() + except VCPError: pass + + vcp_monitor = vcp_monitors[index] + with vcp_monitor: + brightness = vcp_monitor.get_luminance() + contrast = permissive(vcp_monitor.get_contrast) + volume = permissive(lambda: vcp_monitor._get_vcp_feature(vcpcode_volume)) # pylint: disable=protected-access + power_state = permissive(lambda: vcp_monitor.get_power_mode().value) + input_source = permissive(lambda: vcp_monitor.get_input_source().value) + + return VCPMonitorInfo(True, brightness, contrast, volume, power_state, input_source) + except VCPError as e: + self._logger.error("Error querying Monitor %d %s through VCP: %s", index, name, str(e)) + self.vcp_monitor_blacklist.add(name) + return VCPMonitorInfo(False) + else: + self._logger.info("Skipped VCP query for blacklisted monitor %d %s", index, name) + return VCPMonitorInfo(False) + @override async def update_all_data(self) -> list[Display]: """Update all data.""" self._logger.debug("Update all data") try: - return [ - Display( + monitors = [] + for key, monitor in enumerate(get_monitors()): + vcp_info = self.sensors_vcp_info(key, monitor.name) + monitors.append(Display( id=str(key), name=monitor.name if monitor.name is not None else str(key), resolution_horizontal=monitor.width, @@ -161,9 +232,15 @@ async def update_all_data(self) -> list[Display]: is_primary=monitor.is_primary, pixel_clock=self._get_pixel_clock(str(key)), refresh_rate=self.sensors_refresh_rate(str(key)), - ) - for key, monitor in enumerate(get_monitors()) - ] + vcp_supported=vcp_info.vcp_supported, + brightness=vcp_info.brightness, + contrast=vcp_info.contrast, + volume=vcp_info.volume, + power_state=vcp_info.power_state, + input_source=vcp_info.input_source, + sdr_white_level=None, + )) + return monitors except ScreenInfoError as error: self._logger.error(error) return [] diff --git a/systembridgebackend/server/__init__.py b/systembridgebackend/server/__init__.py index c1041a1..40a7a1c 100644 --- a/systembridgebackend/server/__init__.py +++ b/systembridgebackend/server/__init__.py @@ -118,10 +118,12 @@ def exit_application(self) -> None: self._logger.info("Exiting application") # Stop update threads - if api_app.data_update.update_data_thread is not None: - api_app.data_update.update_data_thread.join() - if api_app.data_update.update_media_thread is not None: - api_app.data_update.update_media_thread.join() + for thread in ( + api_app.data_update.update_data_thread, + api_app.data_update.update_media_thread, + ): + if thread is not None: + thread.interrupt() self._logger.info("Update threads joined") # Stop all tasks diff --git a/systembridgebackend/server/websocket.py b/systembridgebackend/server/websocket.py index 94f918b..00bfbdf 100644 --- a/systembridgebackend/server/websocket.py +++ b/systembridgebackend/server/websocket.py @@ -7,8 +7,10 @@ from uuid import uuid4 from fastapi import WebSocket +from monitorcontrol.vcp.vcp_abc import VCPError from starlette.websockets import WebSocketDisconnect +from systembridgemodels.display import DisplaySetting, DisplayUpdateSettingOp from systembridgemodels.keyboard_key import KeyboardKey from systembridgemodels.keyboard_text import KeyboardText from systembridgemodels.media_control import MediaAction, MediaControl @@ -47,6 +49,7 @@ SUBTYPE_MISSING_TEXT, SUBTYPE_MISSING_TITLE, SUBTYPE_MISSING_VALUE, + SUBTYPE_OPERATION_FAILED, SUBTYPE_UNKNOWN_EVENT, TYPE_DATA_GET, TYPE_DATA_LISTENER_REGISTERED, @@ -87,8 +90,18 @@ TYPE_SETTINGS_RESULT, TYPE_UNREGISTER_DATA_LISTENER, TYPE_UPDATE_SETTINGS, + TYPE_DISPLAY_UPDATE_SETTING, + TYPE_DISPLAY_SETTING_UPDATED, + ) from ..handlers.data import DataUpdate +from ..handlers.display import ( + set_brightness, + set_contrast, + set_volume, + set_power_state, + set_input_source, +) from ..handlers.keyboard import keyboard_keypress, keyboard_text from ..handlers.media import ( control_fastforward, @@ -895,6 +908,74 @@ async def _handle_event( # noqa: C901 ) ) logout() + elif request.event == TYPE_DISPLAY_UPDATE_SETTING: + try: + model = DisplayUpdateSettingOp(**response_data[EVENT_DATA]) + except ValueError as error: + message = f"DisplayUpdateSettingOp parse error: {error}" + self._logger.warning(message, exc_info=error) + await self._send_response( + Response( + id=request.id, + type=TYPE_ERROR, + subtype=SUBTYPE_BAD_REQUEST, + message=message, + data={}, + ) + ) + return + if model.setting not in DisplaySetting or not isinstance(model.monitor_id, int) or not isinstance(model.value, int): + self._logger.warning("Invalid parameters: %s", model) + await self._send_response( + Response( + id=request.id, + type=TYPE_ERROR, + subtype=SUBTYPE_INVALID_ACTION, + message="Invalid parameters", + data={}, + ) + ) + return + + try: + self._logger.info("Updating display %d %s to %d", model.monitor_id, model.setting, model.value) + if model.setting == DisplaySetting.BRIGHTNESS: + set_brightness(model.monitor_id, model.value) + elif model.setting == DisplaySetting.CONTRAST: + set_contrast(model.monitor_id, model.value) + elif model.setting == DisplaySetting.VOLUME: + set_volume(model.monitor_id, model.value) + elif model.setting == DisplaySetting.POWER_STATE: + set_power_state(model.monitor_id, model.value) + elif model.setting == DisplaySetting.INPUT_SOURCE: + set_input_source(model.monitor_id, model.value) + else: + raise NotImplementedError(f"{model.setting} not implemented") + + await self._send_response( + Response( + id=request.id, + type=TYPE_DISPLAY_SETTING_UPDATED, + message=f"Display setting {model.setting} updated", + data=asdict(model), + ) + ) + self._data_update.request_update_data(modules=["displays"]) + self._logger.info("display %d updated", model.monitor_id) + except (VCPError, NotImplementedError) as e: + message = f"{e.__class__.__name__}: {str(e)}" + self._logger.warning(message, exc_info=e) + await self._send_response( + Response( + id=request.id, + type=TYPE_ERROR, + subtype=SUBTYPE_OPERATION_FAILED, + message=message, + data={}, + ) + ) + return + else: self._logger.warning("Unknown event: %s", request.event) await self._send_response(