Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- **Fail-closed driver execution.** A grouped pass over the invocation path so
the kernel's "controlled, audited execution" promise (I-02) holds on *every*
exit, not just the happy path:
- **Audit + budget release on any driver fault (#152).** `execute_with_fallback`
now treats *any* exception a driver raises — not only `DriverError` — as a
failed attempt, so an unexpected error is captured and surfaced instead of
escaping un-audited with the budget reservation leaked. The post-driver
pipeline in `perform_invoke` (handle creation, firewall transform, token
counting) is likewise wrapped: a fault there releases the reservation
exactly once and records a failure `ActionTrace` before re-raising.
- **Per-invocation deadline (#191).** An optional `invoke_timeout_s` token
constraint bounds each driver attempt (single-shot and streaming, including
a stream inactivity timeout) via `asyncio.wait_for`. A timeout becomes a
synthetic `DriverError`, so the existing fallback and failure-trace paths
apply unchanged. Signed into the token, so the deadline is tamper-evident
and bound to the grant. Defaults to off.
- **Pooled HTTP client + response-size guard (#194).** `HTTPDriver` holds a
single long-lived `httpx.AsyncClient` (connection pooling, configurable
`httpx.Limits`) instead of building one per request, with an `aclose()` for
shutdown. An optional `max_response_bytes` streams and aborts oversized
bodies with a `DriverError` before they are fully buffered.
- **Defensive HTTP body parsing (#197).** A JSON endpoint returning a
non-JSON body now raises a typed `DriverError` (with content-type and a
redaction-safe snippet) instead of leaking `json.JSONDecodeError`. A new
`HTTPEndpoint.response_format="text"` supports text APIs deliberately.
- **Context-firewall sizing, budgeting & summary fidelity.** A grouped pass over
how the firewall measures, bounds, and represents payloads:
- **Allocation-free size estimation (#207).** `firewall.estimated_size` walks
Expand Down
40 changes: 38 additions & 2 deletions docs/integrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,18 @@ asyncio.run(main())

## HTTPDriver

The built-in `HTTPDriver` supports GET, POST, PUT, DELETE:
The built-in `HTTPDriver` supports GET, POST, PUT, DELETE (and any other method
via the generic path):

```python
from weaver_kernel.drivers.http import HTTPDriver, HTTPEndpoint

driver = HTTPDriver(driver_id="my_api")
driver = HTTPDriver(
driver_id="my_api",
# Optional response-size guard: reject bodies larger than this before they
# are fully buffered, so an unbounded upstream cannot exhaust memory (#194).
max_response_bytes=5_000_000,
)
driver.register_endpoint("users.list", HTTPEndpoint(
url="https://api.example.com/users",
method="GET",
Expand All @@ -106,6 +112,36 @@ driver.register_endpoint("users.list", HTTPEndpoint(
kernel.register_driver(driver)
```

The driver holds a single long-lived `httpx.AsyncClient` so requests reuse the
connection pool and keep-alive instead of opening a fresh connection per call
(#194). You own its lifecycle — call `await driver.aclose()` on shutdown (e.g.
in a `finally` block) to release the pool.

A non-JSON body from a JSON endpoint raises a typed `DriverError` rather than
leaking a decode error (#197). For text APIs, set `response_format="text"` on
the endpoint to receive the decoded body verbatim:

```python
driver.register_endpoint("status.page", HTTPEndpoint(
url="https://api.example.com/status",
response_format="text",
))
```

### Bounding execution time

Any driver — HTTP, MCP, or custom — can be bounded by a per-invocation
deadline. Set the `invoke_timeout_s` constraint when the policy issues the
grant; because constraints are signed into the capability token, the deadline
is tamper-evident and travels with the grant. An attempt that exceeds it is
turned into a `DriverError`, so the kernel still records a failure trace and
releases any reserved budget (#191):

```python
# A policy engine that attaches a 10s deadline to issued tokens:
decision.constraints["invoke_timeout_s"] = 10.0
```

## Custom drivers

Any object implementing the `Driver` protocol can be registered:
Expand Down
4 changes: 3 additions & 1 deletion examples/http_driver_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def _start_server(port: int) -> HTTPServer:
async def main() -> None:
port = 18765
server = _start_server(port)
http_driver = HTTPDriver(driver_id="catalog_api")

try:
registry = CapabilityRegistry()
Expand All @@ -76,7 +77,6 @@ async def main() -> None:
)
)

http_driver = HTTPDriver(driver_id="catalog_api")
http_driver.register_endpoint(
"catalog.list_products",
HTTPEndpoint(url=f"http://127.0.0.1:{port}/products", method="GET"),
Expand Down Expand Up @@ -131,6 +131,8 @@ async def main() -> None:

print("\n✓ http_driver_demo.py complete.")
finally:
# The driver owns a pooled httpx client; close it on shutdown (#194).
await http_driver.aclose()
server.shutdown()


Expand Down
190 changes: 161 additions & 29 deletions src/weaver_kernel/drivers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any
from typing import Any, Literal

import httpx

from ..errors import DriverError
from ..models import RawResult
from .base import ExecutionContext

_DEFAULT_LIMITS = httpx.Limits(max_connections=100, max_keepalive_connections=20)


@dataclass
class HTTPEndpoint:
Expand All @@ -21,14 +24,20 @@ class HTTPEndpoint:
headers: dict[str, str] = field(default_factory=dict)
timeout: float | None = None
"""Per-endpoint timeout in seconds. Falls back to the driver's ``default_timeout``."""
response_format: Literal["json", "text"] = "json"
"""How to read a successful body: parse as JSON (default) or keep it as text."""


class HTTPDriver:
"""A driver that invokes capabilities via HTTP using :mod:`httpx`.

Each operation must be registered with an :class:`HTTPEndpoint`.
The driver performs *synchronous* execution inside an async method by
using ``httpx.AsyncClient`` for proper async support.
Each operation must be registered with an :class:`HTTPEndpoint`. The driver
holds a single long-lived :class:`httpx.AsyncClient` so requests reuse the
connection pool and keep-alive instead of paying a fresh TLS handshake on
every call (#194); call :meth:`aclose` on shutdown to release it. Bodies are
size-bounded (``max_response_bytes``) and parsed defensively — a non-JSON
body from a JSON endpoint raises :class:`DriverError` rather than leaking a
raw decode error (#197).
"""

def __init__(
Expand All @@ -37,11 +46,16 @@ def __init__(
*,
base_headers: dict[str, str] | None = None,
default_timeout: float = 30.0,
limits: httpx.Limits | None = None,
max_response_bytes: int | None = None,
) -> None:
self._driver_id = driver_id
self._endpoints: dict[str, HTTPEndpoint] = {}
self._base_headers = base_headers or {}
self._default_timeout = default_timeout
self._limits = limits or _DEFAULT_LIMITS
self._max_response_bytes = max_response_bytes
self._client: httpx.AsyncClient | None = None

@property
def driver_id(self) -> str:
Expand All @@ -57,6 +71,31 @@ def register_endpoint(self, operation: str, endpoint: HTTPEndpoint) -> None:
"""
self._endpoints[operation] = endpoint

def _get_client(self) -> httpx.AsyncClient:
"""Return the shared client, creating it on first use.

Built lazily so the connection pool, default headers, and limits are
established once and reused across invocations (#194).
"""
if self._client is None:
self._client = httpx.AsyncClient(
headers=self._base_headers,
timeout=self._default_timeout,
limits=self._limits,
)
return self._client

async def aclose(self) -> None:
"""Close the shared client and release its connection pool.

Idempotent — safe to call more than once. Callers that construct an
:class:`HTTPDriver` own its lifecycle and should call this on shutdown
(e.g. in a ``finally`` block or async-context teardown).
"""
if self._client is not None:
await self._client.aclose()
self._client = None

async def execute(self, ctx: ExecutionContext) -> RawResult:
"""Execute an HTTP request for the given context.

Expand All @@ -67,10 +106,13 @@ async def execute(self, ctx: ExecutionContext) -> RawResult:
ctx: The execution context.

Returns:
:class:`RawResult` containing the parsed JSON response.
:class:`RawResult` containing the parsed JSON response, or the raw
text when the endpoint's ``response_format`` is ``"text"``.

Raises:
DriverError: If the endpoint is not registered or the request fails.
DriverError: If the endpoint is not registered, the request fails,
the response exceeds ``max_response_bytes``, or a JSON endpoint
returns a body that is not valid JSON.
"""
operation = str(ctx.args.get("operation", ctx.capability_id))
endpoint = self._endpoints.get(operation)
Expand All @@ -79,47 +121,137 @@ async def execute(self, ctx: ExecutionContext) -> RawResult:
f"HTTPDriver '{self._driver_id}' has no endpoint for operation='{operation}'."
)

headers = {**self._base_headers, **endpoint.headers}
method = endpoint.method.upper()
params: dict[str, Any] = {}
json_body: dict[str, Any] | None = None

if endpoint.method.upper() in ("GET", "DELETE"):
if method in ("GET", "DELETE"):
params = {k: v for k, v in ctx.args.items() if k != "operation"}
else:
json_body = {k: v for k, v in ctx.args.items() if k != "operation"}

effective_timeout = (
endpoint.timeout if endpoint.timeout is not None else self._default_timeout
)
client = self._get_client()

try:
async with httpx.AsyncClient(headers=headers, timeout=effective_timeout) as client:
if endpoint.method.upper() == "GET":
response = await client.get(endpoint.url, params=params)
elif endpoint.method.upper() == "POST":
response = await client.post(endpoint.url, json=json_body)
elif endpoint.method.upper() == "PUT":
response = await client.put(endpoint.url, json=json_body)
elif endpoint.method.upper() == "DELETE":
response = await client.delete(endpoint.url, params=params)
else:
response = await client.request(
endpoint.method.upper(), endpoint.url, json=json_body
async with client.stream(
method,
endpoint.url,
params=params,
json=json_body,
headers=endpoint.headers,
timeout=effective_timeout,
) as response:
if response.is_error:
# Bound the error-body read too: an arbitrarily large error
# body must not be buffered just to build the message (#194).
snippet = await self._read_error_snippet(response)
raise DriverError(
f"HTTPDriver '{self._driver_id}': HTTP {response.status_code} "
f"from {endpoint.url}: {snippet}"
)
response.raise_for_status()
data: Any = response.json()
except httpx.HTTPStatusError as exc:
raise DriverError(
f"HTTPDriver '{self._driver_id}': HTTP {exc.response.status_code} "
f"from {endpoint.url}: {exc.response.text[:200]}"
) from exc
body = await self._read_bounded(response, url=endpoint.url)
status_code = response.status_code
content_type = response.headers.get("content-type", "")
except httpx.RequestError as exc:
raise DriverError(
f"HTTPDriver '{self._driver_id}': Request to {endpoint.url} failed: {exc}"
) from exc

data = self._decode_body(
body,
response_format=endpoint.response_format,
url=endpoint.url,
content_type=content_type,
)
return RawResult(
capability_id=ctx.capability_id,
data=data,
metadata={"status_code": response.status_code, "url": endpoint.url},
metadata={"status_code": status_code, "url": endpoint.url},
)

async def _read_error_snippet(self, response: httpx.Response, *, max_bytes: int = 512) -> str:
"""Read at most ``max_bytes`` of an error body for the failure message.

Streams and stops early so an oversized error body cannot be buffered in
full — the size guard must hold on the failure path too (#194). Only the
first 200 characters are surfaced in the error message.

Args:
response: The open streaming response (already known to be an error).
max_bytes: Hard cap on bytes read before giving up.

Returns:
A decoded, length-bounded snippet of the error body.
"""
chunks = bytearray()
async for chunk in response.aiter_bytes():
chunks.extend(chunk)
if len(chunks) >= max_bytes:
break
return bytes(chunks).decode("utf-8", "replace")[:200]

async def _read_bounded(self, response: httpx.Response, *, url: str) -> bytes:
"""Read the response body, aborting if it exceeds ``max_response_bytes``.

Streams chunks so an oversized upstream body is rejected before it is
fully buffered — the firewall's budget only applies *after* a
:class:`RawResult` exists, so the guard has to live here (#194).

Args:
response: The open streaming response.
url: The request URL, used in the error message.

Returns:
The full response body as bytes.

Raises:
DriverError: If the accumulated body exceeds ``max_response_bytes``.
"""
limit = self._max_response_bytes
if limit is None:
return await response.aread()
body = bytearray()
async for chunk in response.aiter_bytes():
body.extend(chunk)
if len(body) > limit:
raise DriverError(
f"HTTPDriver '{self._driver_id}': response from {url} exceeded "
f"max_response_bytes ({limit})."
)
return bytes(body)

def _decode_body(
self,
body: bytes,
*,
response_format: Literal["json", "text"],
url: str,
content_type: str,
) -> Any:
"""Decode a response body per the endpoint's ``response_format``.

Args:
body: The raw response bytes.
response_format: ``"json"`` to parse, ``"text"`` to decode as a string.
url: The request URL, used in the error message.
content_type: The response ``Content-Type``, used in the error message.

Returns:
The parsed JSON value (``None`` for an empty body), or the decoded text.

Raises:
DriverError: If ``response_format`` is ``"json"`` and the body is not
valid JSON (#197).
"""
if response_format == "text":
return body.decode("utf-8", "replace")
try:
return json.loads(body) if body else None
except (json.JSONDecodeError, ValueError) as exc:
snippet = body[:200].decode("utf-8", "replace")
raise DriverError(
f"HTTPDriver '{self._driver_id}': non-JSON response from {url} "
f"(content-type: {content_type or 'unknown'}): {snippet}"
) from exc
Loading
Loading