diff --git a/core/src/ten_manager/src/designer/exec/cmd_run.rs b/core/src/ten_manager/src/designer/exec/cmd_run.rs index 4f81011ec6..31f77f0503 100644 --- a/core/src/ten_manager/src/designer/exec/cmd_run.rs +++ b/core/src/ten_manager/src/designer/exec/cmd_run.rs @@ -11,6 +11,7 @@ use actix_web_actors::ws::WebsocketContext; use crossbeam_channel::{bounded, Sender}; use crate::designer::exec::RunCmdOutput; +use crate::log::{process_log_line, GraphResourcesLog, LogLineInfo}; use super::{msg::OutboundMsg, WsRunCmd}; @@ -43,7 +44,15 @@ impl WsRunCmd { command .arg("-c") .arg(cmd) - .env("TEN_LOG_FORMATTER", "default") + // Set TEN_LOG_FORMATTER to default if any output is log content. + .env( + "TEN_LOG_FORMATTER", + if self.stdout_is_log || self.stderr_is_log { + "default" + } else { + "" + }, + ) // Capture stdout/stderr. .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()); @@ -82,11 +91,20 @@ impl WsRunCmd { if let Some(mut out) = stdout_child { let addr_stdout = addr.clone(); let shutdown_rx = stdout_shutdown_rx; + let is_log = self.stdout_is_log; thread::spawn(move || { use std::io::{BufRead, BufReader}; let reader = BufReader::new(&mut out); + // Create a graph resources log instance for log processing. + let mut graph_resources_log = GraphResourcesLog { + graph_id: String::new(), + graph_name: String::new(), + app_uri: None, + extension_threads: std::collections::HashMap::new(), + }; + for line_res in reader.lines() { // Check if we should terminate. if shutdown_rx.try_recv().is_ok() { @@ -95,11 +113,22 @@ impl WsRunCmd { match line_res { Ok(line) => { - // `do_send` is used to asynchronously send messages - // to an actor. This method does not wait for the - // message to be processed, making it suitable for - // messages that do not require a response. - addr_stdout.do_send(RunCmdOutput::StdOut(line)); + if is_log { + // Process line as log content. + let metadata = process_log_line( + &line, + &mut graph_resources_log, + ); + let log_line_info = + LogLineInfo { line, metadata }; + addr_stdout.do_send(RunCmdOutput::StdOutLog( + log_line_info, + )); + } else { + // Process as normal stdout. + addr_stdout + .do_send(RunCmdOutput::StdOutNormal(line)); + } } Err(_) => break, } @@ -112,11 +141,20 @@ impl WsRunCmd { if let Some(mut err) = stderr_child { let addr_stderr = addr.clone(); let shutdown_rx = stderr_shutdown_rx; + let is_log = self.stderr_is_log; thread::spawn(move || { use std::io::{BufRead, BufReader}; let reader = BufReader::new(&mut err); + // Create a graph resources log instance for log processing. + let mut graph_resources_log = GraphResourcesLog { + graph_id: String::new(), + graph_name: String::new(), + app_uri: None, + extension_threads: std::collections::HashMap::new(), + }; + for line_res in reader.lines() { // Check if we should terminate. if shutdown_rx.try_recv().is_ok() { @@ -125,7 +163,22 @@ impl WsRunCmd { match line_res { Ok(line) => { - addr_stderr.do_send(RunCmdOutput::StdErr(line)); + if is_log { + // Process line as log content. + let metadata = process_log_line( + &line, + &mut graph_resources_log, + ); + let log_line_info = + LogLineInfo { line, metadata }; + addr_stderr.do_send(RunCmdOutput::StdErrLog( + log_line_info, + )); + } else { + // Process as normal stderr. + addr_stderr + .do_send(RunCmdOutput::StdErrNormal(line)); + } } Err(_) => break, } diff --git a/core/src/ten_manager/src/designer/exec/mod.rs b/core/src/ten_manager/src/designer/exec/mod.rs index 0035637c0c..8c50ff437a 100644 --- a/core/src/ten_manager/src/designer/exec/mod.rs +++ b/core/src/ten_manager/src/designer/exec/mod.rs @@ -18,6 +18,7 @@ use anyhow::Context; use anyhow::Result; use crate::designer::DesignerState; +use crate::log::LogLineInfo; use cmd_run::ShutdownSenders; use msg::InboundMsg; use msg::OutboundMsg; @@ -27,20 +28,25 @@ use run_script::extract_command_from_manifest; #[derive(Message)] #[rtype(result = "()")] pub enum RunCmdOutput { - StdOut(String), - StdErr(String), + StdOutNormal(String), + StdOutLog(LogLineInfo), + + StdErrNormal(String), + StdErrLog(LogLineInfo), + Exit(i32), } -/// `CmdParser` returns a tuple: the 1st element is the command string, and -/// the 2nd is an optional working directory. +/// `CmdParser` returns a tuple: the 1st element is the command string, the 2nd +/// is an optional working directory, and the 3rd and 4th are booleans +/// indicating if stdout and stderr are log content. pub type CmdParser = Box< dyn Fn( &str, ) -> std::pin::Pin< Box< dyn std::future::Future< - Output = Result<(String, Option)>, + Output = Result<(String, Option, bool, bool)>, > + Send, >, > + Send @@ -52,6 +58,8 @@ pub struct WsRunCmd { cmd_parser: CmdParser, working_directory: Option, shutdown_senders: Option, + stdout_is_log: bool, + stderr_is_log: bool, } impl WsRunCmd { @@ -61,6 +69,8 @@ impl WsRunCmd { cmd_parser, working_directory: None, shutdown_senders: None, + stdout_is_log: false, + stderr_is_log: false, } } } @@ -93,16 +103,30 @@ impl Handler for WsRunCmd { ctx: &mut Self::Context, ) -> Self::Result { match msg { - RunCmdOutput::StdOut(line) => { + RunCmdOutput::StdOutNormal(line) => { // Send the line to the client. - let msg_out = OutboundMsg::StdOut { data: line }; + let msg_out = OutboundMsg::StdOutNormal { data: line }; + let out_str = serde_json::to_string(&msg_out).unwrap(); + + // Sends a text message to the WebSocket client. + ctx.text(out_str); + } + RunCmdOutput::StdOutLog(log_line) => { + let msg_out = OutboundMsg::StdOutLog { data: log_line }; let out_str = serde_json::to_string(&msg_out).unwrap(); // Sends a text message to the WebSocket client. ctx.text(out_str); } - RunCmdOutput::StdErr(line) => { - let msg_out = OutboundMsg::StdErr { data: line }; + RunCmdOutput::StdErrNormal(line) => { + let msg_out = OutboundMsg::StdErrNormal { data: line }; + let out_str = serde_json::to_string(&msg_out).unwrap(); + + // Sends a text message to the WebSocket client. + ctx.text(out_str); + } + RunCmdOutput::StdErrLog(log_line) => { + let msg_out = OutboundMsg::StdErrLog { data: log_line }; let out_str = serde_json::to_string(&msg_out).unwrap(); // Sends a text message to the WebSocket client. @@ -140,10 +164,17 @@ impl StreamHandler> for WsRunCmd { actix::spawn(async move { match fut.await { - Ok((cmd, working_directory)) => { + Ok(( + cmd, + working_directory, + stdout_is_log, + stderr_is_log, + )) => { actor_addr.do_send(ProcessCommand { cmd, working_directory, + stdout_is_log, + stderr_is_log, }); } Err(e) => { @@ -173,6 +204,8 @@ impl StreamHandler> for WsRunCmd { struct ProcessCommand { cmd: String, working_directory: Option, + stdout_is_log: bool, + stderr_is_log: bool, } #[derive(Message)] @@ -219,6 +252,9 @@ impl Handler for WsRunCmd { self.working_directory = Some(dir); } + self.stdout_is_log = msg.stdout_is_log; + self.stderr_is_log = msg.stderr_is_log; + self.cmd_run(&msg.cmd, ctx); } } @@ -245,17 +281,25 @@ pub async fn exec_endpoint( })?; match inbound { - InboundMsg::ExecCmd { base_dir, cmd } => { - Ok((cmd, Some(base_dir))) - } - InboundMsg::RunScript { base_dir, name } => { + InboundMsg::ExecCmd { + base_dir, + cmd, + stdout_is_log, + stderr_is_log, + } => Ok((cmd, Some(base_dir), stdout_is_log, stderr_is_log)), + InboundMsg::RunScript { + base_dir, + name, + stdout_is_log, + stderr_is_log, + } => { let cmd = extract_command_from_manifest( &base_dir, &name, state_clone_inner, ) .await?; - Ok((cmd, Some(base_dir))) + Ok((cmd, Some(base_dir), stdout_is_log, stderr_is_log)) } } }) diff --git a/core/src/ten_manager/src/designer/exec/msg.rs b/core/src/ten_manager/src/designer/exec/msg.rs index dc612c8fba..d88c8824ed 100644 --- a/core/src/ten_manager/src/designer/exec/msg.rs +++ b/core/src/ten_manager/src/designer/exec/msg.rs @@ -6,24 +6,42 @@ // use serde::{Deserialize, Serialize}; +use crate::log::LogLineInfo; + #[derive(Serialize, Deserialize, Debug)] #[serde(tag = "type")] pub enum InboundMsg { #[serde(rename = "exec_cmd")] - ExecCmd { base_dir: String, cmd: String }, + ExecCmd { + base_dir: String, + cmd: String, + stdout_is_log: bool, + stderr_is_log: bool, + }, #[serde(rename = "run_script")] - RunScript { base_dir: String, name: String }, + RunScript { + base_dir: String, + name: String, + stdout_is_log: bool, + stderr_is_log: bool, + }, } #[derive(Serialize, Deserialize, Debug)] #[serde(tag = "type")] pub enum OutboundMsg { - #[serde(rename = "stdout")] - StdOut { data: String }, + #[serde(rename = "stdout_normal")] + StdOutNormal { data: String }, + + #[serde(rename = "stdout_log")] + StdOutLog { data: LogLineInfo }, + + #[serde(rename = "stderr_normal")] + StdErrNormal { data: String }, - #[serde(rename = "stderr")] - StdErr { data: String }, + #[serde(rename = "stderr_log")] + StdErrLog { data: LogLineInfo }, #[serde(rename = "exit")] Exit { code: i32 }, diff --git a/core/src/ten_manager/src/fs/log_file_watcher.rs b/core/src/ten_manager/src/fs/log_file_watcher.rs index c4de5f62f1..1e862f3919 100644 --- a/core/src/ten_manager/src/fs/log_file_watcher.rs +++ b/core/src/ten_manager/src/fs/log_file_watcher.rs @@ -19,8 +19,8 @@ use anyhow::{anyhow, Result}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::oneshot; -use crate::log::{extract_extension_from_log_line, parse_graph_resources_log}; -use crate::log::{GraphResourcesLog, LogLineInfo, LogLineMetadata}; +use crate::log::process_log_line; +use crate::log::{GraphResourcesLog, LogLineInfo}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); // 1 minute timeout. const DEFAULT_BUFFER_SIZE: usize = 4096; // Default read buffer size. @@ -284,23 +284,3 @@ async fn watch_log_file_task( } } } - -/// Process a log line: try to parse as graph resources log first, then try to -/// extract extension information. -fn process_log_line( - log_line: &str, - graph_resources_log: &mut GraphResourcesLog, -) -> Option { - // First try to parse as graph resources log. - match parse_graph_resources_log(log_line, graph_resources_log) { - Ok(_) => { - // Successfully parsed as graph resources log, but no metadata to - // return. - None - } - Err(_) => { - // Not a graph resources log, try to extract extension information. - extract_extension_from_log_line(log_line, graph_resources_log) - } - } -} diff --git a/core/src/ten_manager/src/log/mod.rs b/core/src/ten_manager/src/log/mod.rs index ba732add15..72063fdf3c 100644 --- a/core/src/ten_manager/src/log/mod.rs +++ b/core/src/ten_manager/src/log/mod.rs @@ -186,3 +186,23 @@ pub fn parse_graph_resources_log( Ok(()) } + +/// Process a log line: try to parse as graph resources log first, then try to +/// extract extension information. +pub fn process_log_line( + log_line: &str, + graph_resources_log: &mut GraphResourcesLog, +) -> Option { + // First try to parse as graph resources log. + match parse_graph_resources_log(log_line, graph_resources_log) { + Ok(_) => { + // Successfully parsed as graph resources log, but no metadata to + // return. + None + } + Err(_) => { + // Not a graph resources log, try to extract extension information. + extract_extension_from_log_line(log_line, graph_resources_log) + } + } +} diff --git a/core/src/ten_manager/tests/test_case/fs/mod.rs b/core/src/ten_manager/tests/test_case/fs/mod.rs index 0d28577865..05d53bc738 100644 --- a/core/src/ten_manager/tests/test_case/fs/mod.rs +++ b/core/src/ten_manager/tests/test_case/fs/mod.rs @@ -42,6 +42,7 @@ mod tests { // Get the first chunk. let chunk = stream.next().await.expect("Should receive data")?; + println!("chunk: {:?}", chunk); assert_eq!(chunk.line, test_content); // Write more content to the file. @@ -51,6 +52,7 @@ mod tests { // Get the second chunk. let chunk = stream.next().await; + println!("chunk: {:?}", chunk); match chunk { Some(chunk) => match chunk { Ok(chunk) => assert_eq!(chunk.line, more_content), @@ -100,6 +102,7 @@ mod tests { // Get the first chunk let chunk = stream.next().await.expect("Should receive data")?; + println!("chunk: {:?}", chunk); assert_eq!(chunk.line, "Initial content"); // Simulate log rotation - delete and recreate file @@ -121,6 +124,7 @@ mod tests { // Get the content after rotation let chunk = stream.next().await.expect("Should receive rotated data")?; + println!("chunk: {:?}", chunk); assert_eq!(chunk.line, "Rotated content"); // Stop watching @@ -152,10 +156,12 @@ mod tests { // Get the first chunk. let chunk = stream.next().await.expect("Should receive data")?; + println!("chunk: {:?}", chunk); assert_eq!(chunk.line, "Test content"); // Wait for the timeout to occur (no new content being written). let next_result = stream.next().await; + println!("next_result: {:?}", next_result); assert!(next_result.is_none(), "Stream should end after timeout"); Ok(()) @@ -198,6 +204,7 @@ mod tests { // Get the graph resources line let chunk = stream.next().await.expect("Should receive graph resources")?; + println!("chunk: {:?}", chunk); assert!(chunk.line.contains("[graph resources]")); // Now write logs that contain extension metadata in format @@ -218,11 +225,13 @@ mod tests { .next() .await .expect("Should receive log with extension")?; + println!("chunk: {:?}", chunk); assert!(chunk.line.contains("on_start()")); // The metadata should include the extension name assert!(chunk.metadata.is_some(), "Should have metadata"); let metadata = chunk.metadata.unwrap(); + println!("metadata: {:?}", metadata); assert_eq!(metadata.extension, Some("test_extension".to_string())); // Get second log with extension metadata @@ -230,11 +239,13 @@ mod tests { .next() .await .expect("Should receive log with extension")?; + println!("chunk: {:?}", chunk); assert!(chunk.line.contains("on_start() done")); // The metadata should include the extension name assert!(chunk.metadata.is_some(), "Should have metadata"); let metadata = chunk.metadata.unwrap(); + println!("metadata: {:?}", metadata); assert_eq!(metadata.extension, Some("test_extension".to_string())); // Stop watching @@ -394,6 +405,7 @@ mod tests { // Verify that the metadata includes the extension name assert!(chunk.metadata.is_some(), "Should have metadata"); let metadata = chunk.metadata.unwrap(); + println!("metadata: {:?}", metadata); assert_eq!( metadata.extension, Some("test_extension".to_string())