From f4be749bd010d9f5d325e268c314f05a014f4f2b Mon Sep 17 00:00:00 2001 From: Waldemar Hummer Date: Thu, 13 Nov 2025 21:24:18 -0500 Subject: [PATCH 01/18] add initial version of TypeDB extension --- typedb/.gitignore | 5 + typedb/Makefile | 44 +++++ typedb/README.md | 41 ++++ typedb/localstack_typedb/__init__.py | 1 + typedb/localstack_typedb/extension.py | 47 +++++ typedb/localstack_typedb/utils/__init__.py | 0 typedb/localstack_typedb/utils/docker.py | 207 +++++++++++++++++++++ typedb/localstack_typedb/utils/h2_proxy.py | 166 +++++++++++++++++ typedb/pyproject.toml | 39 ++++ typedb/tests/test_extension.py | 84 +++++++++ 10 files changed, 634 insertions(+) create mode 100644 typedb/.gitignore create mode 100644 typedb/Makefile create mode 100644 typedb/README.md create mode 100644 typedb/localstack_typedb/__init__.py create mode 100644 typedb/localstack_typedb/extension.py create mode 100644 typedb/localstack_typedb/utils/__init__.py create mode 100644 typedb/localstack_typedb/utils/docker.py create mode 100644 typedb/localstack_typedb/utils/h2_proxy.py create mode 100644 typedb/pyproject.toml create mode 100644 typedb/tests/test_extension.py diff --git a/typedb/.gitignore b/typedb/.gitignore new file mode 100644 index 0000000..77be714 --- /dev/null +++ b/typedb/.gitignore @@ -0,0 +1,5 @@ +.venv +dist +build +**/*.egg-info +.eggs \ No newline at end of file diff --git a/typedb/Makefile b/typedb/Makefile new file mode 100644 index 0000000..d274608 --- /dev/null +++ b/typedb/Makefile @@ -0,0 +1,44 @@ +VENV_BIN = python3 -m venv +VENV_DIR ?= .venv +VENV_ACTIVATE = $(VENV_DIR)/bin/activate +VENV_RUN = . $(VENV_ACTIVATE) + +usage: ## Shows usage for this Makefile + @cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}' + +venv: $(VENV_ACTIVATE) + +$(VENV_ACTIVATE): pyproject.toml + test -d .venv || $(VENV_BIN) .venv + $(VENV_RUN); pip install --upgrade pip setuptools plux + $(VENV_RUN); pip install -e .[dev] + touch $(VENV_DIR)/bin/activate + +clean: + rm -rf .venv/ + rm -rf build/ + rm -rf .eggs/ + rm -rf *.egg-info/ + +install: venv ## Install dependencies + $(VENV_RUN); python -m plux entrypoints + +dist: venv ## Create distribution + $(VENV_RUN); python -m build + +publish: clean-dist venv dist ## Publish extension to pypi + $(VENV_RUN); pip install --upgrade twine; twine upload dist/* + +entrypoints: venv ## Generate plugin entrypoints for Python package + $(VENV_RUN); python -m plux entrypoints + +format: ## Run ruff to format the codebase + $(VENV_RUN); python -m ruff format .; python -m ruff check --output-format=full --fix . + +test: ## Run integration tests (requires LocalStack running with the Extension installed) + $(VENV_RUN); pytest tests $(PYTEST_ARGS) + +clean-dist: clean + rm -rf dist/ + +.PHONY: clean clean-dist dist install publish usage venv format test diff --git a/typedb/README.md b/typedb/README.md new file mode 100644 index 0000000..8a18509 --- /dev/null +++ b/typedb/README.md @@ -0,0 +1,41 @@ +TypeDB on LocalStack +===================== + +This repo contains a [LocalStack Extension](https://github.com/localstack/localstack-extensions) that facilitates developing [TypeDB](https://typedb.com)-based applications locally. + +## Prerequisites + +* Docker +* LocalStack Pro (free trial available) +* `localstack` CLI +* `make` + +## Install from GitHub repository + +This extension can be installed directly from this Github repo via: + +```bash +localstack extensions install "git+https://github.com/localstack/localstack-extensions.git#egg=localstack-typedb&subdirectory=typedb" +``` + +## Install local development version + +To install the extension into LocalStack in developer mode, you will need Python 3.13, and create a virtual environment in the extensions project. + +In the newly generated project, simply run + +```bash +make install +``` + +Then, to enable the extension for LocalStack, run + +```bash +localstack extensions dev enable . +``` + +You can then start LocalStack with `EXTENSION_DEV_MODE=1` to load all enabled extensions: + +```bash +EXTENSION_DEV_MODE=1 localstack start +``` diff --git a/typedb/localstack_typedb/__init__.py b/typedb/localstack_typedb/__init__.py new file mode 100644 index 0000000..2230ab6 --- /dev/null +++ b/typedb/localstack_typedb/__init__.py @@ -0,0 +1 @@ +name = "localstack_typedb" diff --git a/typedb/localstack_typedb/extension.py b/typedb/localstack_typedb/extension.py new file mode 100644 index 0000000..1bb4adf --- /dev/null +++ b/typedb/localstack_typedb/extension.py @@ -0,0 +1,47 @@ +import os +import shlex + +from localstack.config import is_env_not_false +from localstack.utils.docker_utils import DOCKER_CLIENT +from localstack_typedb.utils.docker import ProxiedDockerContainerExtension +from rolo import Request + +# environment variable for user-defined command args to pass to TypeDB +ENV_CMD_FLAGS = "TYPEDB_FLAGS" +# environment variable for flag to enable/disable HTTP2 proxy for gRPC traffic +ENV_HTTP2_PROXY = "TYPEDB_HTTP2_PROXY" + + +class TypeDbExtension(ProxiedDockerContainerExtension): + name = "localstack-typedb" + + HOST = "typedb." + # name of the Docker image to spin up + DOCKER_IMAGE = "typedb/typedb" + # default command args to pass to TypeDB + DEFAULT_CMD_FLAGS = ["--diagnostics.reporting.metrics=false"] + # default port for TypeDB HTTP2/gRPC endpoint + TYPEDB_PORT = 1729 + + def __init__(self): + command_flags = (os.environ.get(ENV_CMD_FLAGS) or "").strip() + command_flags = self.DEFAULT_CMD_FLAGS + shlex.split(command_flags) + command = self._get_image_command() + command_flags + http2_ports = [self.TYPEDB_PORT] if is_env_not_false(ENV_HTTP2_PROXY) else [] + super().__init__( + image_name=self.DOCKER_IMAGE, + container_ports=[8000, 1729], + host=self.HOST, + request_to_port_router=self.request_to_port_router, + command=command, + http2_ports=http2_ports, + ) + + def _get_image_command(self) -> list[str]: + result = DOCKER_CLIENT.inspect_image(self.DOCKER_IMAGE) + image_command = result["Config"]["Cmd"] + return image_command + + def request_to_port_router(self, request: Request) -> int: + # TODO add REST API / gRPC routing based on request + return 1729 diff --git a/typedb/localstack_typedb/utils/__init__.py b/typedb/localstack_typedb/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py new file mode 100644 index 0000000..8916418 --- /dev/null +++ b/typedb/localstack_typedb/utils/docker.py @@ -0,0 +1,207 @@ +import re +import logging +from functools import cache +from typing import Callable +import requests + +from localstack import config +from localstack.config import is_env_true +from localstack_typedb.utils.h2_proxy import ( + apply_http2_patches_for_grpc_support, + ProxyRequestMatcher, +) +from localstack.utils.docker_utils import DOCKER_CLIENT +from localstack.extensions.api import Extension, http +from localstack.http import Request +from localstack.utils.container_utils.container_client import PortMappings +from localstack.utils.net import get_addressable_container_host +from localstack.utils.sync import retry +from rolo import route +from rolo.proxy import Proxy +from rolo.routing import RuleAdapter, WithHost +from werkzeug.datastructures import Headers + +LOG = logging.getLogger(__name__) +logging.getLogger("localstack_typedb").setLevel( + logging.DEBUG if config.DEBUG else logging.INFO +) +logging.basicConfig() + + +class ProxiedDockerContainerExtension(Extension, ProxyRequestMatcher): + """ + Utility class to create a LocalStack Extension backed by a Docker container that exposes a service + on a network port (or several ports), with requests being proxied through the LocalStack gateway. + + Requests may potentially use HTTP2 with binary content as the protocol (e.g., gRPC over HTTP2). + To ensure proper routing of requests, subclasses can define the `http2_ports`. + """ + + name: str + """Name of this extension""" + image_name: str + """Docker image name""" + container_name: str | None + """Name of the Docker container spun up by the extension""" + container_ports: list[int] + """List of network ports of the Docker container spun up by the extension""" + host: str | None + """ + Optional host on which to expose the container endpoints. + Can be either a static hostname, or a pattern like `myext.` + """ + path: str | None + """Optional path on which to expose the container endpoints.""" + command: list[str] | None + """Optional command (and flags) to execute in the container.""" + + request_to_port_router: Callable[[Request], int] | None + """Callable that returns the target port for a given request, for routing purposes""" + http2_ports: list[int] | None + """List of ports for which HTTP2 proxy forwarding into the container should be enabled.""" + + def __init__( + self, + image_name: str, + container_ports: list[int], + host: str | None = None, + path: str | None = None, + container_name: str | None = None, + command: list[str] | None = None, + request_to_port_router: Callable[[Request], int] | None = None, + http2_ports: list[int] | None = None, + ): + self.image_name = image_name + self.container_ports = container_ports + self.host = host + self.path = path + self.container_name = container_name + self.command = command + self.request_to_port_router = request_to_port_router + self.http2_ports = http2_ports + + def update_gateway_routes(self, router: http.Router[http.RouteHandler]): + if self.path: + raise NotImplementedError( + "Path-based routing not yet implemented for this extension" + ) + # note: for simplicity, starting the external container at startup - could be optimized over time ... + self.start_container() + # add resource for HTTP/1.1 requests + resource = RuleAdapter(ProxyResource(self)) + if self.host: + resource = WithHost(self.host, [resource]) + router.add(resource) + + # apply patches to serve HTTP/2 requests + for port in self.http2_ports or []: + apply_http2_patches_for_grpc_support( + get_addressable_container_host(), port, self + ) + + def on_platform_shutdown(self): + self._remove_container() + + def _get_container_name(self) -> str: + if self.container_name: + return self.container_name + name = f"ls-ext-{self.name}" + name = re.sub(r"\W", "-", name) + return name + + def should_proxy_request(self, headers: Headers) -> bool: + # determine if this is a gRPC request targeting TypeDB + content_type = headers.get("content-type") or "" + req_path = headers.get(":path") or "" + is_typedb_grpc_request = ( + "grpc" in content_type and "/typedb.protocol.TypeDB" in req_path + ) + return is_typedb_grpc_request + + @cache + def start_container(self) -> None: + container_name = self._get_container_name() + LOG.debug("Starting extension container %s", container_name) + + ports = PortMappings() + for port in self.container_ports: + ports.add(port) + + kwargs = {} + if self.command: + kwargs["command"] = self.command + + try: + DOCKER_CLIENT.run_container( + self.image_name, + detach=True, + remove=True, + name=container_name, + ports=ports, + **kwargs, + ) + except Exception as e: + LOG.debug("Failed to start container %s: %s", container_name, e) + # allow running TypeDB in a local server in dev mode, if TYPEDB_DEV_MODE is enabled + if not is_env_true("TYPEDB_DEV_MODE"): + raise + + main_port = self.container_ports[0] + container_host = get_addressable_container_host() + + def _ping_endpoint(): + # TODO: allow defining a custom healthcheck endpoint ... + response = requests.get(f"http://{container_host}:{main_port}/") + assert response.ok + + try: + retry(_ping_endpoint, retries=40, sleep=1) + except Exception as e: + LOG.info("Failed to connect to container %s: %s", container_name, e) + self._remove_container() + raise + + LOG.debug("Successfully started extension container %s", container_name) + + def _remove_container(self): + container_name = self._get_container_name() + LOG.debug("Stopping extension container %s", container_name) + DOCKER_CLIENT.remove_container( + container_name, force=True, check_existence=False + ) + + +class ProxyResource: + """ + Simple proxy resource that forwards incoming requests from the + LocalStack Gateway to the target Docker container. + """ + + extension: ProxiedDockerContainerExtension + + def __init__(self, extension: ProxiedDockerContainerExtension): + self.extension = extension + + @route("/") + def index(self, request: Request, path: str, *args, **kwargs): + return self._proxy_request(request, forward_path=f"/{path}") + + def _proxy_request(self, request: Request, forward_path: str, *args, **kwargs): + self.extension.start_container() + + port = self.extension.container_ports[0] + container_host = get_addressable_container_host() + base_url = f"http://{container_host}:{port}" + proxy = Proxy(forward_base_url=base_url) + + # update content length (may have changed due to content compression) + if request.method not in ("GET", "OPTIONS"): + request.headers["Content-Length"] = str(len(request.data)) + + # make sure we're forwarding the correct Host header + request.headers["Host"] = f"localhost:{port}" + + # forward the request to the target + result = proxy.forward(request, forward_path=forward_path) + + return result diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py new file mode 100644 index 0000000..2beccca --- /dev/null +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -0,0 +1,166 @@ +import logging +import socket +from abc import abstractmethod + +from h2.frame_buffer import FrameBuffer +from hpack import Decoder +from hyperframe.frame import HeadersFrame, Frame +from twisted.internet import reactor + +from localstack.utils.patch import patch +from twisted.web._http2 import H2Connection +from werkzeug.datastructures import Headers + +LOG = logging.getLogger(__name__) + + +class ProxyRequestMatcher: + """ + Abstract base class that defines a request matcher, for an extension to define which incoming + request messages should be proxied to an upstream target (and which ones shouldn't). + """ + + @abstractmethod + def should_proxy_request(self, headers: Headers) -> bool: + """Define whether a request should be proxied, based on request headers.""" + + +class TcpForwarder: + """Simple helper class for bidirectional forwarding of TPC traffic.""" + + buffer_size: int = 1024 + """Data buffer size for receiving data from upstream socket.""" + + def __init__(self, port: int, host: str = "localhost"): + self.port = port + self.host = host + self._socket = None + self.connect() + + def connect(self): + if not self._socket: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect((self.host, self.port)) + + def receive_loop(self, callback): + while True: + data = self._socket.recv(self.buffer_size) + callback(data) + if not data: + break + + def send(self, data): + self._socket.sendall(data) + + def close(self): + LOG.debug("Closing connection to upstream HTTP2 server on port %s", self.port) + try: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + except Exception: + # swallow exceptions here (e.g., "bad file descriptor") + pass + + +def apply_http2_patches_for_grpc_support( + target_host: str, target_port: int, request_matcher: ProxyRequestMatcher +): + """ + Apply some patches to proxy incoming gRPC requests and forward them to a target port. + Note: this is a very brute-force approach and needs to be fixed/enhanced over time! + """ + + @patch(H2Connection.connectionMade) + def _connectionMade(fn, self, *args, **kwargs): + def _process(data): + LOG.debug("Received data (%s bytes) from upstream HTTP2 server", len(data)) + self.transport.write(data) + + # TODO: make port configurable + self._ls_forwarder = TcpForwarder(target_port, host=target_host) + LOG.debug( + "Starting TCP forwarder to port %s for new HTTP2 connection", target_port + ) + reactor.getThreadPool().callInThread(self._ls_forwarder.receive_loop, _process) + + @patch(H2Connection.dataReceived) + def _dataReceived(fn, self, data, *args, **kwargs): + forwarder = getattr(self, "_ls_forwarder", None) + should_proxy_request = getattr(self, "_ls_should_proxy_request", None) + if not forwarder or should_proxy_request is False: + return fn(self, data, *args, **kwargs) + + if should_proxy_request: + forwarder.send(data) + return + + setattr(self, "_data_received", getattr(self, "_data_received", [])) + self._data_received.append(data) + + # parse headers from request frames received so far + headers = get_headers_from_data_stream(self._data_received) + if not headers: + # if no headers received yet, then return (method will be called again for next chunk of data) + return + + # check if the incoming request should be proxies, based on the request headers + self._ls_should_proxy_request = request_matcher.should_proxy_request(headers) + + if not self._ls_should_proxy_request: + # if this is not a target request, then call the upstream function + result = None + for chunk in self._data_received: + result = fn(self, chunk, *args, **kwargs) + self._data_received = [] + return result + + # forward data chunks to the target + for chunk in self._data_received: + LOG.debug( + "Forwarding data (%s bytes) from HTTP2 client to server", len(chunk) + ) + forwarder.send(chunk) + self._data_received = [] + + @patch(H2Connection.connectionLost) + def connectionLost(fn, self, *args, **kwargs): + forwarder = getattr(self, "_ls_forwarder", None) + if not forwarder: + return fn(self, *args, **kwargs) + forwarder.close() + + +def get_headers_from_data_stream(data_list: list[bytes]) -> Headers: + """Get headers from a data stream (list of bytes data), if any headers are contained.""" + data_combined = b"".join(data_list) + frames = parse_http2_stream(data_combined) + headers = get_headers_from_frames(frames) + return headers + + +def get_headers_from_frames(frames: list[Frame]) -> Headers: + """Parse the given list of HTTP2 frames and return a dict of headers, if any""" + result = {} + decoder = Decoder() + for frame in frames: + if isinstance(frame, HeadersFrame): + try: + headers = decoder.decode(frame.data) + result.update(dict(headers)) + except Exception: + pass + return Headers(result) + + +def parse_http2_stream(data: bytes) -> list[Frame]: + """Parse the data from an HTTP2 stream into a list of frames""" + frames = [] + buffer = FrameBuffer(server=True) + buffer.max_frame_size = 16384 + buffer.add_data(data) + try: + for frame in buffer: + frames.append(frame) + except Exception: + pass + return frames diff --git a/typedb/pyproject.toml b/typedb/pyproject.toml new file mode 100644 index 0000000..307c3fb --- /dev/null +++ b/typedb/pyproject.toml @@ -0,0 +1,39 @@ +[build-system] +requires = ["setuptools", "wheel", "plux>=1.3.1"] +build-backend = "setuptools.build_meta" + +[project] +name = "localstack-typedb" +version = "0.1.0" +description = "LocalStack Extension: TypeDB on LocalStack" +readme = {file = "README.md", content-type = "text/markdown; charset=UTF-8"} +requires-python = ">=3.9" +authors = [ + { name = "LocalStack + TypeDB team"} +] +keywords = ["LocalStack", "TypeDB"] +classifiers = [] +dependencies = [ + "httpx", + "h2", + "priority", +] + +[project.urls] +Homepage = "https://github.com/whummer/localstack-utils" + +[project.optional-dependencies] +dev = [ + "boto3", + "build", + "jsonpatch", + "localstack", + "pytest", + "rolo", + "ruff", + "twisted", + "typedb-driver", +] + +[project.entry-points."localstack.extensions"] +localstack_typedb = "localstack_typedb.extension:TypeDbExtension" diff --git a/typedb/tests/test_extension.py b/typedb/tests/test_extension.py new file mode 100644 index 0000000..4d8c3bc --- /dev/null +++ b/typedb/tests/test_extension.py @@ -0,0 +1,84 @@ +import requests +from localstack.utils.strings import short_uid +from localstack_typedb.utils.h2_proxy import parse_http2_stream, get_headers_from_frames +from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType + + +def test_connect_to_db_via_http_api(): + host = "typedb.localhost.localstack.cloud:4566" + + # get auth token + response = requests.post( + f"http://{host}/v1/signin", json={"username": "admin", "password": "password"} + ) + assert response.ok + token = response.json()["token"] + + # create database + db_name = f"db{short_uid()}" + response = requests.post( + f"http://{host}/v1/databases/{db_name}", + json={}, + headers={"Authorization": f"bearer {token}"}, + ) + assert response.ok + + # list databases + response = requests.get( + f"http://{host}/v1/databases", headers={"Authorization": f"bearer {token}"} + ) + assert response.ok + databases = [db["name"] for db in response.json()["databases"]] + assert db_name in databases + + # clean up + response = requests.delete( + f"http://{host}/v1/databases/{db_name}", + headers={"Authorization": f"bearer {token}"}, + ) + assert response.ok + + +def test_connect_to_db_via_grpc_endpoint(): + db_name = "access-management-db" + server_host = "typedb.localhost.localstack.cloud:4566" + + driver_cfg = TypeDB.driver( + server_host, + Credentials("admin", "password"), + DriverOptions(is_tls_enabled=False), + ) + with driver_cfg as driver: + if driver.databases.contains(db_name): + driver.databases.get(db_name).delete() + driver.databases.create(db_name) + + with driver.transaction(db_name, TransactionType.SCHEMA) as tx: + tx.query("define entity person;").resolve() + tx.query("define attribute name, value string; person owns name;").resolve() + tx.commit() + + with driver.transaction(db_name, TransactionType.WRITE) as tx: + tx.query("insert $p isa person, has name 'Alice';").resolve() + tx.query("insert $p isa person, has name 'Bob';").resolve() + tx.commit() + with driver.transaction(db_name, TransactionType.READ) as tx: + results = tx.query( + 'match $p isa person; fetch {"name": $p.name};' + ).resolve() + for json in results: + print(json) + + +def test_parse_http2_frames(): + # note: the data below is a dump taken from a browser request made against the emulator + data = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x18\x04\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00\x00\x00\x04\x00\x02\x00\x00\x00\x05\x00\x00@\x00\x00\x00\x04\x08\x00\x00\x00\x00\x00\x00\xbf\x00\x01" + data += b"\x00\x01V\x01%\x00\x00\x00\x03\x00\x00\x00\x00\x15C\x87\xd5\xaf~MZw\x7f\x05\x8eb*\x0eA\xd0\x84\x8c\x9dX\x9c\xa3\xa13\xffA\x96\xa0\xe4\x1d\x13\x9d\t^\x83\x90t!#'U\xc9A\xed\x92\xe3M\xb8\xe7\x87z\xbe\xd0\x7ff\xa2\x81\xb0\xda\xe0S\xfa\xd02\x1a\xa4\x9d\x13\xfd\xa9\x92\xa4\x96\x854\x0c\x8aj\xdc\xa7\xe2\x81\x02\xe1o\xedK;\xdc\x0bM.\x0f\xedLE'S\xb0 \x04\x00\x08\x02\xa6\x13XYO\xe5\x80\xb4\xd2\xe0S\x83\xf9c\xe7Q\x8b-Kp\xdd\xf4Z\xbe\xfb@\x05\xdbP\x92\x9b\xd9\xab\xfaRB\xcb@\xd2_\xa5#\xb3\xe9OhL\x9f@\x94\x19\x08T!b\x1e\xa4\xd8z\x16\xb0\xbd\xad*\x12\xb5%L\xe7\x93\x83\xc5\x83\x7f@\x95\x19\x08T!b\x1e\xa4\xd8z\x16\xb0\xbd\xad*\x12\xb4\xe5\x1c\x85\xb1\x1f\x89\x1d\xa9\x9c\xf6\x1b\xd8\xd2c\xd5s\x95\x9d)\xad\x17\x18`u\xd6\xbd\x07 \xe8BFN\xab\x92\x83\xdb#\x1f@\x85=\x86\x98\xd5\x7f\x94\x9d)\xad\x17\x18`u\xd6\xbd\x07 \xe8BFN\xab\x92\x83\xdb'@\x8aAH\xb4\xa5I'ZB\xa1?\x84-5\xa7\xd7@\x8aAH\xb4\xa5I'Z\x93\xc8_\x83!\xecG@\x8aAH\xb4\xa5I'Y\x06I\x7f\x86@\xe9*\xc82K@\x86\xae\xc3\x1e\xc3'\xd7\x83\xb6\x06\xbf@\x82I\x7f\x86M\x835\x05\xb1\x1f\x00\x00\x04\x08\x00\x00\x00\x00\x03\x00\xbe\x00\x00" + + frames = parse_http2_stream(data) + assert frames + headers = get_headers_from_frames(frames) + assert headers + assert headers[":scheme"] == "https" + assert headers[":method"] == "OPTIONS" + assert headers[":path"] == "/_localstack/health" From a4c5cf5419377665bae803bbf40d58ebfa25db3b Mon Sep 17 00:00:00 2001 From: Waldemar Hummer Date: Fri, 14 Nov 2025 17:41:30 -0500 Subject: [PATCH 02/18] add build config --- .github/workflows/typedb.yml | 44 ++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 .github/workflows/typedb.yml diff --git a/.github/workflows/typedb.yml b/.github/workflows/typedb.yml new file mode 100644 index 0000000..c042546 --- /dev/null +++ b/.github/workflows/typedb.yml @@ -0,0 +1,44 @@ +name: LocalStack TypeDB Extension Tests + +on: + pull_request: + workflow_dispatch: + +env: + LOCALSTACK_DISABLE_EVENTS: "1" + LOCALSTACK_AUTH_TOKEN: ${{ secrets.LOCALSTACK_AUTH_TOKEN }} + +jobs: + integration-tests: + name: Run Integration Tests + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup LocalStack and extension + run: | + cd typedb + + docker pull localstack/localstack-pro & + docker pull typedb/typedb & + pip install localstack + + make install + make dist + localstack extensions -v install file://$(ls ./dist/localstack_typedb-*.tar.gz) + + DEBUG=1 localstack start -d + localstack wait + + - name: Run integration tests + run: | + cd typedb + make test + + - name: Print logs + if: always() + run: | + localstack logs + localstack stop From 582f9923b106d572dd000c2d3593b8a2192f0aca Mon Sep 17 00:00:00 2001 From: Waldemar Hummer Date: Sat, 15 Nov 2025 08:22:40 -0500 Subject: [PATCH 03/18] minor polishing in README --- .github/workflows/typedb.yml | 2 +- README.md | 9 +++++---- typedb/README.md | 2 +- typedb/localstack_typedb/extension.py | 2 +- typedb/pyproject.toml | 2 +- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/typedb.yml b/.github/workflows/typedb.yml index c042546..b4fa7e0 100644 --- a/.github/workflows/typedb.yml +++ b/.github/workflows/typedb.yml @@ -27,7 +27,7 @@ jobs: make install make dist - localstack extensions -v install file://$(ls ./dist/localstack_typedb-*.tar.gz) + localstack extensions -v install file://$(ls ./dist/localstack_extension_typedb-*.tar.gz) DEBUG=1 localstack start -d localstack wait diff --git a/README.md b/README.md index f3c2a49..1ffe6fc 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ $ localstack extensions install "git+https://github.com/localstack/localstack-ex ## Official LocalStack Extensions Here is the current list of extensions developed by the LocalStack team and their support status. -You can install the respective extension by calling `localstack install `. +You can install the respective extension by calling `localstack extensions install `. | Extension | Install name | Version | Support status | |----------------------------------------------------------------------------------------------------| ------------ |---------| -------------- | @@ -75,6 +75,7 @@ You can install the respective extension by calling `localstack install =1.3.1"] build-backend = "setuptools.build_meta" [project] -name = "localstack-typedb" +name = "localstack-extension-typedb" version = "0.1.0" description = "LocalStack Extension: TypeDB on LocalStack" readme = {file = "README.md", content-type = "text/markdown; charset=UTF-8"} From c9163ab18379178e46b8dfdf29d547fb8d6a9b2f Mon Sep 17 00:00:00 2001 From: Waldemar Hummer Date: Tue, 18 Nov 2025 15:02:11 -0500 Subject: [PATCH 04/18] adjust docs in README for TypeDB extension --- typedb/README.md | 31 ++++++++++++--------------- typedb/localstack_typedb/extension.py | 1 + 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/typedb/README.md b/typedb/README.md index 97abc7f..430496c 100644 --- a/typedb/README.md +++ b/typedb/README.md @@ -3,6 +3,17 @@ TypeDB on LocalStack This repo contains a [LocalStack Extension](https://github.com/localstack/localstack-extensions) that facilitates developing [TypeDB](https://typedb.com)-based applications locally. +After installing the extension, a TypeDB server instance will become available under `typedb.localhost.localstack.cloud:4566`, allowing you to create and manage TypeDB databases directly from your AWS applications running in LocalStack. + +For example, you could create a microservice backed by a Lambda function that connects to a TypeDB database upon invocation. See [here](https://github.com/typedb-osi/typedb-localstack-demo) for a simple example application that makes use of this extension. + +## Configuration + +The following environment variables can be passed to the LocalStack container (make sure to prefix them with `LOCALSTACK_...` when using the `localstack start` CLI), to steer the behavior of the extension: + +* `TYPEDB_FLAGS`: Additional user-defined command args to pass to the TypeDB container. +* `TYPEDB_HTTP2_PROXY`: Flag to enable/disable HTTP2 proxy for gRPC traffic (use this if you experience network issues, and use the HTTP variant of the TypeDB driver). + ## Prerequisites * Docker @@ -20,22 +31,8 @@ localstack extensions install "git+https://github.com/localstack/localstack-exte ## Install local development version -To install the extension into LocalStack in developer mode, you will need Python 3.13, and create a virtual environment in the extensions project. - -In the newly generated project, simply run +Please refer to the docs [here](https://github.com/localstack/localstack-extensions?tab=readme-ov-file#start-localstack-with-the-extension) for instructions on how to start the extension in developer mode. -```bash -make install -``` +## License -Then, to enable the extension for LocalStack, run - -```bash -localstack extensions dev enable . -``` - -You can then start LocalStack with `EXTENSION_DEV_MODE=1` to load all enabled extensions: - -```bash -EXTENSION_DEV_MODE=1 localstack start -``` +The code in this repo is available under the Apache 2.0 license. diff --git a/typedb/localstack_typedb/extension.py b/typedb/localstack_typedb/extension.py index 50312d8..2510db6 100644 --- a/typedb/localstack_typedb/extension.py +++ b/typedb/localstack_typedb/extension.py @@ -15,6 +15,7 @@ class TypeDbExtension(ProxiedDockerContainerExtension): name = "typedb" + # pattern of the hostname under which the extension is accessible HOST = "typedb." # name of the Docker image to spin up DOCKER_IMAGE = "typedb/typedb" From 1cc63b7a27351ee2d07a8e74eef40f8bceb5dc4c Mon Sep 17 00:00:00 2001 From: Waldemar Hummer Date: Tue, 18 Nov 2025 23:21:19 -0500 Subject: [PATCH 05/18] minor polishing --- typedb/localstack_typedb/extension.py | 10 ++++++++++ typedb/localstack_typedb/utils/docker.py | 10 ---------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/typedb/localstack_typedb/extension.py b/typedb/localstack_typedb/extension.py index 2510db6..d681516 100644 --- a/typedb/localstack_typedb/extension.py +++ b/typedb/localstack_typedb/extension.py @@ -5,6 +5,7 @@ from localstack.utils.docker_utils import DOCKER_CLIENT from localstack_typedb.utils.docker import ProxiedDockerContainerExtension from rolo import Request +from werkzeug.datastructures import Headers # environment variable for user-defined command args to pass to TypeDB ENV_CMD_FLAGS = "TYPEDB_FLAGS" @@ -43,6 +44,15 @@ def _get_image_command(self) -> list[str]: image_command = result["Config"]["Cmd"] return image_command + def should_proxy_request(self, headers: Headers) -> bool: + # determine if this is a gRPC request targeting TypeDB + content_type = headers.get("content-type") or "" + req_path = headers.get(":path") or "" + is_typedb_grpc_request = ( + "grpc" in content_type and "/typedb.protocol.TypeDB" in req_path + ) + return is_typedb_grpc_request + def request_to_port_router(self, request: Request) -> int: # TODO add REST API / gRPC routing based on request return 1729 diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py index 8916418..bac92f6 100644 --- a/typedb/localstack_typedb/utils/docker.py +++ b/typedb/localstack_typedb/utils/docker.py @@ -19,7 +19,6 @@ from rolo import route from rolo.proxy import Proxy from rolo.routing import RuleAdapter, WithHost -from werkzeug.datastructures import Headers LOG = logging.getLogger(__name__) logging.getLogger("localstack_typedb").setLevel( @@ -109,15 +108,6 @@ def _get_container_name(self) -> str: name = re.sub(r"\W", "-", name) return name - def should_proxy_request(self, headers: Headers) -> bool: - # determine if this is a gRPC request targeting TypeDB - content_type = headers.get("content-type") or "" - req_path = headers.get(":path") or "" - is_typedb_grpc_request = ( - "grpc" in content_type and "/typedb.protocol.TypeDB" in req_path - ) - return is_typedb_grpc_request - @cache def start_container(self) -> None: container_name = self._get_container_name() From ac080454dc8117775d664354b8633b3b09c2a173 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Mon, 15 Dec 2025 10:31:34 +0000 Subject: [PATCH 06/18] typedb 3.7.2 has no Cmd in its config, using only an entrypoint instead --- typedb/localstack_typedb/extension.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/typedb/localstack_typedb/extension.py b/typedb/localstack_typedb/extension.py index d681516..226778c 100644 --- a/typedb/localstack_typedb/extension.py +++ b/typedb/localstack_typedb/extension.py @@ -28,22 +28,16 @@ class TypeDbExtension(ProxiedDockerContainerExtension): def __init__(self): command_flags = (os.environ.get(ENV_CMD_FLAGS) or "").strip() command_flags = self.DEFAULT_CMD_FLAGS + shlex.split(command_flags) - command = self._get_image_command() + command_flags http2_ports = [self.TYPEDB_PORT] if is_env_not_false(ENV_HTTP2_PROXY) else [] super().__init__( image_name=self.DOCKER_IMAGE, container_ports=[8000, 1729], host=self.HOST, request_to_port_router=self.request_to_port_router, - command=command, + command=command_flags, http2_ports=http2_ports, ) - def _get_image_command(self) -> list[str]: - result = DOCKER_CLIENT.inspect_image(self.DOCKER_IMAGE) - image_command = result["Config"]["Cmd"] - return image_command - def should_proxy_request(self, headers: Headers) -> bool: # determine if this is a gRPC request targeting TypeDB content_type = headers.get("content-type") or "" From 5f29c50102e0eed400642c80a6722bea26a03bd7 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Fri, 12 Dec 2025 12:46:07 +0000 Subject: [PATCH 07/18] typedb: Always set container_name in the constructor --- typedb/localstack_typedb/utils/docker.py | 32 ++++++++---------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py index bac92f6..8ddc55a 100644 --- a/typedb/localstack_typedb/utils/docker.py +++ b/typedb/localstack_typedb/utils/docker.py @@ -37,11 +37,9 @@ class ProxiedDockerContainerExtension(Extension, ProxyRequestMatcher): """ name: str - """Name of this extension""" + """Name of this extension, which must be overridden in a subclass.""" image_name: str """Docker image name""" - container_name: str | None - """Name of the Docker container spun up by the extension""" container_ports: list[int] """List of network ports of the Docker container spun up by the extension""" host: str | None @@ -65,7 +63,6 @@ def __init__( container_ports: list[int], host: str | None = None, path: str | None = None, - container_name: str | None = None, command: list[str] | None = None, request_to_port_router: Callable[[Request], int] | None = None, http2_ports: list[int] | None = None, @@ -74,7 +71,9 @@ def __init__( self.container_ports = container_ports self.host = host self.path = path - self.container_name = container_name + self.container_name = re.sub( + r"\W", "-", f"ls-ext-{self.name}" + ) self.command = command self.request_to_port_router = request_to_port_router self.http2_ports = http2_ports @@ -101,17 +100,9 @@ def update_gateway_routes(self, router: http.Router[http.RouteHandler]): def on_platform_shutdown(self): self._remove_container() - def _get_container_name(self) -> str: - if self.container_name: - return self.container_name - name = f"ls-ext-{self.name}" - name = re.sub(r"\W", "-", name) - return name - @cache def start_container(self) -> None: - container_name = self._get_container_name() - LOG.debug("Starting extension container %s", container_name) + LOG.debug("Starting extension container %s", self.container_name) ports = PortMappings() for port in self.container_ports: @@ -126,12 +117,12 @@ def start_container(self) -> None: self.image_name, detach=True, remove=True, - name=container_name, + name=self.container_name, ports=ports, **kwargs, ) except Exception as e: - LOG.debug("Failed to start container %s: %s", container_name, e) + LOG.debug("Failed to start container %s: %s", self.container_name, e) # allow running TypeDB in a local server in dev mode, if TYPEDB_DEV_MODE is enabled if not is_env_true("TYPEDB_DEV_MODE"): raise @@ -147,17 +138,16 @@ def _ping_endpoint(): try: retry(_ping_endpoint, retries=40, sleep=1) except Exception as e: - LOG.info("Failed to connect to container %s: %s", container_name, e) + LOG.info("Failed to connect to container %s: %s", self.container_name, e) self._remove_container() raise - LOG.debug("Successfully started extension container %s", container_name) + LOG.debug("Successfully started extension container %s", self.container_name) def _remove_container(self): - container_name = self._get_container_name() - LOG.debug("Stopping extension container %s", container_name) + LOG.debug("Stopping extension container %s", self.container_name) DOCKER_CLIENT.remove_container( - container_name, force=True, check_existence=False + self.container_name, force=True, check_existence=False ) From 01f0d7aa80c0eefaa19f35219c616ab21857fbf2 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Fri, 12 Dec 2025 13:36:44 +0000 Subject: [PATCH 08/18] ProxyResource doesn't need to be passed the whole extension --- typedb/localstack_typedb/utils/docker.py | 29 ++++++++++++------------ 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py index 8ddc55a..ed126f4 100644 --- a/typedb/localstack_typedb/utils/docker.py +++ b/typedb/localstack_typedb/utils/docker.py @@ -68,6 +68,8 @@ def __init__( http2_ports: list[int] | None = None, ): self.image_name = image_name + if not container_ports: + raise ArgumentError("container_ports is required") self.container_ports = container_ports self.host = host self.path = path @@ -77,6 +79,8 @@ def __init__( self.command = command self.request_to_port_router = request_to_port_router self.http2_ports = http2_ports + self.main_port = self.container_ports[0] + self.container_host = get_addressable_container_host() def update_gateway_routes(self, router: http.Router[http.RouteHandler]): if self.path: @@ -86,7 +90,7 @@ def update_gateway_routes(self, router: http.Router[http.RouteHandler]): # note: for simplicity, starting the external container at startup - could be optimized over time ... self.start_container() # add resource for HTTP/1.1 requests - resource = RuleAdapter(ProxyResource(self)) + resource = RuleAdapter(ProxyResource(self.container_host, self.main_port)) if self.host: resource = WithHost(self.host, [resource]) router.add(resource) @@ -94,7 +98,7 @@ def update_gateway_routes(self, router: http.Router[http.RouteHandler]): # apply patches to serve HTTP/2 requests for port in self.http2_ports or []: apply_http2_patches_for_grpc_support( - get_addressable_container_host(), port, self + self.container_host, port, self ) def on_platform_shutdown(self): @@ -127,12 +131,9 @@ def start_container(self) -> None: if not is_env_true("TYPEDB_DEV_MODE"): raise - main_port = self.container_ports[0] - container_host = get_addressable_container_host() - def _ping_endpoint(): # TODO: allow defining a custom healthcheck endpoint ... - response = requests.get(f"http://{container_host}:{main_port}/") + response = requests.get(f"http://{self.container_host}:{self.main_port}/") assert response.ok try: @@ -157,21 +158,19 @@ class ProxyResource: LocalStack Gateway to the target Docker container. """ - extension: ProxiedDockerContainerExtension + host: str + port: int - def __init__(self, extension: ProxiedDockerContainerExtension): - self.extension = extension + def __init__(self, host: str, port: int): + self.host = host + self.port = port @route("/") def index(self, request: Request, path: str, *args, **kwargs): return self._proxy_request(request, forward_path=f"/{path}") def _proxy_request(self, request: Request, forward_path: str, *args, **kwargs): - self.extension.start_container() - - port = self.extension.container_ports[0] - container_host = get_addressable_container_host() - base_url = f"http://{container_host}:{port}" + base_url = f"http://{self.host}:{self.port}" proxy = Proxy(forward_base_url=base_url) # update content length (may have changed due to content compression) @@ -179,7 +178,7 @@ def _proxy_request(self, request: Request, forward_path: str, *args, **kwargs): request.headers["Content-Length"] = str(len(request.data)) # make sure we're forwarding the correct Host header - request.headers["Host"] = f"localhost:{port}" + request.headers["Host"] = f"localhost:{self.port}" # forward the request to the target result = proxy.forward(request, forward_path=forward_path) From af3baccce04ebab62be6b4338552072110032eb7 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Fri, 12 Dec 2025 13:47:51 +0000 Subject: [PATCH 09/18] Improve class docstring --- typedb/localstack_typedb/utils/docker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py index ed126f4..18b8b6c 100644 --- a/typedb/localstack_typedb/utils/docker.py +++ b/typedb/localstack_typedb/utils/docker.py @@ -29,8 +29,8 @@ class ProxiedDockerContainerExtension(Extension, ProxyRequestMatcher): """ - Utility class to create a LocalStack Extension backed by a Docker container that exposes a service - on a network port (or several ports), with requests being proxied through the LocalStack gateway. + Utility class to create a LocalStack Extension which runs a Docker container that exposes a service + on one or more ports, with requests being proxied to that container through the LocalStack gateway. Requests may potentially use HTTP2 with binary content as the protocol (e.g., gRPC over HTTP2). To ensure proper routing of requests, subclasses can define the `http2_ports`. From 3ef7c976193ccfc3a2d8f5e130eb1de09cf84585 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Fri, 12 Dec 2025 13:48:00 +0000 Subject: [PATCH 10/18] Rename var to reduce confusion --- typedb/localstack_typedb/utils/docker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py index 18b8b6c..f4460e2 100644 --- a/typedb/localstack_typedb/utils/docker.py +++ b/typedb/localstack_typedb/utils/docker.py @@ -108,9 +108,9 @@ def on_platform_shutdown(self): def start_container(self) -> None: LOG.debug("Starting extension container %s", self.container_name) - ports = PortMappings() + port_mapping = PortMappings() for port in self.container_ports: - ports.add(port) + port_mapping.add(port) kwargs = {} if self.command: @@ -122,7 +122,7 @@ def start_container(self) -> None: detach=True, remove=True, name=self.container_name, - ports=ports, + ports=port_mapping, **kwargs, ) except Exception as e: From a222ba54dfb99c69ab6de6b97f4db6fb42ef09ff Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Fri, 12 Dec 2025 14:11:34 +0000 Subject: [PATCH 11/18] Inline TcpForwarder.connect --- typedb/localstack_typedb/utils/h2_proxy.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index 2beccca..cb24d8c 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -34,13 +34,8 @@ class TcpForwarder: def __init__(self, port: int, host: str = "localhost"): self.port = port self.host = host - self._socket = None - self.connect() - - def connect(self): - if not self._socket: - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.connect((self.host, self.port)) + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect((self.host, self.port)) def receive_loop(self, callback): while True: From 925f70426b1be694d35713b1390f4acf7544503a Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Fri, 12 Dec 2025 14:20:09 +0000 Subject: [PATCH 12/18] Simplify http2 frame and header parsing --- typedb/localstack_typedb/utils/h2_proxy.py | 16 +++++++--------- typedb/tests/test_extension.py | 6 +++--- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index cb24d8c..0eac928 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -1,6 +1,7 @@ import logging import socket from abc import abstractmethod +from typing import Iterable from h2.frame_buffer import FrameBuffer from hpack import Decoder @@ -125,15 +126,13 @@ def connectionLost(fn, self, *args, **kwargs): forwarder.close() -def get_headers_from_data_stream(data_list: list[bytes]) -> Headers: +def get_headers_from_data_stream(data_list: Iterable[bytes]) -> Headers: """Get headers from a data stream (list of bytes data), if any headers are contained.""" - data_combined = b"".join(data_list) - frames = parse_http2_stream(data_combined) - headers = get_headers_from_frames(frames) - return headers + stream = b"".join(data_list) + return get_headers_from_frames(get_frames_from_http2_stream(stream)) -def get_headers_from_frames(frames: list[Frame]) -> Headers: +def get_headers_from_frames(frames: Iterable[Frame]) -> Headers: """Parse the given list of HTTP2 frames and return a dict of headers, if any""" result = {} decoder = Decoder() @@ -147,7 +146,7 @@ def get_headers_from_frames(frames: list[Frame]) -> Headers: return Headers(result) -def parse_http2_stream(data: bytes) -> list[Frame]: +def get_frames_from_http2_stream(data: bytes) -> Iterable[Frame]: """Parse the data from an HTTP2 stream into a list of frames""" frames = [] buffer = FrameBuffer(server=True) @@ -155,7 +154,6 @@ def parse_http2_stream(data: bytes) -> list[Frame]: buffer.add_data(data) try: for frame in buffer: - frames.append(frame) + yield frame except Exception: pass - return frames diff --git a/typedb/tests/test_extension.py b/typedb/tests/test_extension.py index 4d8c3bc..4bdd47f 100644 --- a/typedb/tests/test_extension.py +++ b/typedb/tests/test_extension.py @@ -1,6 +1,6 @@ import requests from localstack.utils.strings import short_uid -from localstack_typedb.utils.h2_proxy import parse_http2_stream, get_headers_from_frames +from localstack_typedb.utils.h2_proxy import get_frames_from_http2_stream, get_headers_from_frames from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType @@ -70,12 +70,12 @@ def test_connect_to_db_via_grpc_endpoint(): print(json) -def test_parse_http2_frames(): +def test_get_frames_from_http2_stream(): # note: the data below is a dump taken from a browser request made against the emulator data = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x18\x04\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x02\x00\x00\x00\x00\x00\x04\x00\x02\x00\x00\x00\x05\x00\x00@\x00\x00\x00\x04\x08\x00\x00\x00\x00\x00\x00\xbf\x00\x01" data += b"\x00\x01V\x01%\x00\x00\x00\x03\x00\x00\x00\x00\x15C\x87\xd5\xaf~MZw\x7f\x05\x8eb*\x0eA\xd0\x84\x8c\x9dX\x9c\xa3\xa13\xffA\x96\xa0\xe4\x1d\x13\x9d\t^\x83\x90t!#'U\xc9A\xed\x92\xe3M\xb8\xe7\x87z\xbe\xd0\x7ff\xa2\x81\xb0\xda\xe0S\xfa\xd02\x1a\xa4\x9d\x13\xfd\xa9\x92\xa4\x96\x854\x0c\x8aj\xdc\xa7\xe2\x81\x02\xe1o\xedK;\xdc\x0bM.\x0f\xedLE'S\xb0 \x04\x00\x08\x02\xa6\x13XYO\xe5\x80\xb4\xd2\xe0S\x83\xf9c\xe7Q\x8b-Kp\xdd\xf4Z\xbe\xfb@\x05\xdbP\x92\x9b\xd9\xab\xfaRB\xcb@\xd2_\xa5#\xb3\xe9OhL\x9f@\x94\x19\x08T!b\x1e\xa4\xd8z\x16\xb0\xbd\xad*\x12\xb5%L\xe7\x93\x83\xc5\x83\x7f@\x95\x19\x08T!b\x1e\xa4\xd8z\x16\xb0\xbd\xad*\x12\xb4\xe5\x1c\x85\xb1\x1f\x89\x1d\xa9\x9c\xf6\x1b\xd8\xd2c\xd5s\x95\x9d)\xad\x17\x18`u\xd6\xbd\x07 \xe8BFN\xab\x92\x83\xdb#\x1f@\x85=\x86\x98\xd5\x7f\x94\x9d)\xad\x17\x18`u\xd6\xbd\x07 \xe8BFN\xab\x92\x83\xdb'@\x8aAH\xb4\xa5I'ZB\xa1?\x84-5\xa7\xd7@\x8aAH\xb4\xa5I'Z\x93\xc8_\x83!\xecG@\x8aAH\xb4\xa5I'Y\x06I\x7f\x86@\xe9*\xc82K@\x86\xae\xc3\x1e\xc3'\xd7\x83\xb6\x06\xbf@\x82I\x7f\x86M\x835\x05\xb1\x1f\x00\x00\x04\x08\x00\x00\x00\x00\x03\x00\xbe\x00\x00" - frames = parse_http2_stream(data) + frames = get_frames_from_http2_stream(data) assert frames headers = get_headers_from_frames(frames) assert headers From 0ec4a672bc1daffbd871d08da7677d9d1be68eb6 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Mon, 15 Dec 2025 10:53:04 +0000 Subject: [PATCH 13/18] No need to pass empty data to callback --- typedb/localstack_typedb/utils/h2_proxy.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index 0eac928..d069b45 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -39,11 +39,8 @@ def __init__(self, port: int, host: str = "localhost"): self._socket.connect((self.host, self.port)) def receive_loop(self, callback): - while True: - data = self._socket.recv(self.buffer_size) + while data := self._socket.recv(self.buffer_size): callback(data) - if not data: - break def send(self, data): self._socket.sendall(data) From 229b23ca536f4a751d44a8bb59c876148e22f5aa Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Mon, 15 Dec 2025 17:14:22 +0000 Subject: [PATCH 14/18] Refactor HTTP2->GRPC forwarding for clarity --- typedb/localstack_typedb/utils/h2_proxy.py | 92 ++++++++++------------ 1 file changed, 42 insertions(+), 50 deletions(-) diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index d069b45..8a4d9fe 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -63,64 +63,56 @@ def apply_http2_patches_for_grpc_support( Note: this is a very brute-force approach and needs to be fixed/enhanced over time! """ + class ForwardingBuffer: + """ + A buffer atop the HTTP2 client connection, that will hold + data until the ProxyRequestMatcher tells us whether to send it + to the backend, or leave it to the default handler. + """ + def __init__(self, http_response_stream): + self.http_response_stream = http_response_stream + LOG.debug(f"Starting TCP forwarder to port {target_port} for new HTTP2 connection") + self.backend = TcpForwarder(target_port, host=target_host) + self.buffer = [] + self.proxying = False + reactor.getThreadPool().callInThread(self.backend.receive_loop, self.received_from_backend) + + def received_from_backend(self, data): + LOG.debug(f"Received {len(data)} bytes from backend") + self.http_response_stream.write(data) + + def received_from_http2_client(self, data, default_handler): + if self.proxying: + assert not self.buffer + # Keep sending data to the backend for the lifetime of this connection + self.backend.send(data) + else: + self.buffer.append(data) + if headers := get_headers_from_data_stream(self.buffer): + self.proxying = request_matcher.should_proxy_request(headers) + # Now we know what to do with the buffer + buffered_data = b"".join(self.buffer) + self.buffer = [] + if self.proxying: + LOG.debug(f"Forwarding {len(buffered_data)} bytes to backend") + self.backend.send(buffered_data) + else: + return default_handler(buffered_data) + + def close(self): + self.backend.close() + @patch(H2Connection.connectionMade) def _connectionMade(fn, self, *args, **kwargs): - def _process(data): - LOG.debug("Received data (%s bytes) from upstream HTTP2 server", len(data)) - self.transport.write(data) - - # TODO: make port configurable - self._ls_forwarder = TcpForwarder(target_port, host=target_host) - LOG.debug( - "Starting TCP forwarder to port %s for new HTTP2 connection", target_port - ) - reactor.getThreadPool().callInThread(self._ls_forwarder.receive_loop, _process) + self._ls_forwarding_buffer = ForwardingBuffer(self.transport) @patch(H2Connection.dataReceived) def _dataReceived(fn, self, data, *args, **kwargs): - forwarder = getattr(self, "_ls_forwarder", None) - should_proxy_request = getattr(self, "_ls_should_proxy_request", None) - if not forwarder or should_proxy_request is False: - return fn(self, data, *args, **kwargs) - - if should_proxy_request: - forwarder.send(data) - return - - setattr(self, "_data_received", getattr(self, "_data_received", [])) - self._data_received.append(data) - - # parse headers from request frames received so far - headers = get_headers_from_data_stream(self._data_received) - if not headers: - # if no headers received yet, then return (method will be called again for next chunk of data) - return - - # check if the incoming request should be proxies, based on the request headers - self._ls_should_proxy_request = request_matcher.should_proxy_request(headers) - - if not self._ls_should_proxy_request: - # if this is not a target request, then call the upstream function - result = None - for chunk in self._data_received: - result = fn(self, chunk, *args, **kwargs) - self._data_received = [] - return result - - # forward data chunks to the target - for chunk in self._data_received: - LOG.debug( - "Forwarding data (%s bytes) from HTTP2 client to server", len(chunk) - ) - forwarder.send(chunk) - self._data_received = [] + self._ls_forwarding_buffer.received_from_http2_client(data, lambda d: fn(d, *args, **kwargs)) @patch(H2Connection.connectionLost) def connectionLost(fn, self, *args, **kwargs): - forwarder = getattr(self, "_ls_forwarder", None) - if not forwarder: - return fn(self, *args, **kwargs) - forwarder.close() + self._ls_forwarding_buffer.close() def get_headers_from_data_stream(data_list: Iterable[bytes]) -> Headers: From bc8b6014b7f4fe62a6869c0ab13b520731b4ab73 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Mon, 15 Dec 2025 17:14:57 +0000 Subject: [PATCH 15/18] Prefer format strings for debug log --- typedb/localstack_typedb/utils/h2_proxy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index 8a4d9fe..14a815c 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -27,7 +27,7 @@ def should_proxy_request(self, headers: Headers) -> bool: class TcpForwarder: - """Simple helper class for bidirectional forwarding of TPC traffic.""" + """Simple helper class for bidirectional forwarding of TCP traffic.""" buffer_size: int = 1024 """Data buffer size for receiving data from upstream socket.""" @@ -46,7 +46,7 @@ def send(self, data): self._socket.sendall(data) def close(self): - LOG.debug("Closing connection to upstream HTTP2 server on port %s", self.port) + LOG.debug(f"Closing connection to upstream HTTP2 server on port {self.port}") try: self._socket.shutdown(socket.SHUT_RDWR) self._socket.close() From 5841ec55a64eea3ee239fefafec1588295c1c9e1 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Mon, 15 Dec 2025 17:42:23 +0000 Subject: [PATCH 16/18] Ensure apply_http2_patches_for_grpc_support isn't called twice --- typedb/localstack_typedb/utils/h2_proxy.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index 14a815c..531f360 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -55,6 +55,8 @@ def close(self): pass +patched_connection = False + def apply_http2_patches_for_grpc_support( target_host: str, target_port: int, request_matcher: ProxyRequestMatcher ): @@ -62,6 +64,10 @@ def apply_http2_patches_for_grpc_support( Apply some patches to proxy incoming gRPC requests and forward them to a target port. Note: this is a very brute-force approach and needs to be fixed/enhanced over time! """ + LOG.debug(f"Enabling proxying to backend {target_host}:{target_port}") + global patched_connection + assert not patched_connection, "It is not safe to patch H2Connection twice with this function" + patched_connection = True class ForwardingBuffer: """ From 1a4a66557c401cca5e118b1da3d70bf4ac03dce5 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Tue, 16 Dec 2025 09:36:58 +0000 Subject: [PATCH 17/18] Use a plain callable type instead of the ProxyRequestMatcher base class --- typedb/localstack_typedb/utils/docker.py | 10 ++++++++-- typedb/localstack_typedb/utils/h2_proxy.py | 18 ++++-------------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/typedb/localstack_typedb/utils/docker.py b/typedb/localstack_typedb/utils/docker.py index f4460e2..2c8b03c 100644 --- a/typedb/localstack_typedb/utils/docker.py +++ b/typedb/localstack_typedb/utils/docker.py @@ -1,5 +1,6 @@ import re import logging +from abc import abstractmethod from functools import cache from typing import Callable import requests @@ -19,6 +20,7 @@ from rolo import route from rolo.proxy import Proxy from rolo.routing import RuleAdapter, WithHost +from werkzeug.datastructures import Headers LOG = logging.getLogger(__name__) logging.getLogger("localstack_typedb").setLevel( @@ -27,7 +29,7 @@ logging.basicConfig() -class ProxiedDockerContainerExtension(Extension, ProxyRequestMatcher): +class ProxiedDockerContainerExtension(Extension): """ Utility class to create a LocalStack Extension which runs a Docker container that exposes a service on one or more ports, with requests being proxied to that container through the LocalStack gateway. @@ -98,9 +100,13 @@ def update_gateway_routes(self, router: http.Router[http.RouteHandler]): # apply patches to serve HTTP/2 requests for port in self.http2_ports or []: apply_http2_patches_for_grpc_support( - self.container_host, port, self + self.container_host, port, self.should_proxy_request ) + @abstractmethod + def should_proxy_request(self, headers: Headers) -> bool: + """Define whether a request should be proxied, based on request headers.""" + def on_platform_shutdown(self): self._remove_container() diff --git a/typedb/localstack_typedb/utils/h2_proxy.py b/typedb/localstack_typedb/utils/h2_proxy.py index 531f360..ee533c1 100644 --- a/typedb/localstack_typedb/utils/h2_proxy.py +++ b/typedb/localstack_typedb/utils/h2_proxy.py @@ -1,7 +1,6 @@ import logging import socket -from abc import abstractmethod -from typing import Iterable +from typing import Iterable, Callable from h2.frame_buffer import FrameBuffer from hpack import Decoder @@ -15,16 +14,7 @@ LOG = logging.getLogger(__name__) -class ProxyRequestMatcher: - """ - Abstract base class that defines a request matcher, for an extension to define which incoming - request messages should be proxied to an upstream target (and which ones shouldn't). - """ - - @abstractmethod - def should_proxy_request(self, headers: Headers) -> bool: - """Define whether a request should be proxied, based on request headers.""" - +ProxyRequestMatcher = Callable[[Headers], bool] class TcpForwarder: """Simple helper class for bidirectional forwarding of TCP traffic.""" @@ -58,7 +48,7 @@ def close(self): patched_connection = False def apply_http2_patches_for_grpc_support( - target_host: str, target_port: int, request_matcher: ProxyRequestMatcher + target_host: str, target_port: int, should_proxy_request: ProxyRequestMatcher ): """ Apply some patches to proxy incoming gRPC requests and forward them to a target port. @@ -95,7 +85,7 @@ def received_from_http2_client(self, data, default_handler): else: self.buffer.append(data) if headers := get_headers_from_data_stream(self.buffer): - self.proxying = request_matcher.should_proxy_request(headers) + self.proxying = should_proxy_request(headers) # Now we know what to do with the buffer buffered_data = b"".join(self.buffer) self.buffer = [] From b40a652b232774938c4ffd8cac4834c5826b5107 Mon Sep 17 00:00:00 2001 From: Steve Purcell Date: Tue, 16 Dec 2025 10:20:01 +0000 Subject: [PATCH 18/18] Fix homepage link --- typedb/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/typedb/pyproject.toml b/typedb/pyproject.toml index 71e348f..0141906 100644 --- a/typedb/pyproject.toml +++ b/typedb/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ ] [project.urls] -Homepage = "https://github.com/whummer/localstack-utils" +Homepage = "https://github.com/localstack/localstack-extensions" [project.optional-dependencies] dev = [