-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathregistration.py
More file actions
123 lines (107 loc) · 4.45 KB
/
registration.py
File metadata and controls
123 lines (107 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import json
import base64
import asyncio
import httpx
from agentex.lib.utils.logging import make_logger
from agentex.lib.environment_variables import EnvironmentVariables
logger = make_logger(__name__)
def get_auth_principal(env_vars: EnvironmentVariables):
if not env_vars.AUTH_PRINCIPAL_B64:
return None
try:
decoded_str = base64.b64decode(env_vars.AUTH_PRINCIPAL_B64).decode('utf-8')
return json.loads(decoded_str)
except Exception:
return None
def get_build_info():
build_info_path = os.environ.get("BUILD_INFO_PATH")
logger.info(f"Getting build info from {build_info_path}")
if not build_info_path:
return None
try:
with open(build_info_path, "r") as f:
return json.load(f)
except Exception:
return None
async def register_agent(env_vars: EnvironmentVariables, agent_card=None):
"""Register this agent with the Agentex server"""
if not env_vars.AGENTEX_BASE_URL:
logger.warning("AGENTEX_BASE_URL is not set, skipping registration")
return
# Build the agent's own URL
full_acp_url = f"{env_vars.ACP_URL.rstrip('/')}:{env_vars.ACP_PORT}"
description = (
env_vars.AGENT_DESCRIPTION
or f"Generic description for agent: {env_vars.AGENT_NAME}"
)
# Prepare registration data
registration_metadata = get_build_info()
if agent_card is not None:
card_data = agent_card.model_dump() if hasattr(agent_card, "model_dump") else agent_card
if registration_metadata is None:
registration_metadata = {}
registration_metadata["agent_card"] = card_data
registration_data = {
"name": env_vars.AGENT_NAME,
"description": description,
"acp_url": full_acp_url,
"acp_type": env_vars.ACP_TYPE,
"principal_context": get_auth_principal(env_vars),
"registration_metadata": registration_metadata,
}
if env_vars.AGENT_ID:
registration_data["agent_id"] = env_vars.AGENT_ID
if env_vars.AGENT_INPUT_TYPE:
registration_data["agent_input_type"] = env_vars.AGENT_INPUT_TYPE
# Make the registration request
registration_url = f"{env_vars.AGENTEX_BASE_URL.rstrip('/')}/agents/register"
# Retry logic with configurable attempts and delay
max_retries = 3
base_delay = 5 # seconds
last_exception = None
attempt = 0
while attempt < max_retries:
try:
async with httpx.AsyncClient() as client:
response = await client.post(
registration_url, json=registration_data, timeout=30.0
)
if response.status_code == 200:
agent = response.json()
agent_id, agent_name = agent["id"], agent["name"]
agent_api_key = agent["agent_api_key"]
os.environ["AGENT_ID"] = agent_id
os.environ["AGENT_NAME"] = agent_name
os.environ["AGENT_API_KEY"] = agent_api_key
env_vars.AGENT_ID = agent_id
env_vars.AGENT_NAME = agent_name
env_vars.AGENT_API_KEY = agent_api_key
global refreshed_environment_variables
refreshed_environment_variables = env_vars
logger.info(
f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}"
)
return # Success, exit the retry loop
else:
error_msg = f"Failed to register agent. Status: {response.status_code}, Response: {response.text}"
logger.error(error_msg)
last_exception = Exception(
f"Failed to startup agent: {response.text}"
)
except Exception as e:
logger.error(
f"Exception during agent registration attempt {attempt + 1}: {e}"
)
last_exception = e
attempt += 1
if attempt < max_retries:
delay = (attempt) * base_delay # 5, 10, 15 seconds
logger.info(
f"Retrying in {delay} seconds... (attempt {attempt}/{max_retries})"
)
await asyncio.sleep(delay)
# If we get here, all retries failed
raise last_exception or Exception(
f"Failed to register agent after {max_retries} attempts"
)