Skip to content
5 changes: 4 additions & 1 deletion src/agentex/lib/sdk/fastacp/base/base_acp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def __init__(self):
# Agent info to return in healthz
self.agent_id: str | None = None

# Optional agent card for registration metadata
self._agent_card: Any | None = None

@classmethod
def create(cls):
"""Create and initialize BaseACPServer instance"""
Expand All @@ -98,7 +101,7 @@ def get_lifespan_function(self):
async def lifespan_context(app: FastAPI): # noqa: ARG001
env_vars = EnvironmentVariables.refresh()
if env_vars.AGENTEX_BASE_URL:
await register_agent(env_vars)
await register_agent(env_vars, agent_card=self._agent_card)
self.agent_id = env_vars.AGENT_ID
else:
logger.warning("AGENTEX_BASE_URL not set, skipping agent registration")
Expand Down
18 changes: 14 additions & 4 deletions src/agentex/lib/sdk/fastacp/fastacp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import os
import inspect
from typing import Literal
from typing import Any, Literal
from pathlib import Path
from typing_extensions import deprecated

Expand Down Expand Up @@ -88,7 +88,10 @@ def locate_build_info_path() -> None:

@staticmethod
def create(
acp_type: Literal["sync", "async", "agentic"], config: BaseACPConfig | None = None, **kwargs
acp_type: Literal["sync", "async", "agentic"],
config: BaseACPConfig | None = None,
agent_card: Any | None = None,
**kwargs,
) -> BaseACPServer | SyncACP | AsyncBaseACP | TemporalACP:
"""Main factory method to create any ACP type

Expand All @@ -102,10 +105,17 @@ def create(

if acp_type == "sync":
sync_config = config if isinstance(config, SyncACPConfig) else None
return FastACP.create_sync_acp(sync_config, **kwargs)
instance = FastACP.create_sync_acp(sync_config, **kwargs)
elif acp_type == "async" or acp_type == "agentic":
if config is None:
config = AsyncACPConfig(type="base")
if not isinstance(config, AsyncACPConfig):
raise ValueError("AsyncACPConfig is required for async/agentic ACP type")
return FastACP.create_async_acp(config, **kwargs)
instance = FastACP.create_async_acp(config, **kwargs)
else:
raise ValueError(f"Unknown acp_type: {acp_type}")

if agent_card is not None:
instance._agent_card = agent_card # type: ignore[attr-defined]

return instance
12 changes: 11 additions & 1 deletion src/agentex/lib/sdk/state_machine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
from agentex.lib.types.agent_card import AgentCard, AgentLifecycle, LifecycleState

from .state import State
from .noop_workflow import NoOpWorkflow
from .state_machine import StateMachine
from .state_workflow import StateWorkflow

__all__ = ["StateMachine", "StateWorkflow", "State", "NoOpWorkflow"]
__all__ = [
"StateMachine",
"StateWorkflow",
"State",
"NoOpWorkflow",
"AgentCard",
"AgentLifecycle",
"LifecycleState",
]
23 changes: 23 additions & 0 deletions src/agentex/lib/sdk/state_machine/state_machine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Generic, TypeVar

from agentex.lib import adk
Expand Down Expand Up @@ -129,6 +130,28 @@ async def reset_to_initial_state(self):
span.output = {"output_state": self._initial_state} # type: ignore[assignment,union-attr]
await adk.tracing.end_span(trace_id=self._task_id, span=span)

def get_lifecycle(self) -> dict[str, Any]:
"""Export the state machine's lifecycle as a dict suitable for AgentCard."""
states = []
for state in self._state_map.values():
workflow = state.workflow
states.append({
"name": state.name,
"description": workflow.description,
"waits_for_input": workflow.waits_for_input,
"accepts": list(workflow.accepts),
"transitions": [
t.value if isinstance(t, Enum) else str(t)
for t in workflow.transitions
],
})
initial: str = self._initial_state.value if isinstance(self._initial_state, Enum) else self._initial_state

return {
"states": states,
"initial_state": initial,
}

def dump(self) -> dict[str, Any]:
"""
Save the current state of the state machine to a serializable dictionary.
Expand Down
5 changes: 5 additions & 0 deletions src/agentex/lib/sdk/state_machine/state_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@


class StateWorkflow(ABC):
description: str = ""
waits_for_input: bool = False
accepts: list[str] = []
transitions: list[str] = []

@abstractmethod
async def execute(
self, state_machine: "StateMachine", state_machine_data: BaseModel | None = None
Expand Down
149 changes: 149 additions & 0 deletions src/agentex/lib/types/agent_card.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from __future__ import annotations

import types
import typing
from enum import Enum
from typing import TYPE_CHECKING, Any, get_args, get_origin

from pydantic import BaseModel

if TYPE_CHECKING:
from agentex.lib.sdk.state_machine.state import State


class LifecycleState(BaseModel):
name: str
description: str = ""
waits_for_input: bool = False
accepts: list[str] = []
transitions: list[str] = []


class AgentLifecycle(BaseModel):
states: list[LifecycleState]
initial_state: str
queries: list[str] = []


class AgentCard(BaseModel):
protocol: str = "acp"
lifecycle: AgentLifecycle | None = None
data_events: list[str] = []
input_types: list[str] = []
output_schema: dict | None = None

@classmethod
def from_states(
cls,
initial_state: str | Enum,
states: list[State],
output_event_model: type[BaseModel] | None = None,
extra_input_types: list[str] | None = None,
queries: list[str] | None = None,
) -> AgentCard:
"""Build an AgentCard directly from a list[State] + initial_state.

