From 10205ce5df3fd655e4a91477335e6f35eb80800f Mon Sep 17 00:00:00 2001 From: TAG-Epic Date: Sun, 2 Apr 2023 23:47:39 +0200 Subject: [PATCH 1/2] feat(voice): initial rough draft of voice. --- nextcore/http/client/base_client.py | 83 ++++++++++++++++++++++- nextcore/http/client/client.py | 59 ++++++++++++++++ nextcore/voice/opcodes.py | 1 + nextcore/voice/voice_client.py | 101 ++++++++++++++++++++++++++++ 4 files changed, 242 insertions(+), 2 deletions(-) create mode 100644 nextcore/voice/opcodes.py create mode 100644 nextcore/voice/voice_client.py diff --git a/nextcore/http/client/base_client.py b/nextcore/http/client/base_client.py index 93a771126..d2ca356c8 100644 --- a/nextcore/http/client/base_client.py +++ b/nextcore/http/client/base_client.py @@ -24,13 +24,15 @@ from abc import ABC, abstractmethod from logging import getLogger from typing import TYPE_CHECKING +from nextcore.common import UNDEFINED from ..route import Route if TYPE_CHECKING: - from typing import Any, Final + from typing import Any, Final, Literal + from nextcore.common import UndefinedType - from aiohttp import ClientResponse + from aiohttp import ClientResponse, ClientWebSocketResponse logger = getLogger(__name__) @@ -58,3 +60,80 @@ async def request( **kwargs: Any, ) -> ClientResponse: ... + + @abstractmethod + async def connect_to_gateway( + self, + *, + version: Literal[6, 7, 8, 9, 10] | UndefinedType = UNDEFINED, + encoding: Literal["json", "etf"] | UndefinedType = UNDEFINED, + compress: Literal["zlib-stream"] | UndefinedType = UNDEFINED, + ) -> ClientWebSocketResponse: + """Connects to the gateway + + **Example usage:** + + .. code-block:: python + + ws = await http_client.connect_to_gateway() + + + Parameters + ---------- + version: + The major API version to use + + .. hint:: + It is a good idea to pin this to make sure something doesn't unexpectedly change + encoding: + Whether to use json or etf for payloads + compress: + Payload compression from data sent from Discord. + + Returns + ------- + aiohttp.ClientWebSocketResponse + The gateway websocket + """ + + ... + + + @abstractmethod + async def connect_to_voice_websocket( + self, + endpoint: str, + *, + version: Literal[1,2,3,4] | UndefinedType = UNDEFINED, + ) -> ClientWebSocketResponse: + """Connects to the voice WebSocket gateway + + **Example usage:** + + .. code-block:: python + + ws = await http_client.connect_to_voice_websocket() + + + Parameters + ---------- + endpoint: + The voice server to connect to. + + .. note:: + This can obtained from the `voice server update event ` and is usually in the format of ``servername.discord.media:443`` + version: + The major API version to use + + .. hint:: + It is a good idea to pin this to make sure something doesn't unexpectedly change + .. note:: + A list of versions can be found on the `voice versioning page `__ + + Returns + ------- + aiohttp.ClientWebSocketResponse + The voice websocket gateway + """ + + ... diff --git a/nextcore/http/client/client.py b/nextcore/http/client/client.py index 865bd79e4..2aed0b724 100644 --- a/nextcore/http/client/client.py +++ b/nextcore/http/client/client.py @@ -438,6 +438,65 @@ async def connect_to_gateway( # TODO: Aiohttp bug return await self._session.ws_connect("wss://gateway.discord.gg", params=params) # type: ignore [reportUnknownMemberType] + + async def connect_to_voice_websocket( + self, + endpoint: str, + *, + version: Literal[1,2,3,4] | UndefinedType = UNDEFINED, + ) -> ClientWebSocketResponse: + """Connects to the voice WebSocket gateway + + **Example usage:** + + .. code-block:: python + + ws = await http_client.connect_to_voice_websocket() + + + Parameters + ---------- + endpoint: + The voice server to connect to. + + .. note:: + This can obtained from the `voice server update event ` and is usually in the format of ``servername.discord.media:443`` + version: + The major API version to use + + .. hint:: + It is a good idea to pin this to make sure something doesn't unexpectedly change + .. note:: + A list of versions can be found on the `voice versioning page `__ + + Raises + ------ + RuntimeError + :meth:`HTTPClient.setup` was not called yet. + RuntimeError + HTTPClient was closed. + + Returns + ------- + aiohttp.ClientWebSocketResponse + The voice websocket gateway + """ + + if self._session is None: + raise RuntimeError("HTTPClient.setup was not called yet!") + if self._session.closed: + raise RuntimeError("HTTPClient is closed!") + + params = {} + + # These have different behaviour when not provided and set to None. + # This only adds them if they are provided (not Undefined) + if version is not UNDEFINED: + params["version"] = version + + # TODO: Aiohttp bug + return await self._session.ws_connect("wss://" + endpoint, params=params) # type: ignore [reportUnknownMemberType] + async def _get_bucket(self, route: Route, rate_limit_storage: RateLimitStorage) -> Bucket: """Gets a bucket object for a route. diff --git a/nextcore/voice/opcodes.py b/nextcore/voice/opcodes.py new file mode 100644 index 000000000..6e507a320 --- /dev/null +++ b/nextcore/voice/opcodes.py @@ -0,0 +1 @@ +from enum import IntEnum diff --git a/nextcore/voice/voice_client.py b/nextcore/voice/voice_client.py new file mode 100644 index 000000000..400aae06e --- /dev/null +++ b/nextcore/voice/voice_client.py @@ -0,0 +1,101 @@ +# The MIT License (MIT) +# Copyright (c) 2021-present nextcore developers +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +from __future__ import annotations +import asyncio +from logging import getLogger +from nextcore.http import HTTPClient # TODO: Replace with BaseHTTPClient +from typing import TYPE_CHECKING, Any +from nextcore.common import json_loads, json_dumps, Dispatcher +import time + +if TYPE_CHECKING: + from discord_typings import Snowflake + from aiohttp import ClientWebSocketResponse + +__all__ = ("VoiceClient", ) + +_logger = getLogger(__name__) + +class VoiceClient: + def __init__(self, guild_id: Snowflake, user_id: Snowflake, session_id: str, token: str, endpoint: str, http_client: HTTPClient) -> None: + self.guild_id: Snowflake = guild_id + self.user_id: Snowflake = user_id + self.session_id: str = session_id + self.token: str = token # TODO: Replace with Authentication? + self.endpoint: str = endpoint + self.raw_dispatcher: Dispatcher[int] = Dispatcher() + self._http_client: HTTPClient = http_client + self._ws: ClientWebSocketResponse | None = None + + # Default event handlers + self.raw_dispatcher.add_listener(self._handle_hello, 8) + + async def connect(self) -> None: + self._ws = await self._http_client.connect_to_voice_websocket(self.endpoint) + asyncio.create_task(self._receive_loop(self._ws)) + + async def send(self, message: Any) -> None: + if self._ws is None: + raise RuntimeError("Shame! Shame!") # TODO: Lol + _logger.debug("Send to websocket: %s", message) + await self._ws.send_json(message, dumps=json_dumps) + + + async def identify(self, guild_id: Snowflake, user_id: Snowflake, session_id: str, token: str): + await self.send({ + "op": 0, + "d": { + "server_id": guild_id, # Why is this called server_id? + "user_id": user_id, + "session_id": session_id, + "token": token + } + }) + + async def heartbeat(self) -> None: + await self.send({ + "op": 3, + "d": int(time.time()) # This should not be frequent enough that it becomes a issue. + }) + + async def _receive_loop(self, ws: ClientWebSocketResponse): + _logger.debug("Started listening for messages") + async for message in ws: + data = message.json(loads=json_loads) # TODO: Type hint! + _logger.debug("Received data from the websocket: %s", data) + await self.raw_dispatcher.dispatch(data["op"], data) + + async def _heartbeat_loop(self, ws: ClientWebSocketResponse, interval_seconds: float): + _logger.debug("Started heartbeating every %ss", interval_seconds) + while not ws.closed: + await self.heartbeat() + await asyncio.sleep(interval_seconds) + + async def _handle_hello(self, event_data: dict[str, Any]): + assert self._ws is not None, "WebSocket was None in hello" + + await self.identify(self.guild_id, self.user_id, self.session_id, self.token) + heartbeat_interval_ms = event_data["d"]["heartbeat_interval"] + heartbeat_interval_seconds = heartbeat_interval_ms / 1000 + + asyncio.create_task(self._heartbeat_loop(self._ws, heartbeat_interval_seconds)) + From 71ccdd9130e1ee85e1f35a8d5e4fdb9b5b3cad8e Mon Sep 17 00:00:00 2001 From: TAG-Epic Date: Sun, 23 Apr 2023 23:11:50 +0200 Subject: [PATCH 2/2] feat(voice): some voice progress, forgot what it was. Wont have any progress until we get a decent opus lib to use --- nextcore/voice/udp_client.py | 61 ++++++++++++++++++++++++++++++++++ nextcore/voice/voice_client.py | 52 ++++++++++++++++++++++++++--- pyproject.toml | 1 + 3 files changed, 110 insertions(+), 4 deletions(-) create mode 100644 nextcore/voice/udp_client.py diff --git a/nextcore/voice/udp_client.py b/nextcore/voice/udp_client.py new file mode 100644 index 000000000..c7c18f3f8 --- /dev/null +++ b/nextcore/voice/udp_client.py @@ -0,0 +1,61 @@ +# The MIT License (MIT) +# Copyright (c) 2021-present nextcore developers +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +# DEALINGS IN THE SOFTWARE. + +from __future__ import annotations +import asyncio +from logging import getLogger +from typing import TYPE_CHECKING +import anyio +import struct + +VoicePacketHeader = struct.Struct(">HH") + +if TYPE_CHECKING: + from anyio.abc import ConnectedUDPSocket + +__all__ = ("UDPClient", ) + +_logger = getLogger(__name__) + +class UDPClient: + def __init__(self) -> None: + self.socket: ConnectedUDPSocket | None = None + + async def send(self, message: bytes): + assert self.socket is not None + _logger.debug("Sent %s", hex(int.from_bytes(message, byteorder="big"))) + await self.socket.send(message) + + + + async def connect(self, host: str, port: int): + _logger.info("Connecting to %s:%s", host, port) + self.socket = await anyio.create_connected_udp_socket(host, port) + _logger.debug("Connected to %s:%s", host, port) + + async def receive_loop(self): + assert self.socket is not None + _logger.debug("Started udp receive loop") + + async for message in self.socket: + pass + # _logger.debug("Received %s", message) + diff --git a/nextcore/voice/voice_client.py b/nextcore/voice/voice_client.py index 400aae06e..9f2e3250d 100644 --- a/nextcore/voice/voice_client.py +++ b/nextcore/voice/voice_client.py @@ -22,11 +22,14 @@ from __future__ import annotations import asyncio from logging import getLogger +import struct from nextcore.http import HTTPClient # TODO: Replace with BaseHTTPClient from typing import TYPE_CHECKING, Any from nextcore.common import json_loads, json_dumps, Dispatcher import time +from .udp_client import UDPClient + if TYPE_CHECKING: from discord_typings import Snowflake from aiohttp import ClientWebSocketResponse @@ -42,12 +45,15 @@ def __init__(self, guild_id: Snowflake, user_id: Snowflake, session_id: str, tok self.session_id: str = session_id self.token: str = token # TODO: Replace with Authentication? self.endpoint: str = endpoint + self.ssrc: int | None = None self.raw_dispatcher: Dispatcher[int] = Dispatcher() self._http_client: HTTPClient = http_client self._ws: ClientWebSocketResponse | None = None + self._socket: UDPClient | None = None # Default event handlers self.raw_dispatcher.add_listener(self._handle_hello, 8) + self.raw_dispatcher.add_listener(self._handle_ready, 2) async def connect(self) -> None: self._ws = await self._http_client.connect_to_voice_websocket(self.endpoint) @@ -60,7 +66,7 @@ async def send(self, message: Any) -> None: await self._ws.send_json(message, dumps=json_dumps) - async def identify(self, guild_id: Snowflake, user_id: Snowflake, session_id: str, token: str): + async def identify(self, guild_id: Snowflake, user_id: Snowflake, session_id: str, token: str) -> None: await self.send({ "op": 0, "d": { @@ -77,20 +83,21 @@ async def heartbeat(self) -> None: "d": int(time.time()) # This should not be frequent enough that it becomes a issue. }) - async def _receive_loop(self, ws: ClientWebSocketResponse): + async def _receive_loop(self, ws: ClientWebSocketResponse) -> None: _logger.debug("Started listening for messages") async for message in ws: data = message.json(loads=json_loads) # TODO: Type hint! _logger.debug("Received data from the websocket: %s", data) await self.raw_dispatcher.dispatch(data["op"], data) + _logger.info("WebSocket closed with code %s!", ws.close_code) - async def _heartbeat_loop(self, ws: ClientWebSocketResponse, interval_seconds: float): + async def _heartbeat_loop(self, ws: ClientWebSocketResponse, interval_seconds: float) -> None: _logger.debug("Started heartbeating every %ss", interval_seconds) while not ws.closed: await self.heartbeat() await asyncio.sleep(interval_seconds) - async def _handle_hello(self, event_data: dict[str, Any]): + async def _handle_hello(self, event_data: dict[str, Any]) -> None: assert self._ws is not None, "WebSocket was None in hello" await self.identify(self.guild_id, self.user_id, self.session_id, self.token) @@ -99,3 +106,40 @@ async def _handle_hello(self, event_data: dict[str, Any]): asyncio.create_task(self._heartbeat_loop(self._ws, heartbeat_interval_seconds)) + async def _handle_ready(self, event: dict[str, Any]) -> None: + event_data = event["d"] + voice_ip = event_data["ip"] + voice_port = event_data["port"] + self.ssrc = event_data["ssrc"] + + self._socket = UDPClient() + await self._socket.connect(voice_ip, voice_port) + + # IP Discovery + HEADER_SIZE = 4 + PAYLOAD_SIZE = 70 + packet = bytearray(PAYLOAD_SIZE + HEADER_SIZE) + struct.pack_into(">HHI", packet, 0, 0x1, PAYLOAD_SIZE, self.ssrc) + await self._socket.send(packet) + + response = await self._socket.socket.receive() + + raw_ip, port = struct.unpack(">8x64sH", response) + ip = raw_ip.decode("ascii") + _logger.debug("Got public IP and port from discovery: %s:%s", ip, port) + + asyncio.create_task(self._socket.receive_loop()) + + await self.send({"op": 1, "d": { + "protocol": "udp", + "data": { + "address": ip, + "port": port, + "mode": "xsalsa20_poly1305_lite" + } + }}) + + + + + diff --git a/pyproject.toml b/pyproject.toml index e4a4932ee..10211e65b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ typing-extensions = "^4.1.1" # Same as above orjson = {version = "^3.6.8", optional = true} types-orjson = {version = "^3.6.2", optional = true} discord-typings = "^0.5.0" +anyio = "^3.6.2" [tool.poetry.group.dev.dependencies] Sphinx = "^5.0.0"