Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ with AxmeClient(config) as client:
)
print(result)
print(client.get_intent(result["intent_id"])["intent"]["status"])
sent_intent_id = client.send_intent(
{
"intent_type": "notify.message.v1",
"from_agent": "agent://example/sender",
"to_agent": "agent://example/receiver",
"payload": {"text": "hello again"},
},
idempotency_key="send-intent-001",
)
print(sent_intent_id)
print(client.list_intent_events(sent_intent_id, since=0))
for event in client.observe(sent_intent_id, since=0, wait_seconds=10):
print(event["event_type"], event["status"])
if event["status"] in {"COMPLETED", "FAILED", "CANCELED"}:
break
terminal = client.wait_for(sent_intent_id, timeout_seconds=30)
print(terminal["status"])
inbox = client.list_inbox(owner_agent="agent://example/receiver", trace_id="trace-inbox-001")
print(inbox)
thread = client.get_inbox_thread("11111111-1111-4111-8111-111111111111", owner_agent="agent://example/receiver")
Expand Down
213 changes: 212 additions & 1 deletion axme_sdk/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

from dataclasses import dataclass
import json
import time
from typing import Any, Callable
from typing import Any, Callable, Iterator
from uuid import uuid4

import httpx
Expand Down Expand Up @@ -83,6 +84,154 @@ def create_intent(
def get_intent(self, intent_id: str, *, trace_id: str | None = None) -> dict[str, Any]:
return self._request_json("GET", f"/v1/intents/{intent_id}", trace_id=trace_id, retryable=True)

def send_intent(
self,
payload: dict[str, Any],
*,
correlation_id: str | None = None,
idempotency_key: str | None = None,
trace_id: str | None = None,
) -> str:
created = self.create_intent(
payload,
correlation_id=correlation_id or str(uuid4()),
idempotency_key=idempotency_key,
trace_id=trace_id,
)
intent_id = created.get("intent_id")
if not isinstance(intent_id, str) or not intent_id:
raise ValueError("create_intent response does not include string intent_id")
return intent_id

def list_intent_events(
self,
intent_id: str,
*,
since: int | None = None,
trace_id: str | None = None,
) -> dict[str, Any]:
params: dict[str, str] | None = None
if since is not None:
if since < 0:
raise ValueError("since must be >= 0")
params = {"since": str(since)}
return self._request_json(
"GET",
f"/v1/intents/{intent_id}/events",
params=params,
trace_id=trace_id,
retryable=True,
)

def resolve_intent(
self,
intent_id: str,
payload: dict[str, Any],
*,
idempotency_key: str | None = None,
trace_id: str | None = None,
) -> dict[str, Any]:
return self._request_json(
"POST",
f"/v1/intents/{intent_id}/resolve",
json_body=payload,
idempotency_key=idempotency_key,
trace_id=trace_id,
retryable=idempotency_key is not None,
)

def observe(
self,
intent_id: str,
*,
since: int = 0,
wait_seconds: int = 15,
poll_interval_seconds: float = 1.0,
timeout_seconds: float | None = None,
trace_id: str | None = None,
) -> Iterator[dict[str, Any]]:
if since < 0:
raise ValueError("since must be >= 0")
if wait_seconds < 1:
raise ValueError("wait_seconds must be >= 1")
if poll_interval_seconds < 0:
raise ValueError("poll_interval_seconds must be >= 0")
if timeout_seconds is not None and timeout_seconds <= 0:
raise ValueError("timeout_seconds must be > 0 when provided")

deadline = (time.monotonic() + timeout_seconds) if timeout_seconds is not None else None
next_since = since

while True:
if deadline is not None and time.monotonic() >= deadline:
raise TimeoutError(f"timed out while observing intent {intent_id}")

stream_wait_seconds = wait_seconds
if deadline is not None:
seconds_left = max(0.0, deadline - time.monotonic())
if seconds_left <= 0:
raise TimeoutError(f"timed out while observing intent {intent_id}")
stream_wait_seconds = max(1, min(wait_seconds, int(seconds_left)))

try:
for event in self._iter_intent_events_stream(
intent_id=intent_id,
since=next_since,
wait_seconds=stream_wait_seconds,
trace_id=trace_id,
):
next_since = _max_seen_seq(next_since=next_since, event=event)
yield event
if _is_terminal_intent_event(event):
return
except AxmeHttpError as exc:
if exc.status_code not in {404, 405, 501}:
raise

polled = self.list_intent_events(
intent_id,
since=next_since if next_since > 0 else None,
trace_id=trace_id,
)
events = polled.get("events")
if not isinstance(events, list):
raise AxmeHttpError(502, "invalid intent events payload: events must be list", body=polled)
if not events:
if deadline is not None and time.monotonic() >= deadline:
raise TimeoutError(f"timed out while observing intent {intent_id}")
time.sleep(poll_interval_seconds)
continue

for event in events:
if not isinstance(event, dict):
continue
next_since = _max_seen_seq(next_since=next_since, event=event)
yield event
if _is_terminal_intent_event(event):
return

def wait_for(
self,
intent_id: str,
*,
since: int = 0,
wait_seconds: int = 15,
poll_interval_seconds: float = 1.0,
timeout_seconds: float | None = None,
trace_id: str | None = None,
) -> dict[str, Any]:
for event in self.observe(
intent_id,
since=since,
wait_seconds=wait_seconds,
poll_interval_seconds=poll_interval_seconds,
timeout_seconds=timeout_seconds,
trace_id=trace_id,
):
if _is_terminal_intent_event(event):
return event
raise RuntimeError(f"intent observation finished without terminal event for {intent_id}")

def list_inbox(self, *, owner_agent: str | None = None, trace_id: str | None = None) -> dict[str, Any]:
params: dict[str, str] | None = None
if owner_agent is not None:
Expand Down Expand Up @@ -598,6 +747,53 @@ def _request_json(

raise RuntimeError("unreachable retry loop state")

def _iter_intent_events_stream(
self,
*,
intent_id: str,
since: int,
wait_seconds: int,
trace_id: str | None,
) -> Iterator[dict[str, Any]]:
headers: dict[str, str] | None = None
normalized_trace_id = self._normalize_trace_id(trace_id)
if normalized_trace_id is not None:
headers = {"X-Trace-Id": normalized_trace_id}

with self._http.stream(
"GET",
f"/v1/intents/{intent_id}/events/stream",
params={"since": str(since), "wait_seconds": str(wait_seconds)},
headers=headers,
) as response:
if response.status_code >= 400:
self._raise_http_error(response)

current_event: str | None = None
data_lines: list[str] = []
for line in response.iter_lines():
if line == "":
if current_event == "stream.timeout":
return
if current_event and data_lines:
try:
payload = json.loads("\n".join(data_lines))
except ValueError:
payload = None
if isinstance(payload, dict) and current_event.startswith("intent."):
yield payload
current_event = None
data_lines = []
continue
if line.startswith(":"):
continue
if line.startswith("event:"):
current_event = line.partition(":")[2].strip()
continue
if line.startswith("data:"):
data_lines.append(line.partition(":")[2].lstrip())
continue

def _mcp_request(
self,
*,
Expand Down Expand Up @@ -780,3 +976,18 @@ def _matches_json_type(*, value: Any, accepted_types: list[str]) -> bool:
if type_name == "array" and isinstance(value, list):
return True
return False


def _max_seen_seq(*, next_since: int, event: dict[str, Any]) -> int:
raw_seq = event.get("seq")
if isinstance(raw_seq, int) and raw_seq >= 0:
return max(next_since, raw_seq)
return next_since


def _is_terminal_intent_event(event: dict[str, Any]) -> bool:
status = event.get("status")
if isinstance(status, str) and status in {"COMPLETED", "FAILED", "CANCELED"}:
return True
event_type = event.get("event_type")
return isinstance(event_type, str) and event_type in {"intent.completed", "intent.failed", "intent.canceled"}
Loading