Agents can share their `states` list between the StateMachine and acp.py
without constructing a temporary StateMachine instance.
"""
lifecycle_states = [
LifecycleState(
name=state.name,
description=state.workflow.description,
waits_for_input=state.workflow.waits_for_input,
accepts=list(state.workflow.accepts),
transitions=[
t.value if isinstance(t, Enum) else str(t)
for t in state.workflow.transitions
],
)
for state in states
]

initial = initial_state.value if isinstance(initial_state, Enum) else initial_state

data_events: list[str] = []
output_schema: dict | None = None
if output_event_model:
data_events = extract_literal_values(output_event_model, "type")
output_schema = output_event_model.model_json_schema()

derived_input_types: set[str] = set()
for ls in lifecycle_states:
derived_input_types.update(ls.accepts)

return cls(
lifecycle=AgentLifecycle(
states=lifecycle_states,
initial_state=initial,
queries=queries or [],
),
data_events=data_events,
input_types=sorted(derived_input_types | set(extra_input_types or [])),
output_schema=output_schema,
)

@classmethod
def from_state_machine(
cls,
state_machine: Any,
output_event_model: type[BaseModel] | None = None,
extra_input_types: list[str] | None = None,
queries: list[str] | None = None,
) -> AgentCard:
"""Build an AgentCard from a StateMachine instance. Delegates to from_states()."""
lifecycle = state_machine.get_lifecycle()
states_data = lifecycle["states"]
initial = lifecycle["initial_state"]

# Reconstruct lightweight State-like objects from the lifecycle dict
# so we can reuse from_states logic via the dict path
data_events: list[str] = []
output_schema: dict | None = None
if output_event_model:
data_events = extract_literal_values(output_event_model, "type")
output_schema = output_event_model.model_json_schema()

derived_input_types: set[str] = set()
lifecycle_states = []
for s in states_data:
derived_input_types.update(s.get("accepts", []))
lifecycle_states.append(LifecycleState(
name=s["name"],
description=s.get("description", ""),
waits_for_input=s.get("waits_for_input", False),
accepts=s.get("accepts", []),
transitions=s.get("transitions", []),
))

return cls(
lifecycle=AgentLifecycle(
states=lifecycle_states,
initial_state=initial,
queries=queries or [],
),
data_events=data_events,
input_types=sorted(derived_input_types | set(extra_input_types or [])),
output_schema=output_schema,
)


def extract_literal_values(model: type[BaseModel], field: str) -> list[str]:
"""Extract allowed values from a Literal[...] type annotation on a Pydantic model field."""
field_info = model.model_fields.get(field)
if field_info is None:
return []

annotation = field_info.annotation
if annotation is None:
return []

# Unwrap Optional (Union[X, None] or PEP 604 X | None) to get the inner type
if get_origin(annotation) is typing.Union or isinstance(annotation, types.UnionType):
args = [a for a in get_args(annotation) if a is not type(None)]
annotation = args[0] if len(args) == 1 else annotation

if get_origin(annotation) is typing.Literal:
return list(get_args(annotation))

return []
11 changes: 9 additions & 2 deletions src/agentex/lib/utils/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_build_info():
except Exception:
return None

async def register_agent(env_vars: EnvironmentVariables):
async def register_agent(env_vars: EnvironmentVariables, agent_card=None):
"""Register this agent with the Agentex server"""
if not env_vars.AGENTEX_BASE_URL:
logger.warning("AGENTEX_BASE_URL is not set, skipping registration")
Expand All @@ -45,13 +45,20 @@ async def register_agent(env_vars: EnvironmentVariables):
)

# Prepare registration data
registration_metadata = get_build_info()
if agent_card is not None:
card_data = agent_card.model_dump() if hasattr(agent_card, "model_dump") else agent_card
if registration_metadata is None:
registration_metadata = {}
registration_metadata["agent_card"] = card_data

registration_data = {
"name": env_vars.AGENT_NAME,
"description": description,
"acp_url": full_acp_url,
"acp_type": env_vars.ACP_TYPE,
"principal_context": get_auth_principal(env_vars),
"registration_metadata": get_build_info()
"registration_metadata": registration_metadata,
}

if env_vars.AGENT_ID:
Expand Down
Loading
Loading