Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,40 @@ async def _start_cloud_agent(
assert self._api_url is not None, "api_url is not set"
assert self._api_secret is not None, "api_secret is not set"

# Determine if using custom API endpoint (not the default BitHuman auth API)
# Custom endpoints use multipart/form-data format for direct avatar worker requests
is_custom_endpoint = not self._is_default_api_url()

if is_custom_endpoint:
# Use FormData format for custom endpoints (e.g., gpu-avatar-worker, cerebrium)
await self._send_formdata_request(livekit_url, livekit_token, room_name)
else:
# Use JSON format for default BitHuman API
await self._send_json_request(livekit_url, livekit_token, room_name)

def _is_default_api_url(self) -> bool:
"""
Check if using the default BitHuman API URL.

Returns:
True if using default auth.api.bithuman.ai endpoint, False otherwise.
"""
if self._api_url is None:
return True
default_domains = ["auth.api.bithuman.ai", "api.bithuman.ai"]
return any(domain in self._api_url for domain in default_domains)

async def _send_json_request(
self, livekit_url: str, livekit_token: str, room_name: str
) -> None:
"""
Send request using JSON format (for default BitHuman API).

Args:
livekit_url: LiveKit server URL
livekit_token: JWT token for room access
room_name: Name of the LiveKit room
"""
# Prepare JSON data
json_data = {
"livekit_url": livekit_url,
Expand All @@ -320,17 +354,15 @@ async def _start_cloud_agent(
else "cpu",
}

# Handle avatar image
# Handle avatar image - convert to base64 for JSON serialization
if isinstance(self._avatar_image, Image.Image):
img_byte_arr = io.BytesIO()
self._avatar_image.save(img_byte_arr, format="JPEG", quality=95)
img_byte_arr.seek(0)
# Convert to base64 for JSON serialization
import base64

json_data["image"] = base64.b64encode(img_byte_arr.getvalue()).decode("utf-8")
elif isinstance(self._avatar_image, bytes):
# Convert bytes to base64 for JSON serialization
import base64

json_data["image"] = base64.b64encode(self._avatar_image).decode("utf-8")
Expand All @@ -340,15 +372,131 @@ async def _start_cloud_agent(
if utils.is_given(self._avatar_id):
json_data["agent_id"] = self._avatar_id

headers = {
"Content-Type": "application/json",
"api-secret": self._api_secret,
}

await self._send_request_with_retry(
headers=headers,
json_data=json_data,
form_data=None,
)

async def _send_formdata_request(
self, livekit_url: str, livekit_token: str, room_name: str
) -> None:
"""
Send request using multipart/form-data format (for custom avatar worker endpoints).

This format is used for direct communication with avatar workers like:
- gpu-avatar-worker (FLOAT model)
- cpu-avatar-worker
- Cerebrium deployments

Args:
livekit_url: LiveKit server URL
livekit_token: JWT token for room access
room_name: Name of the LiveKit room
"""
# Build form data with required fields
form_data = aiohttp.FormData()
form_data.add_field("livekit_url", livekit_url)
form_data.add_field("livekit_token", livekit_token)
form_data.add_field("room_name", room_name)

# Handle avatar image - send as file upload or URL
if isinstance(self._avatar_image, Image.Image):
# Convert PIL Image to bytes and upload as file
img_byte_arr = io.BytesIO()
self._avatar_image.save(img_byte_arr, format="JPEG", quality=95)
img_byte_arr.seek(0)
form_data.add_field(
"avatar_image",
img_byte_arr,
filename="avatar.jpg",
content_type="image/jpeg",
)
elif isinstance(self._avatar_image, bytes):
# Upload raw bytes as file
img_byte_arr = io.BytesIO(self._avatar_image)
form_data.add_field(
"avatar_image",
img_byte_arr,
filename="avatar.jpg",
content_type="image/jpeg",
)
elif isinstance(self._avatar_image, str):
# String can be URL or base64 - check if it's a URL
if self._avatar_image.startswith(("http://", "https://")):
form_data.add_field("avatar_image_url", self._avatar_image)
else:
# Assume base64, decode and upload as file
try:
import base64

decoded_image = base64.b64decode(self._avatar_image)
img_byte_arr = io.BytesIO(decoded_image)
form_data.add_field(
"avatar_image",
img_byte_arr,
filename="avatar.jpg",
content_type="image/jpeg",
)
except Exception:
# If decode fails, treat as URL
form_data.add_field("avatar_image_url", self._avatar_image)

# Add avatar_id if provided
if utils.is_given(self._avatar_id):
form_data.add_field("avatar_id", self._avatar_id)

# Authorization header for custom endpoints uses api_token (Bearer token format)
# Note: api_token is different from api_secret - token is for direct API access,
# while secret is for BitHuman's authentication service
auth_token = self._api_token or self._api_secret
if auth_token is None:
raise BitHumanException(
"api_token or api_secret is required for custom endpoint requests. "
"Set BITHUMAN_API_TOKEN or BITHUMAN_API_SECRET environment variable."
)

headers = {
"Authorization": f"Bearer {auth_token}",
}

await self._send_request_with_retry(
headers=headers,
json_data=None,
form_data=form_data,
)

async def _send_request_with_retry(
self,
headers: dict[str, str],
json_data: dict | None = None,
form_data: aiohttp.FormData | None = None,
) -> None:
"""
Send HTTP request with retry logic.

Handles both JSON and FormData request formats with configurable retry behavior.

Args:
headers: HTTP headers to include in the request
json_data: JSON payload (mutually exclusive with form_data)
form_data: FormData payload (mutually exclusive with json_data)

Raises:
APIConnectionError: If all retry attempts fail
"""
for i in range(self._conn_options.max_retry):
try:
async with self._ensure_http_session().post(
self._api_url,
headers={
"Content-Type": "application/json",
"api-secret": self._api_secret,
},
headers=headers,
json=json_data,
data=form_data,
timeout=aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout),
) as response:
if not response.ok:
Expand Down
Loading