Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions crates/aof-triggers/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ impl TriggerHandler {
self.platforms.get(name)
}

/// Register a command binding (maps slash command to agent/fleet/flow)
pub fn register_command_binding(&mut self, command: String, binding: CommandBinding) {
info!("Registering command binding: /{} -> {:?}", command, binding);
self.config.command_bindings.insert(command, binding);
}

/// Handle incoming message from platform
pub async fn handle_message(&self, platform: &str, message: TriggerMessage) -> AofResult<()> {
debug!(
Expand Down Expand Up @@ -853,9 +859,42 @@ impl TriggerHandler {
if let Some(binding) = self.config.command_bindings.get(&cmd_name) {
info!("Command '{}' matched binding: {:?}", cmd_name, binding);

// Create modified message with just the text (without the command)
// Create modified message with context from metadata if command text is empty
let mut routed_message = message.clone();
routed_message.text = command_text.clone().unwrap_or_default();
let cmd_text = command_text.clone().unwrap_or_default();

// If command text is empty, construct context from metadata (for PR/issue commands)
if cmd_text.trim().is_empty() {
// Build context from metadata for commands like /review
let mut context_parts = Vec::new();

if let Some(pr_url) = message.metadata.get("pr_html_url").and_then(|v| v.as_str()) {
context_parts.push(format!("Review the PR at: {}", pr_url));
} else if let Some(issue_url) = message.metadata.get("issue_html_url").and_then(|v| v.as_str()) {
context_parts.push(format!("Review the PR/issue at: {}", issue_url));
}

if let Some(pr_title) = message.metadata.get("pr_title").and_then(|v| v.as_str()) {
context_parts.push(format!("Title: {}", pr_title));
} else if let Some(issue_title) = message.metadata.get("issue_title").and_then(|v| v.as_str()) {
context_parts.push(format!("Title: {}", issue_title));
}

if let Some(comment_body) = message.metadata.get("comment_body").and_then(|v| v.as_str()) {
if !comment_body.starts_with('/') {
context_parts.push(format!("Additional context: {}", comment_body));
}
}

routed_message.text = if context_parts.is_empty() {
format!("Execute {} command", cmd_name)
} else {
context_parts.join("\n")
};
info!("Constructed context for command '{}': {}", cmd_name, routed_message.text);
} else {
routed_message.text = cmd_text;
}

// Route to flow if specified (highest priority - complex workflows)
if let Some(ref flow_name) = binding.flow {
Expand Down
2 changes: 1 addition & 1 deletion crates/aof-triggers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod server;
pub use command::{CommandContext, CommandType, TriggerCommand, TriggerTarget};

// Re-export main types from handler module
pub use handler::{TriggerHandler, TriggerHandlerConfig};
pub use handler::{TriggerHandler, TriggerHandlerConfig, CommandBinding};

// Re-export main types from platforms module
pub use platforms::{Platform, PlatformConfig};
Expand Down
56 changes: 45 additions & 11 deletions crates/aof-triggers/src/platforms/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,13 +470,18 @@ impl GitHubPlatform {
/// Create new GitHub platform adapter
///
/// # Errors
/// Returns error if token or webhook_secret is empty
/// Returns error if webhook_secret is empty (token is optional for receive-only mode)
pub fn new(config: GitHubConfig) -> Result<Self, PlatformError> {
if config.token.is_empty() || config.webhook_secret.is_empty() {
// Webhook secret is required for signature verification
if config.webhook_secret.is_empty() {
return Err(PlatformError::ParseError(
"GitHub token and webhook secret are required".to_string(),
"GitHub webhook secret is required for signature verification".to_string(),
));
}
// Token is optional - if not provided, API features (posting comments) are disabled
if config.token.is_empty() {
tracing::warn!("GitHub token not provided - API features (posting comments, reviews) disabled");
}

let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
Expand Down Expand Up @@ -916,12 +921,20 @@ impl GitHubPlatform {
})?;
let issue = payload.issue.as_ref();
let issue_num = issue.map(|i| i.number).unwrap_or(0);
format!(
"comment:{}:#{} {}",
action.unwrap_or(""),
issue_num,
comment.body.lines().next().unwrap_or("")
)
let first_line = comment.body.lines().next().unwrap_or("").trim();

// If the comment starts with a slash command, use it directly as the text
// This enables command detection in issue comments (e.g., /review, /deploy)
if first_line.starts_with('/') {
first_line.to_string()
} else {
format!(
"comment:{}:#{} {}",
action.unwrap_or(""),
issue_num,
first_line
)
}
}
"workflow_run" => {
let run = payload.workflow_run.as_ref().ok_or_else(|| {
Expand Down Expand Up @@ -949,8 +962,15 @@ impl GitHubPlatform {
_ => format!("{}:{}", event_type, action.unwrap_or("")),
};

// Build channel_id from repo full name
let channel_id = repo.full_name.clone();
// Build channel_id from repo full name and issue/PR number for response posting
// Format: owner/repo#number (allows send_response to post comments)
let channel_id = if let Some(ref pr) = payload.pull_request {
format!("{}#{}", repo.full_name, pr.number)
} else if let Some(ref issue) = payload.issue {
format!("{}#{}", repo.full_name, issue.number)
} else {
repo.full_name.clone()
};

// Build user
let trigger_user = TriggerUser {
Expand Down Expand Up @@ -993,6 +1013,20 @@ impl GitHubPlatform {
metadata.insert("issue_html_url".to_string(), serde_json::json!(issue.html_url));
}

// Add comment-specific metadata
if let Some(ref comment) = payload.comment {
metadata.insert("comment_id".to_string(), serde_json::json!(comment.id));
metadata.insert("comment_body".to_string(), serde_json::json!(comment.body));
metadata.insert("comment_html_url".to_string(), serde_json::json!(comment.html_url));
// Check if this is a PR comment (issue_comment on a PR has a pull_request URL in the issue)
if let Some(ref issue) = payload.issue {
// GitHub includes a pull_request field in the issue object if this is a PR
// Since we don't have that field parsed, we can check the html_url
let is_pr_comment = issue.html_url.contains("/pull/");
metadata.insert("is_pr_comment".to_string(), serde_json::json!(is_pr_comment));
}
}

// Add push-specific metadata
if let Some(ref git_ref) = payload.git_ref {
metadata.insert("ref".to_string(), serde_json::json!(git_ref));
Expand Down
1 change: 1 addition & 0 deletions crates/aof-triggers/tests/workflow_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ fn test_custom_handler_config() {
max_tasks_per_user: 5,
command_timeout_secs: 600,
default_agent: None,
command_bindings: HashMap::new(),
};

let handler = create_handler_with_config(config.clone());
Expand Down
6 changes: 6 additions & 0 deletions crates/aofctl/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ pub enum Commands {
/// Directory containing AgentFlow YAML files for event-driven routing
#[arg(long)]
flows_dir: Option<String>,

/// Directory containing Trigger YAML files
#[arg(long)]
triggers_dir: Option<String>,
},

/// Manage agent fleets (multi-agent coordination)
Expand Down Expand Up @@ -295,13 +299,15 @@ impl Cli {
host,
agents_dir,
flows_dir,
triggers_dir,
} => {
commands::serve::execute(
config.as_deref(),
port,
host.as_deref(),
agents_dir.as_deref(),
flows_dir.as_deref(),
triggers_dir.as_deref(),
)
.await
}
Expand Down
121 changes: 121 additions & 0 deletions crates/aofctl/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;

use aof_core::{TriggerRegistry, Registry, StandaloneTriggerType};
use aof_runtime::{Runtime, RuntimeOrchestrator};
use aof_triggers::{
TriggerHandler, TriggerHandlerConfig, TriggerServer, TriggerServerConfig,
SlackPlatform, SlackConfig,
DiscordPlatform, PlatformConfig,
TelegramPlatform, TelegramConfig,
WhatsAppPlatform, WhatsAppConfig,
GitHubPlatform, GitHubConfig,
CommandBinding as HandlerCommandBinding,
flow::{FlowRegistry, FlowRouter},
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -65,6 +68,10 @@ pub struct ServeSpec {
#[serde(default)]
pub flows: FlowsConfig,

/// Triggers directory (for loading Trigger resources)
#[serde(default)]
pub triggers: TriggersConfig,

/// Runtime settings
#[serde(default)]
pub runtime: RuntimeConfig,
Expand Down Expand Up @@ -222,6 +229,17 @@ pub struct FlowsConfig {
pub enabled: bool,
}

/// Triggers configuration for loading Trigger resources
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TriggersConfig {
/// Directory containing Trigger YAML files
pub directory: Option<PathBuf>,

/// Watch for changes and hot-reload
#[serde(default)]
pub watch: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeConfig {
/// Maximum concurrent tasks
Expand Down Expand Up @@ -287,6 +305,7 @@ pub async fn execute(
host: Option<&str>,
agents_dir: Option<&str>,
flows_dir: Option<&str>,
triggers_dir: Option<&str>,
) -> anyhow::Result<()> {
// Load configuration
let config = if let Some(config_path) = config_file {
Expand Down Expand Up @@ -315,6 +334,10 @@ pub async fn execute(
watch: false,
enabled: true,
},
triggers: TriggersConfig {
directory: triggers_dir.map(PathBuf::from),
watch: false,
},
runtime: RuntimeConfig::default(),
},
}
Expand Down Expand Up @@ -490,6 +513,104 @@ pub async fn execute(
}
}

// Load Triggers from directory
let triggers_dir_path = triggers_dir
.map(PathBuf::from)
.or_else(|| config.spec.triggers.directory.clone());

if let Some(ref triggers_path) = triggers_dir_path {
info!("Loading Triggers from: {}", triggers_path.display());
let mut trigger_registry = TriggerRegistry::new();

match trigger_registry.load_directory(triggers_path) {
Ok(count) => {
if count > 0 {
info!(" Loaded {} triggers: {:?}", count, trigger_registry.names());

// Register platforms for each trigger type
for trigger in trigger_registry.get_all() {
match trigger.spec.trigger_type {
StandaloneTriggerType::GitHub => {
// Register GitHub platform if we have a trigger for it
if let Some(ref secret) = trigger.spec.config.webhook_secret {
// Get GitHub token from env or trigger config
let token = std::env::var("GITHUB_TOKEN")
.or_else(|_| std::env::var("GH_TOKEN"))
.unwrap_or_default();

if token.is_empty() {
warn!(" GitHub trigger '{}': GITHUB_TOKEN not set, API features disabled", trigger.name());
}

let github_config = GitHubConfig {
token,
webhook_secret: secret.clone(),
bot_name: "aof-bot".to_string(),
api_url: "https://api.github.com".to_string(),
allowed_repos: None,
allowed_events: None,
allowed_users: None,
auto_approve_patterns: None,
enable_status_checks: true,
enable_reviews: true,
enable_comments: true,
};
match GitHubPlatform::new(github_config) {
Ok(platform) => {
handler.register_platform(Arc::new(platform));
info!(" Registered platform: github (from trigger '{}')", trigger.name());
platforms_registered += 1;
}
Err(e) => {
warn!(" Failed to create GitHub platform: {}", e);
}
}
} else {
warn!(" GitHub trigger '{}' missing webhook_secret", trigger.name());
}
}
// Other trigger types use platforms registered from config
_ => {}
}

// Add command bindings from trigger
for (cmd, binding) in &trigger.spec.commands {
// Convert core CommandBinding to handler CommandBinding
let handler_binding = HandlerCommandBinding {
agent: binding.agent.clone(),
fleet: binding.fleet.clone(),
flow: binding.flow.clone(),
description: binding.description.clone(),
};

// Strip leading slash if present for consistent lookup
let cmd_name = cmd.trim_start_matches('/').to_string();
handler.register_command_binding(cmd_name.clone(), handler_binding);

if let Some(ref agent) = binding.agent {
info!(" Registered command '{}' -> agent '{}'", cmd, agent);
} else if let Some(ref fleet) = binding.fleet {
info!(" Registered command '{}' -> fleet '{}'", cmd, fleet);
} else if let Some(ref flow) = binding.flow {
info!(" Registered command '{}' -> flow '{}'", cmd, flow);
}
}

// Set default agent if specified in trigger
if let Some(ref default_agent) = trigger.spec.default_agent {
info!(" Default agent for trigger '{}': {}", trigger.name(), default_agent);
}
}
} else {
info!(" No Trigger files found in {}", triggers_path.display());
}
}
Err(e) => {
warn!(" Failed to load triggers from {}: {}", triggers_path.display(), e);
}
}
}

if platforms_registered == 0 {
warn!("No platforms registered! Server will start but won't process any webhooks.");
warn!("Configure platforms in your config file or set environment variables.");
Expand Down