diff --git a/.github/workflows/build-oabctl.yml b/.github/workflows/build-oabctl.yml index 558fc2973..3e167fc5a 100644 --- a/.github/workflows/build-oabctl.yml +++ b/.github/workflows/build-oabctl.yml @@ -26,6 +26,10 @@ jobs: with: targets: ${{ inputs.target == 'linux-aarch64' && 'aarch64-unknown-linux-gnu' || '' }} + - uses: Swatinem/rust-cache@v2 + with: + workspaces: operator + - name: Install cross-compilation tools if: inputs.target == 'linux-aarch64' run: | diff --git a/operator/Cargo.toml b/operator/Cargo.toml index 3550bb313..71f414cd0 100644 --- a/operator/Cargo.toml +++ b/operator/Cargo.toml @@ -27,5 +27,8 @@ serde_yaml = "0.9" tokio = { version = "1.40", features = ["full"] } toml = "0.8" anyhow = "1.0" +base64 = "0.22" dirs = "6" rpassword = "7" + +[workspace] \ No newline at end of file diff --git a/operator/src/apply.rs b/operator/src/apply.rs index 794370179..51797f948 100644 --- a/operator/src/apply.rs +++ b/operator/src/apply.rs @@ -3,7 +3,7 @@ use crate::manifest::{OABFleetManifest, OABServiceManifest, RawManifest, Runtime use anyhow::{Context, Result}; use aws_sdk_ecs::types::{ AssignPublicIp, AwsVpcConfiguration, CapacityProviderStrategyItem, ContainerDefinition, - KeyValuePair, NetworkConfiguration, Secret, + KeyValuePair, LogConfiguration, LogDriver, NetworkConfiguration, Secret, }; use aws_sdk_s3::primitives::ByteStream; use std::path::Path; @@ -124,6 +124,9 @@ async fn apply_ecs( }; let service_name = m.ecs_service_name(); + let cluster = crate::config::OabConfig::load() + .map(|c| c.defaults.cluster) + .unwrap_or_else(|_| "oab".to_string()); let bucket = if let Some(b) = crate::config::OabConfig::load().ok().and_then(|c| c.bucket()) { b } else { @@ -164,13 +167,30 @@ async fn apply_ecs( KeyValuePair::builder().name("NAMESPACE").value(&m.metadata.namespace).build(), KeyValuePair::builder().name("NAME").value(&m.metadata.name).build(), ]; - if !m.spec.config_from.is_empty() { - env_vars.push(KeyValuePair::builder().name("CONFIG_S3_PATH").value(&m.spec.config_from).build()); - } if let Some(ref bootstrap) = m.spec.bootstrap_from { env_vars.push(KeyValuePair::builder().name("BOOTSTRAP_FROM").value(bootstrap).build()); } + // Read and embed config.toml as base64 env var + let has_config = !m.spec.config_from.is_empty(); + if has_config { + // Resolve config content: if S3 path, download; otherwise treat as local + let config_content = if let Some(s3_path) = m.spec.config_from.strip_prefix("s3://") { + let (bucket, key) = s3_path.split_once('/').context("invalid configFrom S3 URI")?; + let resp = s3.get_object().bucket(bucket).key(key).send().await + .context("failed to download config from S3")?; + resp.body.collect().await?.into_bytes().to_vec() + } else { + std::fs::read(&m.spec.config_from).context("failed to read local config file")? + }; + use base64::Engine; + let b64 = base64::engine::general_purpose::STANDARD.encode(&config_content); + if b64.len() > 8192 { + anyhow::bail!("config.toml too large for env injection ({} bytes encoded, max 8192). Use S3 sidecar pattern instead.", b64.len()); + } + env_vars.push(KeyValuePair::builder().name("CONFIG_B64").value(&b64).build()); + } + // 3. Build secrets from map let secrets: Vec = m .spec @@ -182,22 +202,60 @@ async fn apply_ecs( .collect(); // 4. Register task definition - let container = ContainerDefinition::builder() + let log_config = LogConfiguration::builder() + .log_driver(LogDriver::Awslogs) + .options("awslogs-group", "/oab/agents") + .options("awslogs-region", config.region().map(|r| r.as_ref()).unwrap_or("us-east-1")) + .options("awslogs-stream-prefix", &service_name) + .build() + .context("failed to build log configuration")?; + + let mut container_builder = ContainerDefinition::builder() .name("openab") .image(&m.spec.image) .essential(true) .set_environment(Some(env_vars)) .set_secrets(if secrets.is_empty() { None } else { Some(secrets) }) - .build(); + .log_configuration(log_config); + + if has_config { + container_builder = container_builder + .entry_point("sh") + .entry_point("-c") + .command("mkdir -p $HOME/.config/openab && echo $CONFIG_B64 | base64 -d > $HOME/.config/openab/config.toml && exec openab run -c $HOME/.config/openab/config.toml"); + } + + let container = container_builder.build(); + + // Resolve account ID for role ARNs + // NOTE: Role names must match those created by `oabctl bootstrap` + let sts = aws_sdk_sts::Client::new(config); + let account_id = sts.get_caller_identity().send().await? + .account().unwrap_or_default().to_string(); + let execution_role = format!("arn:aws:iam::{account_id}:role/oab-task-execution"); + let task_role = format!("arn:aws:iam::{account_id}:role/oab-task-role"); + + let iam = aws_sdk_iam::Client::new(config); + check_iam_role(&iam, "oab-task-execution").await?; + check_iam_role(&iam, "oab-task-role").await?; let task_def = ecs .register_task_definition() .family(&service_name) + .execution_role_arn(&execution_role) + .task_role_arn(&task_role) .requires_compatibilities(aws_sdk_ecs::types::Compatibility::Fargate) .network_mode(aws_sdk_ecs::types::NetworkMode::Awsvpc) .cpu(&m.spec.resources.cpu) .memory(&m.spec.resources.memory) .container_definitions(container) + .runtime_platform( + aws_sdk_ecs::types::RuntimePlatform::builder() + .operating_system_family(aws_sdk_ecs::types::OsFamily::Linux) + // TODO: make configurable via manifest spec.resources.arch for ARM64/Graviton + .cpu_architecture(aws_sdk_ecs::types::CpuArchitecture::X8664) + .build() + ) .send() .await .context("failed to register task definition")?; @@ -228,7 +286,7 @@ async fn apply_ecs( // Check if service exists let existing = ecs .describe_services() - .cluster("oab") + .cluster(&cluster) .services(&service_name) .send() .await; @@ -241,10 +299,11 @@ async fn apply_ecs( if service_active { ecs.update_service() - .cluster("oab") + .cluster(&cluster) .service(&service_name) .task_definition(&task_def_arn) .network_configuration(network_config) + .enable_execute_command(true) .send() .await .context("failed to update ECS service")?; @@ -256,10 +315,11 @@ async fn apply_ecs( .build()?; ecs.create_service() - .cluster("oab") + .cluster(&cluster) .service_name(&service_name) .task_definition(&task_def_arn) .desired_count(1) + .enable_execute_command(true) .capacity_provider_strategy(cap_strategy) .network_configuration(network_config) .send() @@ -273,30 +333,31 @@ async fn apply_ecs( if wait { eprintln!(" ⏳ Waiting for {} to stabilize...", m.metadata.name); - wait_for_stable(ecs, "oab", &service_name).await?; - eprintln!(" ✓ {} is stable", m.metadata.name); + ecsctl::apply::wait_for_stable(ecs, &cluster, &service_name).await?; } Ok(()) } -async fn wait_for_stable(ecs: &aws_sdk_ecs::Client, cluster: &str, service: &str) -> Result<()> { - for _ in 0..60 { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - let resp = ecs.describe_services() - .cluster(cluster) - .services(service) - .send().await?; - if let Some(svc) = resp.services().first() { - let deployments = svc.deployments(); - if deployments.len() == 1 { - if let Some(d) = deployments.first() { - if d.running_count() == d.desired_count() && d.rollout_state() == Some(&aws_sdk_ecs::types::DeploymentRolloutState::Completed) { - return Ok(()); - } - } +/// Check IAM role existence. If AccessDenied, warn and proceed (caller may only have iam:PassRole). +/// Only fail hard on NoSuchEntity. +async fn check_iam_role(iam: &aws_sdk_iam::Client, role_name: &str) -> Result<()> { + use aws_sdk_iam::error::ProvideErrorMetadata; + match iam.get_role().role_name(role_name).send().await { + Ok(_) => Ok(()), + Err(e) => { + let code = e.as_service_error() + .and_then(|se| se.code()) + .unwrap_or_default(); + if code == "AccessDenied" || code == "AccessDeniedException" { + eprintln!(" ⚠ Cannot verify role '{}' (AccessDenied) — proceeding anyway", role_name); + Ok(()) + } else { + Err(anyhow::anyhow!( + "IAM role '{}' not found — run `oabctl bootstrap` first ({})", + role_name, code + )) } } } - anyhow::bail!("timed out waiting for service to stabilize (5 min)") -} +} \ No newline at end of file diff --git a/operator/src/bootstrap.rs b/operator/src/bootstrap.rs index ac3270bce..cbaccc789 100644 --- a/operator/src/bootstrap.rs +++ b/operator/src/bootstrap.rs @@ -355,7 +355,7 @@ async fn create(config: &aws_config::SdkConfig, imports: ImportOptions) -> Resul _ => { let resp = ec2.create_security_group() .group_name(SG_NAME) - .description("OAB agent containers — managed by oabctl bootstrap") + .description("OAB agent containers - managed by oabctl bootstrap") .vpc_id(&vid) .send().await .context("failed to create security group")?; diff --git a/operator/src/create.rs b/operator/src/create.rs index 77241286a..5ac231cae 100644 --- a/operator/src/create.rs +++ b/operator/src/create.rs @@ -1,20 +1,34 @@ use anyhow::{Context, Result}; use aws_sdk_ec2::Client as Ec2Client; +use aws_sdk_ec2::error::ProvideErrorMetadata; use aws_sdk_s3::Client as S3Client; use aws_sdk_secretsmanager::Client as SmClient; use std::io::{self, Write}; const BACKENDS: &[(&str, &str)] = &[ - ("kiro", "public.ecr.aws/oablab/kiro"), - ("claude-code", "public.ecr.aws/oablab/claude-code"), - ("codex", "public.ecr.aws/oablab/codex"), - ("gemini", "public.ecr.aws/oablab/gemini"), - ("copilot", "public.ecr.aws/oablab/copilot"), - ("opencode", "public.ecr.aws/oablab/opencode"), + ("kiro", "ghcr.io/openabdev/openab"), + ("claude-code", "ghcr.io/openabdev/openab-claude"), + ("codex", "ghcr.io/openabdev/openab-codex"), + ("gemini", "ghcr.io/openabdev/openab-gemini"), + ("copilot", "ghcr.io/openabdev/openab-copilot"), + ("opencode", "ghcr.io/openabdev/openab-opencode"), + ("hermes", "ghcr.io/openabdev/openab-hermes"), + ("grok", "ghcr.io/openabdev/openab-grok"), + ("cursor", "ghcr.io/openabdev/openab-cursor"), + ("mimocode", "ghcr.io/openabdev/openab-mimocode"), + ("antigravity", "ghcr.io/openabdev/openab-antigravity"), ]; const CHANNELS: &[&str] = &["stable", "beta"]; +const VALID_FARGATE_SIZING: &[(u32, &[u32])] = &[ + (256, &[512, 1024, 2048]), + (512, &[1024, 2048, 3072, 4096]), + (1024, &[2048, 3072, 4096, 5120, 6144, 7168, 8192]), + (2048, &[4096, 5120, 6144, 7168, 8192, 9216, 10240, 11264, 12288, 13312, 14336, 15360, 16384]), + (4096, &[8192, 9216, 10240, 11264, 12288, 13312, 14336, 15360, 16384, 17408, 18432, 19456, 20480, 21504, 22528, 23552, 24576, 25600, 26624, 27648, 28672, 29696, 30720]), +]; + pub async fn run(config: &aws_config::SdkConfig, name: &str, namespace: &str, auto_apply: bool) -> Result<()> { eprintln!("🤖 Creating agent: {name}\n"); @@ -61,7 +75,19 @@ pub async fn run(config: &aws_config::SdkConfig, name: &str, namespace: &str, au let cap = prompt_select("Capacity provider", &["FARGATE_SPOT (cost-optimized)", "FARGATE (on-demand)"])?; let capacity_provider = if cap.starts_with("FARGATE_SPOT") { "FARGATE_SPOT" } else { "FARGATE" }; - // 6. VPC + // 6. CPU/Memory sizing + let cpu_options: Vec = VALID_FARGATE_SIZING.iter().map(|(c, _)| c.to_string()).collect(); + let cpu_labels: Vec<&str> = cpu_options.iter().map(|s| s.as_str()).collect(); + let cpu_choice = prompt_select("CPU (units)", &cpu_labels)?; + let cpu: u32 = cpu_choice.parse().unwrap(); + + let mem_values = VALID_FARGATE_SIZING.iter().find(|(c, _)| *c == cpu).unwrap().1; + let mem_options: Vec = mem_values.iter().map(|m| m.to_string()).collect(); + let mem_labels: Vec<&str> = mem_options.iter().map(|s| s.as_str()).collect(); + let mem_choice = prompt_select("Memory (MiB)", &mem_labels)?; + let memory: u32 = mem_choice.parse().unwrap(); + + // 7. VPC let ec2 = Ec2Client::new(config); let vpcs = list_vpcs(&ec2).await?; if vpcs.is_empty() { @@ -71,7 +97,7 @@ pub async fn run(config: &aws_config::SdkConfig, name: &str, namespace: &str, au let vpc_choice = prompt_select("VPC", &vpc_labels)?; let vpc = vpcs.iter().find(|v| v.label == vpc_choice).unwrap(); - // 7. Subnets (auto-select: private+NAT > private > public, 2-3 AZ) + // 8. Subnets (auto-select: private+NAT > private > public, 2-3 AZ) let subnets = select_subnets(&ec2, &vpc.id).await?; eprintln!(" Subnets (auto-selected):"); for s in &subnets { @@ -79,26 +105,38 @@ pub async fn run(config: &aws_config::SdkConfig, name: &str, namespace: &str, au } eprintln!(); - // 8. Security group - let sgs = list_security_groups(&ec2, &vpc.id).await?; - let mut sg_labels: Vec = vec!["Create new (oab-{name})".to_string()]; - sg_labels.extend(sgs.iter().map(|s| format!("{} ({})", s.id, s.name))); - let sg_labels_ref: Vec<&str> = sg_labels.iter().map(|s| s.as_str()).collect(); - let sg_choice = prompt_select("Security group", &sg_labels_ref)?; - - let sg_id = if sg_choice.starts_with("Create new") { - let sg_name = format!("oab-{name}"); - let resp = ec2.create_security_group() - .group_name(&sg_name) - .description(format!("OAB agent {name}")) - .vpc_id(&vpc.id) - .send().await - .context("failed to create security group")?; - let id = resp.group_id().unwrap_or_default().to_string(); - eprintln!(" → Created security group: {id}\n"); - id - } else { - sgs.iter().find(|s| sg_choice.contains(&s.id)).unwrap().id.clone() + // 8. Security group (always create a dedicated one) + let sg_name = format!("oab-{name}"); + let sg_id = match ec2.create_security_group() + .group_name(&sg_name) + .description(format!("OAB agent {name}")) + .vpc_id(&vpc.id) + .send().await + { + Ok(resp) => { + let id = resp.group_id().unwrap_or_default().to_string(); + eprintln!(" → Created security group: {id} ({sg_name})\n"); + id + } + Err(e) => { + let is_duplicate = e.as_service_error() + .map(|se| se.code() == Some("InvalidGroup.Duplicate")) + .unwrap_or(false); + if !is_duplicate { + return Err(anyhow::anyhow!("failed to create security group: {e}")); + } + // SG already exists — look it up + let existing = ec2.describe_security_groups() + .filters(aws_sdk_ec2::types::Filter::builder().name("group-name").values(&sg_name).build()) + .filters(aws_sdk_ec2::types::Filter::builder().name("vpc-id").values(&vpc.id).build()) + .send().await?; + let id = existing.security_groups().first() + .and_then(|sg| sg.group_id()) + .context("SG exists but could not be found")? + .to_string(); + eprintln!(" → Using existing security group: {id} ({sg_name})\n"); + id + } }; // ─── Generate config.toml ────────────────────────────────────────────── @@ -118,7 +156,10 @@ pub async fn run(config: &aws_config::SdkConfig, name: &str, namespace: &str, au std::fs::write(format!("{dir}/config.toml"), &config_toml)?; let subnet_ids: Vec = subnets.iter().map(|s| s.id.clone()).collect(); - let manifest_yaml = generate_manifest(name, namespace, &image, &config_from, capacity_provider, &subnet_ids, &sg_id); + let manifest_yaml = generate_manifest(&ManifestParams { + name, namespace, image: &image, config_from: &config_from, + cap: capacity_provider, subnets: &subnet_ids, sg: &sg_id, cpu, memory, + }); std::fs::write(format!("{dir}/manifest.yaml"), &manifest_yaml)?; // ─── Summary ─────────────────────────────────────────────────────────── @@ -126,7 +167,7 @@ pub async fn run(config: &aws_config::SdkConfig, name: &str, namespace: &str, au eprintln!("Summary:"); eprintln!(" Agent: {name}"); eprintln!(" Image: {image}"); - eprintln!(" CPU/Mem: 256 / 512"); + eprintln!(" CPU/Mem: {} / {}", cpu, memory); eprintln!(" Runtime: ECS {capacity_provider}"); eprintln!(" Subnets: {}", subnet_ids.join(", ")); eprintln!(" SG: {sg_id}"); @@ -282,20 +323,6 @@ async fn select_subnets(ec2: &Ec2Client, vpc_id: &str) -> Result Ok(selected) } -struct SgInfo { id: String, name: String } - -async fn list_security_groups(ec2: &Ec2Client, vpc_id: &str) -> Result> { - let resp = ec2.describe_security_groups() - .filters(aws_sdk_ec2::types::Filter::builder().name("vpc-id").values(vpc_id).build()) - .send().await?; - Ok(resp.security_groups().iter().map(|sg| { - SgInfo { - id: sg.group_id().unwrap_or_default().to_string(), - name: sg.group_name().unwrap_or_default().to_string(), - } - }).collect()) -} - fn generate_config(_backend: &str, name: &str, namespace: &str, stt_enabled: bool) -> String { let stt_section = if stt_enabled { r#"[stt] @@ -354,8 +381,20 @@ usercron_path = "cronjob.toml" ) } -fn generate_manifest(name: &str, namespace: &str, image: &str, config_from: &str, cap: &str, subnets: &[String], sg: &str) -> String { - let subnets_yaml = subnets.iter().map(|s| format!("\"{}\"", s)).collect::>().join(", "); +struct ManifestParams<'a> { + name: &'a str, + namespace: &'a str, + image: &'a str, + config_from: &'a str, + cap: &'a str, + subnets: &'a [String], + sg: &'a str, + cpu: u32, + memory: u32, +} + +fn generate_manifest(p: &ManifestParams) -> String { + let subnets_yaml = p.subnets.iter().map(|s| format!("\"{}\"", s)).collect::>().join(", "); format!( r#"apiVersion: oab.dev/v2 kind: OABService @@ -365,8 +404,8 @@ metadata: spec: image: {image} resources: - cpu: "256" - memory: "512" + cpu: "{cpu}" + memory: "{memory}" configFrom: {config_from} runtime: type: ecs @@ -374,7 +413,15 @@ spec: networking: subnets: [{subnets_yaml}] securityGroups: ["{sg}"] -"# +"#, + name = p.name, + namespace = p.namespace, + image = p.image, + cpu = p.cpu, + memory = p.memory, + config_from = p.config_from, + cap = p.cap, + sg = p.sg, ) } diff --git a/operator/src/main.rs b/operator/src/main.rs index c391e04e0..3dc6d6cbd 100644 --- a/operator/src/main.rs +++ b/operator/src/main.rs @@ -6,6 +6,7 @@ mod create; mod get; mod delete; +use anyhow::Context; use clap::{Parser, Subcommand}; #[derive(Parser)] @@ -47,7 +48,7 @@ enum Commands { /// Optional resource name name: Option, /// ECS cluster name - #[arg(long, default_value = "default")] + #[arg(long, default_value = "oab")] cluster: String, }, /// Delete an OAB service @@ -57,16 +58,22 @@ enum Commands { /// Resource name name: String, /// ECS cluster name - #[arg(long, default_value = "default")] + #[arg(long, default_value = "oab")] cluster: String, /// Namespace #[arg(long, default_value = "prod")] namespace: String, }, - /// Execute a command in an agent container (via ecsctl) + /// Execute a command in an agent container Exec { - /// Agent name (alias) + /// Agent name agent: String, + /// ECS cluster name + #[arg(long, default_value = "oab")] + cluster: String, + /// Namespace (used in service name: oab-{namespace}-{agent}) + #[arg(long, default_value = "prod")] + namespace: String, /// Command to run (default: /bin/sh). Use -- to separate args. #[arg(trailing_var_arg = true, allow_hyphen_values = true)] command: Vec, @@ -77,6 +84,12 @@ enum Commands { src: String, /// Destination path (local or agent:/path) dst: String, + /// ECS cluster name + #[arg(long, default_value = "oab")] + cluster: String, + /// Namespace (used in service name: oab-{namespace}-{agent}) + #[arg(long, default_value = "prod")] + namespace: String, }, /// Sync directories between local machine and agent containers (via ecsctl) Sync { @@ -84,6 +97,12 @@ enum Commands { src: String, /// Destination: agent:/path or local dir dst: String, + /// ECS cluster name + #[arg(long, default_value = "oab")] + cluster: String, + /// Namespace (used in service name: oab-{namespace}-{agent}) + #[arg(long, default_value = "prod")] + namespace: String, }, /// Bootstrap OAB infrastructure (cluster, IAM roles, S3, security group) Bootstrap { @@ -129,12 +148,11 @@ async fn main() -> anyhow::Result<()> { Commands::Delete { resource, name, cluster, namespace } => { delete::run(&config, &resource, &name, &cluster, &namespace).await } - Commands::Exec { agent, command } => { - let resolved = ecsctl::alias::resolve(&config, &agent).await?; + Commands::Exec { agent, cluster, namespace, command } => { + let resolved = resolve_agent(&config, &agent, &cluster, &namespace).await?; let cmd = if command.is_empty() { None } else { - // Join args with single-quote escaping to prevent shell interpretation let joined = command.iter().map(|a| { format!("'{}'", a.replace('\'', "'\\''")) }).collect::>().join(" "); @@ -142,17 +160,17 @@ async fn main() -> anyhow::Result<()> { }; ecsctl::exec::run(&config, &resolved, cmd.as_deref()).await } - Commands::Cp { src, dst } => { - let src = ecsctl::alias::resolve_remote(&config, &src).await?; - let dst = ecsctl::alias::resolve_remote(&config, &dst).await?; + Commands::Cp { src, dst, cluster, namespace } => { + let src = resolve_remote_path(&config, &src, &cluster, &namespace).await?; + let dst = resolve_remote_path(&config, &dst, &cluster, &namespace).await?; eprintln!("⇄ Copying {} → {} ...", src, dst); ecsctl::cp::run(&config, &src, &dst, None, 60).await?; eprintln!("✓ Done"); Ok(()) } - Commands::Sync { src, dst } => { - let src = ecsctl::alias::resolve_remote(&config, &src).await?; - let dst = ecsctl::alias::resolve_remote(&config, &dst).await?; + Commands::Sync { src, dst, cluster, namespace } => { + let src = resolve_remote_path(&config, &src, &cluster, &namespace).await?; + let dst = resolve_remote_path(&config, &dst, &cluster, &namespace).await?; let src_remote = ecsctl::cp::is_remote(&src); let dst_remote = ecsctl::cp::is_remote(&dst); eprintln!("⇄ Syncing {} → {} ...", src, dst); @@ -189,3 +207,40 @@ async fn main() -> anyhow::Result<()> { } } } + +/// Resolve agent name to cluster/task_id/container for ECS Exec. +/// Looks up service "{cluster}-{namespace}-{name}" in the given cluster. +async fn resolve_agent(config: &aws_config::SdkConfig, name: &str, cluster: &str, namespace: &str) -> anyhow::Result { + // If already in cluster/task/container format, pass through + if name.contains('/') { + return Ok(name.to_string()); + } + + let ecs = aws_sdk_ecs::Client::new(config); + let service_name = format!("oab-{namespace}-{name}"); + + let tasks = ecs.list_tasks() + .cluster(cluster) + .service_name(&service_name) + .desired_status(aws_sdk_ecs::types::DesiredStatus::Running) + .send().await + .context(format!("failed to list tasks for {service_name}"))?; + + let task_arn = tasks.task_arns().first() + .context(format!("no running tasks for agent '{name}'"))?; + + let task_id = task_arn.rsplit('/').next().unwrap_or(task_arn); + Ok(format!("{cluster}/{task_id}/openab")) +} + +/// Resolve remote path "agent:/path" using dynamic ECS task lookup. +/// Local paths are returned unchanged. +async fn resolve_remote_path(config: &aws_config::SdkConfig, path: &str, cluster: &str, namespace: &str) -> anyhow::Result { + if !path.contains(':') { + return Ok(path.to_string()); + } + let (agent, remote_path) = path.split_once(':') + .context("invalid remote path format")?; + let resolved = resolve_agent(config, agent, cluster, namespace).await?; + Ok(format!("{resolved}:{remote_path}")) +}