Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions core/src/ten_manager/src/designer/exec/cmd_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use actix_web_actors::ws::WebsocketContext;
use crossbeam_channel::{bounded, Sender};

use crate::designer::exec::RunCmdOutput;
use crate::log::{process_log_line, GraphResourcesLog, LogLineInfo};

use super::{msg::OutboundMsg, WsRunCmd};

Expand Down Expand Up @@ -43,7 +44,15 @@ impl WsRunCmd {
command
.arg("-c")
.arg(cmd)
.env("TEN_LOG_FORMATTER", "default")
// Set TEN_LOG_FORMATTER to default if any output is log content.
.env(
"TEN_LOG_FORMATTER",
if self.stdout_is_log || self.stderr_is_log {
"default"
} else {
""
},
)
// Capture stdout/stderr.
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
Expand Down Expand Up @@ -82,11 +91,20 @@ impl WsRunCmd {
if let Some(mut out) = stdout_child {
let addr_stdout = addr.clone();
let shutdown_rx = stdout_shutdown_rx;
let is_log = self.stdout_is_log;

thread::spawn(move || {
use std::io::{BufRead, BufReader};

let reader = BufReader::new(&mut out);
// Create a graph resources log instance for log processing.
let mut graph_resources_log = GraphResourcesLog {
graph_id: String::new(),
graph_name: String::new(),
app_uri: None,
extension_threads: std::collections::HashMap::new(),
};

for line_res in reader.lines() {
// Check if we should terminate.
if shutdown_rx.try_recv().is_ok() {
Expand All @@ -95,11 +113,22 @@ impl WsRunCmd {

match line_res {
Ok(line) => {
// `do_send` is used to asynchronously send messages
// to an actor. This method does not wait for the
// message to be processed, making it suitable for
// messages that do not require a response.
addr_stdout.do_send(RunCmdOutput::StdOut(line));
if is_log {
// Process line as log content.
let metadata = process_log_line(
&line,
&mut graph_resources_log,
);
let log_line_info =
LogLineInfo { line, metadata };
addr_stdout.do_send(RunCmdOutput::StdOutLog(
log_line_info,
));
} else {
// Process as normal stdout.
addr_stdout
.do_send(RunCmdOutput::StdOutNormal(line));
}
}
Err(_) => break,
}
Expand All @@ -112,11 +141,20 @@ impl WsRunCmd {
if let Some(mut err) = stderr_child {
let addr_stderr = addr.clone();
let shutdown_rx = stderr_shutdown_rx;
let is_log = self.stderr_is_log;

thread::spawn(move || {
use std::io::{BufRead, BufReader};

let reader = BufReader::new(&mut err);
// Create a graph resources log instance for log processing.
let mut graph_resources_log = GraphResourcesLog {
graph_id: String::new(),
graph_name: String::new(),
app_uri: None,
extension_threads: std::collections::HashMap::new(),
};

for line_res in reader.lines() {
// Check if we should terminate.
if shutdown_rx.try_recv().is_ok() {
Expand All @@ -125,7 +163,22 @@ impl WsRunCmd {

match line_res {
Ok(line) => {
addr_stderr.do_send(RunCmdOutput::StdErr(line));
if is_log {
// Process line as log content.
let metadata = process_log_line(
&line,
&mut graph_resources_log,
);
let log_line_info =
LogLineInfo { line, metadata };
addr_stderr.do_send(RunCmdOutput::StdErrLog(
log_line_info,
));
} else {
// Process as normal stderr.
addr_stderr
.do_send(RunCmdOutput::StdErrNormal(line));
}
}
Err(_) => break,
}
Expand Down
74 changes: 59 additions & 15 deletions core/src/ten_manager/src/designer/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use anyhow::Context;
use anyhow::Result;

use crate::designer::DesignerState;
use crate::log::LogLineInfo;
use cmd_run::ShutdownSenders;
use msg::InboundMsg;
use msg::OutboundMsg;
Expand All @@ -27,20 +28,25 @@ use run_script::extract_command_from_manifest;
#[derive(Message)]
#[rtype(result = "()")]
pub enum RunCmdOutput {
StdOut(String),
StdErr(String),
StdOutNormal(String),
StdOutLog(LogLineInfo),

StdErrNormal(String),
StdErrLog(LogLineInfo),

Exit(i32),
}

/// `CmdParser` returns a tuple: the 1st element is the command string, and
/// the 2nd is an optional working directory.
/// `CmdParser` returns a tuple: the 1st element is the command string, the 2nd
/// is an optional working directory, and the 3rd and 4th are booleans
/// indicating if stdout and stderr are log content.
pub type CmdParser = Box<
dyn Fn(
&str,
) -> std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<(String, Option<String>)>,
Output = Result<(String, Option<String>, bool, bool)>,
> + Send,
>,
> + Send
Expand All @@ -52,6 +58,8 @@ pub struct WsRunCmd {
cmd_parser: CmdParser,
working_directory: Option<String>,
shutdown_senders: Option<ShutdownSenders>,
stdout_is_log: bool,
stderr_is_log: bool,
}

impl WsRunCmd {
Expand All @@ -61,6 +69,8 @@ impl WsRunCmd {
cmd_parser,
working_directory: None,
shutdown_senders: None,
stdout_is_log: false,
stderr_is_log: false,
}
}
}
Expand Down Expand Up @@ -93,16 +103,30 @@ impl Handler<RunCmdOutput> for WsRunCmd {
ctx: &mut Self::Context,
) -> Self::Result {
match msg {
RunCmdOutput::StdOut(line) => {
RunCmdOutput::StdOutNormal(line) => {
// Send the line to the client.
let msg_out = OutboundMsg::StdOut { data: line };
let msg_out = OutboundMsg::StdOutNormal { data: line };
let out_str = serde_json::to_string(&msg_out).unwrap();

// Sends a text message to the WebSocket client.
ctx.text(out_str);
}
RunCmdOutput::StdOutLog(log_line) => {
let msg_out = OutboundMsg::StdOutLog { data: log_line };
let out_str = serde_json::to_string(&msg_out).unwrap();

// Sends a text message to the WebSocket client.
ctx.text(out_str);
}
RunCmdOutput::StdErr(line) => {
let msg_out = OutboundMsg::StdErr { data: line };
RunCmdOutput::StdErrNormal(line) => {
let msg_out = OutboundMsg::StdErrNormal { data: line };
let out_str = serde_json::to_string(&msg_out).unwrap();

// Sends a text message to the WebSocket client.
ctx.text(out_str);
}
RunCmdOutput::StdErrLog(log_line) => {
let msg_out = OutboundMsg::StdErrLog { data: log_line };
let out_str = serde_json::to_string(&msg_out).unwrap();

// Sends a text message to the WebSocket client.
Expand Down Expand Up @@ -140,10 +164,17 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsRunCmd {

actix::spawn(async move {
match fut.await {
Ok((cmd, working_directory)) => {
Ok((
cmd,
working_directory,
stdout_is_log,
stderr_is_log,
)) => {
actor_addr.do_send(ProcessCommand {
cmd,
working_directory,
stdout_is_log,
stderr_is_log,
});
}
Err(e) => {
Expand Down Expand Up @@ -173,6 +204,8 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsRunCmd {
struct ProcessCommand {
cmd: String,
working_directory: Option<String>,
stdout_is_log: bool,
stderr_is_log: bool,
}

#[derive(Message)]
Expand Down Expand Up @@ -219,6 +252,9 @@ impl Handler<ProcessCommand> for WsRunCmd {
self.working_directory = Some(dir);
}

self.stdout_is_log = msg.stdout_is_log;
self.stderr_is_log = msg.stderr_is_log;

self.cmd_run(&msg.cmd, ctx);
}
}
Expand All @@ -245,17 +281,25 @@ pub async fn exec_endpoint(
})?;

match inbound {
InboundMsg::ExecCmd { base_dir, cmd } => {
Ok((cmd, Some(base_dir)))
}
InboundMsg::RunScript { base_dir, name } => {
InboundMsg::ExecCmd {
base_dir,
cmd,
stdout_is_log,
stderr_is_log,
} => Ok((cmd, Some(base_dir), stdout_is_log, stderr_is_log)),
InboundMsg::RunScript {
base_dir,
name,
stdout_is_log,
stderr_is_log,
} => {
let cmd = extract_command_from_manifest(
&base_dir,
&name,
state_clone_inner,
)
.await?;
Ok((cmd, Some(base_dir)))
Ok((cmd, Some(base_dir), stdout_is_log, stderr_is_log))
}
}
})
Expand Down
30 changes: 24 additions & 6 deletions core/src/ten_manager/src/designer/exec/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,42 @@
//
use serde::{Deserialize, Serialize};

use crate::log::LogLineInfo;

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum InboundMsg {
#[serde(rename = "exec_cmd")]
ExecCmd { base_dir: String, cmd: String },
ExecCmd {
base_dir: String,
cmd: String,
stdout_is_log: bool,
stderr_is_log: bool,
},

#[serde(rename = "run_script")]
RunScript { base_dir: String, name: String },
RunScript {
base_dir: String,
name: String,
stdout_is_log: bool,
stderr_is_log: bool,
},
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum OutboundMsg {
#[serde(rename = "stdout")]
StdOut { data: String },
#[serde(rename = "stdout_normal")]
StdOutNormal { data: String },

#[serde(rename = "stdout_log")]
StdOutLog { data: LogLineInfo },

#[serde(rename = "stderr_normal")]
StdErrNormal { data: String },

#[serde(rename = "stderr")]
StdErr { data: String },
#[serde(rename = "stderr_log")]
StdErrLog { data: LogLineInfo },

#[serde(rename = "exit")]
Exit { code: i32 },
Expand Down
24 changes: 2 additions & 22 deletions core/src/ten_manager/src/fs/log_file_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use anyhow::{anyhow, Result};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;

use crate::log::{extract_extension_from_log_line, parse_graph_resources_log};
use crate::log::{GraphResourcesLog, LogLineInfo, LogLineMetadata};
use crate::log::process_log_line;
use crate::log::{GraphResourcesLog, LogLineInfo};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); // 1 minute timeout.
const DEFAULT_BUFFER_SIZE: usize = 4096; // Default read buffer size.
Expand Down Expand Up @@ -284,23 +284,3 @@ async fn watch_log_file_task(
}
}
}

/// Process a log line: try to parse as graph resources log first, then try to
/// extract extension information.
fn process_log_line(
log_line: &str,
graph_resources_log: &mut GraphResourcesLog,
) -> Option<LogLineMetadata> {
// First try to parse as graph resources log.
match parse_graph_resources_log(log_line, graph_resources_log) {
Ok(_) => {
// Successfully parsed as graph resources log, but no metadata to
// return.
None
}
Err(_) => {
// Not a graph resources log, try to extract extension information.
extract_extension_from_log_line(log_line, graph_resources_log)
}
}
}
20 changes: 20 additions & 0 deletions core/src/ten_manager/src/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,23 @@ pub fn parse_graph_resources_log(

Ok(())
}

/// Process a log line: try to parse as graph resources log first, then try to
/// extract extension information.
pub fn process_log_line(
log_line: &str,
graph_resources_log: &mut GraphResourcesLog,
) -> Option<LogLineMetadata> {
// First try to parse as graph resources log.
match parse_graph_resources_log(log_line, graph_resources_log) {
Ok(_) => {
// Successfully parsed as graph resources log, but no metadata to
// return.
None
}
Err(_) => {
// Not a graph resources log, try to extract extension information.
extract_extension_from_log_line(log_line, graph_resources_log)
}
}
}
Loading
Loading