diff --git a/src/parse/cwlogs.rs b/src/cwlogs/mod.rs similarity index 63% rename from src/parse/cwlogs.rs rename to src/cwlogs/mod.rs index 177f79d..9369b81 100644 --- a/src/parse/cwlogs.rs +++ b/src/cwlogs/mod.rs @@ -2,22 +2,26 @@ use regex::Regex; use std::sync::LazyLock; use std::{collections::HashMap, sync::Arc}; -use aws_lambda_events::cloudwatch_logs::{LogEntry, LogsEvent}; +use aws_lambda_events::cloudwatch_logs::LogsEvent; use opentelemetry_proto::tonic::{ - common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value::Value}, + common::v1::InstrumentationScope, logs::v1::{ResourceLogs, ScopeLogs}, resource::v1::Resource, }; use tracing::debug; -use crate::parse::record_parser::RecordLogEntry; +use crate::parse::platform::{LogPlatform, ParserError}; +use crate::parse::utils::string_kv; use crate::{ aws_attributes::AwsAttributes, flowlogs::{FlowLogManager, ParsedFields}, - parse::record_parser::RecordParser, tags::TagManager, }; +pub mod record_parser; + +use record_parser::RecordParser; + /// Parser handles the conversion of AWS CloudWatch Logs events into OpenTelemetry ResourceLogs pub struct Parser<'a> { aws_attributes: &'a AwsAttributes, @@ -43,28 +47,6 @@ impl<'a> Parser<'a> { /// Parse an AWS CloudWatch Logs event and return ResourceLogs pub async fn parse(&mut self, logs_event: LogsEvent) -> Result, ParserError> { - debug!( - request_id = %self.request_id, - "Starting to parse CloudWatch Logs event" - ); - - // Parse the CloudWatch Logs event into ResourceLogs - let resource_logs = self.parse_logs_event(logs_event).await?; - - debug!( - request_id = %self.request_id, - count = resource_logs.len(), - "Successfully parsed CloudWatch Logs event into ResourceLogs" - ); - - Ok(resource_logs) - } - - /// Internal method to parse the LogsEvent - async fn parse_logs_event( - &mut self, - logs_event: LogsEvent, - ) -> Result, ParserError> { let mut resource_logs_list = Vec::new(); debug!( @@ -106,73 +88,39 @@ impl<'a> Parser<'a> { // Build base attributes let mut attributes = vec![ - KeyValue { - key: "cloud.provider".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue("aws".to_string())), - }), - }, - KeyValue { - key: "cloud.region".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(self.aws_attributes.region.clone())), - }), - }, - KeyValue { - key: "cloud.account.id".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(self.aws_attributes.account_id.clone())), - }), - }, - KeyValue { - key: "cloudwatch.log.group.name".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(log_data.log_group)), - }), - }, - KeyValue { - key: "cloudwatch.log.stream.name".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(log_data.log_stream)), - }), - }, + string_kv("cloud.provider", "aws"), + string_kv("cloud.region", self.aws_attributes.region.clone()), + string_kv("cloud.account.id", self.aws_attributes.account_id.clone()), + string_kv("cloudwatch.log.group.name", log_data.log_group), + string_kv("cloudwatch.log.stream.name", log_data.log_stream), ]; // Add cloud.platform attribute based on detected platform if log_platform != LogPlatform::Unknown { - attributes.push(KeyValue { - key: "cloud.platform".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(log_platform.as_str().to_string())), - }), - }); + attributes.push(string_kv("cloud.platform", log_platform.as_str())); } // Add CloudWatch log group tags as resource attributes for (tag_key, tag_value) in log_group_tags { - attributes.push(KeyValue { - key: format!("cloudwatch.log.tags.{}", tag_key), - value: Some(AnyValue { - value: Some(Value::StringValue(tag_value)), - }), - }); + attributes.push(string_kv( + &format!("cloudwatch.log.tags.{}", tag_key), + tag_value, + )); } // Add EC2 Flow Log tags as resource attributes for (tag_key, tag_value) in flow_log_tags { - attributes.push(KeyValue { - key: format!("ec2.flow-logs.tags.{}", tag_key), - value: Some(AnyValue { - value: Some(Value::StringValue(tag_value)), - }), - }); + attributes.push(string_kv( + &format!("ec2.flow-logs.tags.{}", tag_key), + tag_value, + )); } let rec_parser = RecordParser::new(log_platform, parser_type, flow_log_parsed_fields); let log_records = log_data .log_events .into_iter() - .map(|log| rec_parser.parse(now_nanos, log.into())) + .map(|log| rec_parser.parse(now_nanos, log)) .collect(); let resource_logs = ResourceLogs { @@ -183,20 +131,16 @@ impl<'a> Parser<'a> { }), scope_logs: vec![ScopeLogs { scope: Some(InstrumentationScope { - name: "rotel-lambda-forwarder".to_string(), - version: "0.0.1".to_string(), // TODO - attributes: vec![KeyValue { - key: "aws.lambda.invoked_arn".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue( - self.aws_attributes.invoked_function_arn.clone(), - )), - }), - }], + name: env!("CARGO_PKG_NAME").to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + attributes: vec![string_kv( + "aws.lambda.invoked_arn", + self.aws_attributes.invoked_function_arn.clone(), + )], dropped_attributes_count: 0, }), log_records, - schema_url: "".to_string(), + schema_url: String::new(), }], schema_url: String::new(), }; @@ -207,45 +151,9 @@ impl<'a> Parser<'a> { } } -impl From for RecordLogEntry { - fn from(value: LogEntry) -> Self { - RecordLogEntry::new(Some(value.id), value.timestamp, value.message) - } -} - -/// Detect AWS platform from log group name -/// Represents the AWS platform/service that generated the logs -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum LogPlatform { - Eks, - Ecs, - Rds, - Lambda, - Codebuild, - Cloudtrail, - VpcFlowLog, - Unknown, -} - -impl LogPlatform { - /// Returns the platform string used in cloud.platform attribute - pub fn as_str(&self) -> &'static str { - match self { - LogPlatform::Eks => "aws_eks", - LogPlatform::Ecs => "aws_ecs", - LogPlatform::Rds => "aws_rds", - LogPlatform::Lambda => "aws_lambda", - LogPlatform::Codebuild => "aws_codebuild", - LogPlatform::Cloudtrail => "aws_cloudtrail", - LogPlatform::VpcFlowLog => "aws_vpc_flow_log", - LogPlatform::Unknown => "aws_unknown", - } - } -} - /// Represents the type of parser to use for log entries #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] -pub enum ParserType { +pub(crate) enum ParserType { Json, KeyValue, VpcLog, @@ -253,8 +161,35 @@ pub enum ParserType { Unknown, } +static CLOUDTRAIL_REGEX: LazyLock = + LazyLock::new(|| Regex::new(r"^\d{12}_CloudTrail_").unwrap()); + +fn detect_log_platform(log_group_name: &str, log_stream_name: &str) -> LogPlatform { + if CLOUDTRAIL_REGEX.is_match(log_stream_name) { + return LogPlatform::Cloudtrail; + } + + if let Some(rest) = log_group_name.strip_prefix("/aws/") { + if rest.starts_with("eks/") { + LogPlatform::Eks + } else if rest.starts_with("ecs/") { + LogPlatform::Ecs + } else if rest.starts_with("rds/") { + LogPlatform::Rds + } else if rest.starts_with("lambda/") { + LogPlatform::Lambda + } else if rest.starts_with("codebuild/") { + LogPlatform::Codebuild + } else { + LogPlatform::Unknown + } + } else { + LogPlatform::Unknown + } +} + impl<'a> Parser<'a> { - /// Detects the log platform and parser type based on log group and stream names + /// Detects the log platform and parser type based on log group and stream names. /// Returns (platform, parser_type, optional_flow_log_parsed_fields, flow_log_tags) async fn detect_log_type( &mut self, @@ -274,7 +209,6 @@ impl<'a> Parser<'a> { "Detected EC2 Flow Log" ); - // Extract parsed fields if successful, None if parsing failed or not attempted let parsed_fields = flow_log_config.parsed_fields.as_ref().cloned(); return ( @@ -288,7 +222,7 @@ impl<'a> Parser<'a> { // Otherwise, detect the platform normally let platform = detect_log_platform(log_group_name, log_stream_name); - // Then, determine the parser type based on platform and log stream name + // Determine the parser type based on platform and log stream name let parser_type = match platform { LogPlatform::Eks => { if log_stream_name.starts_with("authenticator-") { @@ -305,58 +239,12 @@ impl<'a> Parser<'a> { } } -static CLOUDTRAIL_REGEX: LazyLock = - LazyLock::new(|| Regex::new(r"^\d{12}_CloudTrail_").unwrap()); - -fn detect_log_platform(log_group_name: &str, log_stream_name: &str) -> LogPlatform { - if CLOUDTRAIL_REGEX.is_match(log_stream_name) { - return LogPlatform::Cloudtrail; - } - - if let Some(rest) = log_group_name.strip_prefix("/aws/") { - if rest.starts_with("eks/") { - LogPlatform::Eks - } else if rest.starts_with("ecs/") { - LogPlatform::Ecs - } else if rest.starts_with("rds/") { - LogPlatform::Rds - } else if rest.starts_with("lambda/") { - LogPlatform::Lambda - } else if rest.starts_with("codebuild/") { - LogPlatform::Codebuild - } else { - LogPlatform::Unknown - } - } else { - LogPlatform::Unknown - } -} - -/// Errors that can occur during parsing -#[derive(Debug, thiserror::Error)] -pub enum ParserError { - #[error("Failed to decode CloudWatch Logs data: {0}")] - DecodeError(String), - - #[error("Failed to decompress CloudWatch Logs data: {0}")] - DecompressionError(String), - - #[error("Failed to parse JSON: {0}")] - JsonParseError(String), - - #[error("Invalid log format: {0}")] - FormatParseError(String), - - #[error("Unable to parse EC2 flow log format")] - FlowLogFormatError, -} - #[cfg(test)] mod tests { - use aws_lambda_events::cloudwatch_logs::LogEntry; - use super::*; use aws_config::BehaviorVersion; + use aws_lambda_events::cloudwatch_logs::LogEntry; + use opentelemetry_proto::tonic::common::v1::any_value::Value; #[tokio::test] async fn test_parse_empty_event() { @@ -381,7 +269,6 @@ mod tests { assert!(result.is_ok()); let resource_logs = result.unwrap(); - // Implementation creates one ResourceLogs structure assert_eq!(resource_logs.len(), 1); } @@ -389,20 +276,17 @@ mod tests { async fn test_parse_eks_authenticator_log() { let log_msg = r#"time="2025-12-24T19:48:32Z" level=info msg="access granted" arn="arn:aws:iam::927209226484:role/AWSWesleyClusterManagerLambda-Add-AddonManagerRole-1CRTQUJF13T5U" client="127.0.0.1:54812" groups="[]" method=POST path=/authenticate stsendpoint=sts.us-east-1.amazonaws.com uid="aws-iam-authenticator:927209226484:AROA5PYP2AD2FVXU23CA6" username="eks:addon-manager""#; - // Test parsing the log entry let mut log_entry = LogEntry::default(); log_entry.id = "test-id".to_string(); log_entry.timestamp = 1000; log_entry.message = log_msg.to_string(); let rec_parser = RecordParser::new(LogPlatform::Eks, ParserType::KeyValue, None); - let log_record = rec_parser.parse(123456789, log_entry.into()); + let log_record = rec_parser.parse(123456789, log_entry); - // Verify the log was parsed correctly assert_eq!(log_record.severity_number, 9); // Info assert_eq!(log_record.severity_text, "INFO"); - // Verify body contains the msg field assert!(log_record.body.is_some()); if let Some(body) = &log_record.body { if let Some(Value::StringValue(s)) = &body.value { @@ -410,17 +294,11 @@ mod tests { } } - // Verify timestamp was parsed assert!(log_record.time_unix_nano > 0); - // Verify attributes were extracted - let has_arn = log_record.attributes.iter().any(|kv| kv.key == "arn"); - let has_method = log_record.attributes.iter().any(|kv| kv.key == "method"); - let has_username = log_record.attributes.iter().any(|kv| kv.key == "username"); - - assert!(has_arn); - assert!(has_method); - assert!(has_username); + assert!(log_record.attributes.iter().any(|kv| kv.key == "arn")); + assert!(log_record.attributes.iter().any(|kv| kv.key == "method")); + assert!(log_record.attributes.iter().any(|kv| kv.key == "username")); } #[test] diff --git a/src/cwlogs/record_parser.rs b/src/cwlogs/record_parser.rs new file mode 100644 index 0000000..f76e97a --- /dev/null +++ b/src/cwlogs/record_parser.rs @@ -0,0 +1,515 @@ +use std::sync::Arc; + +use aws_lambda_events::cloudwatch_logs::LogEntry; +use opentelemetry_proto::tonic::logs::v1::LogRecord; +use tracing::warn; + +use crate::cwlogs::ParserType; +use crate::flowlogs::ParsedFields; +use crate::parse::{ + json::parse_json_to_map, + keyvalue::parse_keyvalue_to_map, + platform::LogPlatform, + record_parser::{LogBuilder, RecordParserError}, + utils::string_kv, + vpclog::parse_vpclog_to_map, +}; + +/// The outcome of attempting to parse a raw log message string into a structured map. +enum ParsedMessage { + /// The message was parsed into a JSON map; the caller should call `populate_from_map`. + Map(serde_json::Map), + /// The message could not or should not be structured; use this string as the plain-text body. + /// For VPC logs, the raw line is always preserved as the body even when a map is also + /// available — so this variant carries both the body text and an optional structured map. + PlainText(String, Option>), + /// Parsing failed outright; use this string as the plain-text body. + Error(RecordParserError), +} + +/// CloudWatch-specific log entry parser. +/// +/// `RecordParser` owns the CloudWatch concerns: +/// +/// * Building the initial [`LogRecord`] skeleton via [`LogBuilder::start`], seeding it +/// with the CloudWatch millisecond timestamp and, when non-empty, the `cloudwatch.id` +/// attribute. +/// * Dispatching the raw message string to the appropriate format parser +/// (`parse_message_to_map`) based on the detected [`ParserType`]. +/// * On success, handing the parsed map to [`crate::parse::record_parser::LogRecordBuilder::populate_from_map`] +/// which strips sensitive fields and extracts well-known fields into the record. +/// * On failure, preserving the raw message as a plain-text body. +/// +/// All [`LogRecord`] construction is delegated to [`LogBuilder`] / +/// [`crate::parse::record_parser::LogRecordBuilder`] in the `parse` module — no +/// `LogRecord { … }` literals appear here. +pub(crate) struct RecordParser { + parser_type: ParserType, + flow_log_parsed_fields: Option>, + builder: LogBuilder, +} + +impl RecordParser { + /// Create a new `RecordParser` for the given platform and parser type. + pub(crate) fn new( + platform: LogPlatform, + parser_type: ParserType, + flow_log_parsed_fields: Option>, + ) -> Self { + Self { + parser_type, + flow_log_parsed_fields, + builder: LogBuilder::new(platform), + } + } + + /// Parse a CloudWatch [`LogEntry`] into an OpenTelemetry [`LogRecord`]. + /// + /// On parse failure the raw message is preserved as the log body. + pub(crate) fn parse(&self, now_nanos: u64, log_entry: LogEntry) -> LogRecord { + // Seed the record with the CW timestamp and, when non-empty, the entry ID. + let initial_attributes = if !log_entry.id.is_empty() { + vec![string_kv("cloudwatch.id", log_entry.id)] + } else { + vec![] + }; + + let mut record_builder = + self.builder + .start(now_nanos, log_entry.timestamp, initial_attributes); + + match self.parse_message(log_entry.message) { + ParsedMessage::Map(map) => { + record_builder.populate_from_map(map); + } + ParsedMessage::PlainText(body, maybe_map) => { + record_builder = record_builder.set_body_text(body); + if let Some(map) = maybe_map { + record_builder.populate_from_map(map); + } + } + ParsedMessage::Error(RecordParserError(err, msg)) => { + warn!(error = ?err, "Failed to parse log entry, using raw text as body"); + record_builder = record_builder.set_body_text(msg); + } + } + + record_builder.finish() + } + + /// Convert a raw message string into a [`ParsedMessage`] describing what the caller + /// should do next — no mutation of external state as a side-effect. + fn parse_message(&self, message: String) -> ParsedMessage { + match self.parser_type { + ParserType::Json => match parse_json_to_map(message) { + Ok(map) => ParsedMessage::Map(map), + Err(e) => ParsedMessage::Error(e), + }, + + ParserType::KeyValue => match parse_keyvalue_to_map(message) { + Ok(map) => ParsedMessage::Map(map), + Err(e) => ParsedMessage::Error(e), + }, + + ParserType::VpcLog => { + // VPC Flow Logs always preserve the raw line as the body. + // If parsed fields are available, also return a structured map so that + // individual flow-log fields are emitted as attributes. + match self.flow_log_parsed_fields.as_ref() { + Some(parsed_fields) => { + match parse_vpclog_to_map(message.clone(), parsed_fields.clone()) { + Ok(map) => ParsedMessage::PlainText(message, Some(map)), + Err(e) => ParsedMessage::Error(e), + } + } + // No field definitions available — just preserve the raw body. + None => ParsedMessage::PlainText(message, None), + } + } + + ParserType::Unknown => { + // Auto-detect: attempt JSON for messages that look like objects; + // otherwise treat as opaque plain text. + if message.len() > 2 && message.starts_with('{') { + match parse_json_to_map(message) { + Ok(map) => ParsedMessage::Map(map), + Err(e) => ParsedMessage::Error(e), + } + } else { + ParsedMessage::PlainText(message, None) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use aws_lambda_events::cloudwatch_logs::LogEntry; + use opentelemetry_proto::tonic::common::v1::any_value::Value as AnyValueInner; + + use super::*; + + fn make_entry(id: &str, timestamp: i64, message: &str) -> LogEntry { + let mut entry = LogEntry::default(); + entry.id = id.to_string(); + entry.timestamp = timestamp; + entry.message = message.to_string(); + entry + } + + fn parse(message: &str, platform: LogPlatform, parser_type: ParserType) -> LogRecord { + let parser = RecordParser::new(platform, parser_type, None); + parser.parse(123_456_789, make_entry("test-id", 1000, message)) + } + + #[test] + fn test_cloudwatch_id_attached_as_attribute() { + let lr = parse(r#"{"msg":"hi"}"#, LogPlatform::Unknown, ParserType::Json); + assert!( + lr.attributes.iter().any(|kv| kv.key == "cloudwatch.id"), + "cloudwatch.id attribute should be present" + ); + } + + #[test] + fn test_cloudwatch_id_value() { + let lr = parse(r#"{"msg":"hi"}"#, LogPlatform::Unknown, ParserType::Json); + let attr = lr + .attributes + .iter() + .find(|kv| kv.key == "cloudwatch.id") + .expect("cloudwatch.id not found"); + if let Some(AnyValueInner::StringValue(s)) = + attr.value.as_ref().and_then(|v| v.value.as_ref()) + { + assert_eq!(s, "test-id"); + } else { + panic!("expected StringValue for cloudwatch.id"); + } + } + + #[test] + fn test_no_cloudwatch_id_when_empty() { + let parser = RecordParser::new(LogPlatform::Unknown, ParserType::Json, None); + let lr = parser.parse(123_456_789, make_entry("", 1000, r#"{"msg":"hi"}"#)); + assert!( + !lr.attributes.iter().any(|kv| kv.key == "cloudwatch.id"), + "cloudwatch.id should be absent when id is empty" + ); + } + + #[test] + fn test_timestamp_set_from_entry() { + // 1000 ms → 1_000_000_000 ns + let lr = parse(r#"{"msg":"hi"}"#, LogPlatform::Unknown, ParserType::Json); + assert_eq!(lr.time_unix_nano, 1_000_000_000u64); + } + + #[test] + fn test_parse_json_type() { + use opentelemetry_proto::tonic::logs::v1::SeverityNumber; + let lr = parse( + r#"{"level":"info","msg":"hello","user":"alice"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert_eq!(lr.severity_number, SeverityNumber::Info as i32); + assert_eq!(lr.severity_text, "INFO"); + if let Some(body) = &lr.body { + if let Some(AnyValueInner::StringValue(s)) = &body.value { + assert_eq!(s, "hello"); + } else { + panic!("expected string body"); + } + } else { + panic!("body should be set"); + } + assert!(lr.attributes.iter().any(|kv| kv.key == "user")); + } + + #[test] + fn test_parse_keyvalue_type() { + use opentelemetry_proto::tonic::logs::v1::SeverityNumber; + let lr = parse( + r#"time="2025-01-01T00:00:00Z" level=warn msg="something happened" host=myhost"#, + LogPlatform::Eks, + ParserType::KeyValue, + ); + assert_eq!(lr.severity_number, SeverityNumber::Warn as i32); + if let Some(body) = &lr.body { + if let Some(AnyValueInner::StringValue(s)) = &body.value { + assert_eq!(s, "something happened"); + } else { + panic!("expected string body"); + } + } else { + panic!("body should be set"); + } + assert!(lr.attributes.iter().any(|kv| kv.key == "host")); + } + + #[test] + fn test_parse_unknown_auto_detects_json() { + use opentelemetry_proto::tonic::logs::v1::SeverityNumber; + let lr = parse( + r#"{"level":"debug","msg":"auto detected"}"#, + LogPlatform::Unknown, + ParserType::Unknown, + ); + assert_eq!(lr.severity_number, SeverityNumber::Debug as i32); + assert!(lr.body.is_some()); + } + + #[test] + fn test_parse_unknown_plain_text_fallback() { + let msg = "just a plain log line"; + let lr = parse(msg, LogPlatform::Unknown, ParserType::Unknown); + if let Some(body) = &lr.body { + if let Some(AnyValueInner::StringValue(s)) = &body.value { + assert_eq!(s, msg); + } else { + panic!("expected string body"); + } + } else { + panic!("body should be set for plain text"); + } + } + + #[test] + fn test_parse_invalid_json_falls_back_to_plain_text() { + let bad_json = r#"{ this is not valid json }"#; + let lr = parse(bad_json, LogPlatform::Unknown, ParserType::Json); + if let Some(body) = &lr.body { + if let Some(AnyValueInner::StringValue(s)) = &body.value { + assert_eq!(s, bad_json); + } else { + panic!("expected string body on failure"); + } + } else { + panic!("body should be set on parse failure"); + } + } + + #[test] + fn test_vpc_log_body_always_set() { + // With no parsed fields the VPC branch should still set the body. + let raw = "2 123456789012 eni-abc 10.0.0.1 10.0.0.2 80 443 6 10 1000 0 0 ACCEPT OK"; + let parser = RecordParser::new(LogPlatform::VpcFlowLog, ParserType::VpcLog, None); + let lr = parser.parse(123_456_789, make_entry("", 1000, raw)); + if let Some(body) = &lr.body { + if let Some(AnyValueInner::StringValue(s)) = &body.value { + assert_eq!(s, raw); + } else { + panic!("expected string body for VPC log"); + } + } else { + panic!("body should always be set for VPC logs"); + } + } + + #[test] + fn test_parse_json_with_level() { + use opentelemetry_proto::tonic::logs::v1::SeverityNumber; + let lr = parse( + r#"{"level":"info","msg":"test message"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert_eq!(lr.severity_number, SeverityNumber::Info as i32); + assert_eq!(lr.severity_text, "INFO"); + assert!(lr.body.is_some()); + } + + #[test] + fn test_parse_json_body_fields() { + // Test 'msg' field + let lr = parse( + r#"{"msg":"test message"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert!(lr.body.is_some()); + if let Some(body) = &lr.body { + if let Some(AnyValueInner::StringValue(s)) = &body.value { + assert_eq!(s, "test message"); + } else { + panic!("expected string body"); + } + } + + // Test 'log' field (when 'msg' not present) + let lr = parse( + r#"{"log":"test log"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert!(lr.body.is_some()); + + // Test 'message' field (when 'msg' and 'log' not present) + let lr = parse( + r#"{"message":"test message field"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert!(lr.body.is_some()); + } + + #[test] + fn test_parse_json_timestamp_float() { + let lr = parse( + r#"{"ts":1234567890.5,"msg":"test"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + // Should convert seconds to nanoseconds + assert_eq!(lr.time_unix_nano, 1234567890500000000); + } + + #[test] + fn test_parse_json_timestamp_rfc3339() { + let lr = parse( + r#"{"timestamp":"2024-01-01T12:00:00Z","msg":"test"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + // Should parse RFC3339 timestamp + assert!(lr.time_unix_nano > 0); + } + + #[test] + fn test_parse_json_attributes() { + let lr = parse( + r#"{"level":"error","msg":"test","user_id":123,"session":"abc123","active":true}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + // Should have attributes for fields not extracted (including cloudwatch.id) + assert!(!lr.attributes.is_empty()); + assert!(lr.attributes.iter().any(|kv| kv.key == "user_id")); + assert!(lr.attributes.iter().any(|kv| kv.key == "session")); + assert!(lr.attributes.iter().any(|kv| kv.key == "active")); + } + + #[test] + fn test_parse_json_with_trace_id() { + let lr = parse( + r#"{"msg":"test","trace_id":"0123456789abcdef0123456789abcdef"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert_eq!(lr.trace_id.len(), 16); + } + + #[test] + fn test_parse_json_with_span_id() { + let lr = parse( + r#"{"msg":"test","span_id":"0123456789abcdef"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert_eq!(lr.span_id.len(), 8); + } + + #[test] + fn test_parse_json_with_both_trace_and_span_id() { + let lr = parse( + r#"{"msg":"test","trace_id":"0123456789abcdef0123456789abcdef","span_id":"0123456789abcdef"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert_eq!(lr.trace_id.len(), 16); + assert_eq!(lr.span_id.len(), 8); + } + + #[test] + fn test_parse_json_with_invalid_trace_id_length() { + let lr = parse( + r#"{"msg":"test","trace_id":"0123456789abcdef"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + // trace_id should be empty because it's not 16 bytes + assert_eq!(lr.trace_id.len(), 0); + } + + #[test] + fn test_parse_json_with_invalid_span_id_length() { + let lr = parse( + r#"{"msg":"test","span_id":"0123"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + // span_id should be empty because it's not 8 bytes + assert_eq!(lr.span_id.len(), 0); + } + + #[test] + fn test_parse_json_with_non_hex_trace_id() { + let lr = parse( + r#"{"msg":"test","trace_id":"not-a-hex-string-xxxxxxx"}"#, + LogPlatform::Unknown, + ParserType::Json, + ); + assert_eq!(lr.trace_id.len(), 0); + } + + #[test] + fn test_parse_cloudtrail_aws_vpce_event() { + let json_msg = r#"{ + "eventVersion": "1.09", + "userIdentity": { + "type": "AssumedRole", + "principalId": "ASIAIOSFODNN7EXAMPLE:role-name", + "arn": "arn:aws:sts::123456789012:assumed-role/Admin/role-name", + "accountId": "123456789012", + "accessKeyId": "ASIAIOSFODNN7EXAMPLE", + "sessionContext": { + "sessionIssuer": { + "type": "Role", + "principalId": "ASIAIOSFODNN7EXAMPLE", + "arn": "arn:aws:iam::123456789012:role/Admin", + "accountId": "123456789012", + "userName": "Admin" + }, + "attributes": { + "creationDate": "2024-06-04T23:10:46Z", + "mfaAuthenticated": "false" + } + } + }, + "eventTime": "2024-06-04T23:12:50Z", + "eventSource": "kms.amazonaws.com", + "eventName": "ListKeys", + "awsRegion": "us-east-1", + "sourceIPAddress": "192.0.2.0", + "requestID": "16bcc089-ac49-43f1-9177-EXAMPLE23731", + "eventID": "228ca3c8-5f95-4a8a-9732-EXAMPLE60ed9", + "eventType": "AwsVpceEvent", + "recipientAccountId": "123456789012", + "sharedEventID": "a1f3720c-ef19-47e9-a5d5-EXAMPLE8099f", + "vpcEndpointId": "vpce-EXAMPLE08c1b6b9b7", + "vpcEndpointAccountId": "123456789012", + "eventCategory": "NetworkActivity" +}"#; + let lr = parse(json_msg, LogPlatform::Cloudtrail, ParserType::Json); + + // Verify the body is set to "AwsVpceEvent::ListKeys" + assert!(lr.body.is_some()); + if let Some(body) = &lr.body { + if let Some(AnyValueInner::StringValue(s)) = &body.value { + assert_eq!(s, "AwsVpceEvent::ListKeys"); + } else { + panic!("Expected StringValue in body"); + } + } + + // Verify that eventType and eventName are still in attributes + assert!(lr.attributes.iter().any(|kv| kv.key == "eventType")); + assert!(lr.attributes.iter().any(|kv| kv.key == "eventName")); + + // Verify other fields are present as attributes + assert!(lr.attributes.iter().any(|kv| kv.key == "eventSource")); + assert!(lr.attributes.iter().any(|kv| kv.key == "awsRegion")); + } +} diff --git a/src/forward/forwarder.rs b/src/forward/forwarder.rs index 4839273..5af953f 100644 --- a/src/forward/forwarder.rs +++ b/src/forward/forwarder.rs @@ -9,10 +9,10 @@ use rotel::topology::payload::{Message, MessageMetadata}; use tracing::{debug, error}; use crate::aws_attributes::AwsAttributes; +use crate::cwlogs; use crate::events::{LambdaEvent, LambdaPayload}; use crate::flowlogs::FlowLogManager; use crate::forward::{AckerBuilder, AckerWaiter}; -use crate::parse::cwlogs; use crate::s3logs; use crate::tags::TagManager; diff --git a/src/lib.rs b/src/lib.rs index a446958..cccefda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod aws_attributes; +pub mod cwlogs; pub mod events; pub mod flowlogs; pub mod forward; diff --git a/src/parse/field_stripper.rs b/src/parse/field_stripper.rs index 49e4804..a789131 100644 --- a/src/parse/field_stripper.rs +++ b/src/parse/field_stripper.rs @@ -8,7 +8,7 @@ use serde_json::Value as JsonValue; use tracing::debug; -use crate::parse::cwlogs::LogPlatform; +use crate::parse::platform::LogPlatform; /// Strips sensitive fields from parsed log data based on the log platform. pub struct FieldStripper { diff --git a/src/parse/json.rs b/src/parse/json.rs index 53cd1c0..5e5aa07 100644 --- a/src/parse/json.rs +++ b/src/parse/json.rs @@ -8,7 +8,7 @@ use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value::Value}; use serde_json::Value as JsonValue; -use crate::parse::{cwlogs::ParserError, record_parser::RecordParserError}; +use crate::parse::{platform::ParserError, record_parser::RecordParserError}; /// Parse a JSON log entry and return the parsed map /// Returns an error if the message is not valid JSON or not an object @@ -90,152 +90,8 @@ pub fn json_value_to_any_value(value: JsonValue) -> AnyValue { #[cfg(test)] mod tests { use super::*; - use crate::parse::cwlogs::{LogPlatform, ParserType}; - use crate::parse::record_parser::{RecordParser, map_log_level}; - use aws_lambda_events::cloudwatch_logs::LogEntry; - use opentelemetry_proto::tonic::logs::v1::{LogRecord, SeverityNumber}; - - /// Test utility: Create a LogEntry from a message string - fn create_log_entry(message: &str) -> LogEntry { - let mut log_entry = LogEntry::default(); - log_entry.id = "test-id".to_string(); - log_entry.timestamp = 1000; - log_entry.message = message.to_string(); - log_entry - } - - /// Test utility: Parse a log message and return the LogRecord - fn parse_log_msg(message: &str, platform: LogPlatform) -> LogRecord { - let log_entry = create_log_entry(message); - let parser = RecordParser::new(platform, ParserType::Json, None); - parser.parse(123456789, log_entry.into()) - } - - #[test] - fn test_parse_json_with_level() { - let json_msg = r#"{"level":"info","msg":"test message"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - assert_eq!(log_record.severity_number, SeverityNumber::Info as i32); - assert_eq!(log_record.severity_text, "INFO"); - assert!(log_record.body.is_some()); - } - - #[test] - fn test_parse_json_body_fields() { - // Test 'msg' field - let json_msg = r#"{"msg":"test message"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - assert!(log_record.body.is_some()); - if let Some(body) = &log_record.body { - if let Some(Value::StringValue(s)) = &body.value { - assert_eq!(s, "test message"); - } - } - - // Test 'log' field (when 'msg' not present) - let json_msg = r#"{"log":"test log"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - assert!(log_record.body.is_some()); - - // Test 'message' field (when 'msg' and 'log' not present) - let json_msg = r#"{"message":"test message field"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - assert!(log_record.body.is_some()); - } - - #[test] - fn test_parse_json_timestamp_float() { - let json_msg = r#"{"ts":1234567890.5,"msg":"test"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - // Should convert seconds to nanoseconds - assert_eq!(log_record.time_unix_nano, 1234567890500000000); - } - - #[test] - fn test_parse_json_timestamp_rfc3339() { - let json_msg = r#"{"timestamp":"2024-01-01T12:00:00Z","msg":"test"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - // Should parse RFC3339 timestamp - assert!(log_record.time_unix_nano > 0); - } - - #[test] - fn test_parse_json_attributes() { - let json_msg = - r#"{"level":"error","msg":"test","user_id":123,"session":"abc123","active":true}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - // Should have attributes for fields not extracted (including cloudwatch.id) - assert!(log_record.attributes.len() > 0); - - // Check for specific attributes - let has_user_id = log_record.attributes.iter().any(|kv| kv.key == "user_id"); - let has_session = log_record.attributes.iter().any(|kv| kv.key == "session"); - let has_active = log_record.attributes.iter().any(|kv| kv.key == "active"); - - assert!(has_user_id); - assert!(has_session); - assert!(has_active); - } - - #[test] - fn test_parse_json_with_trace_id() { - let json_msg = r#"{"msg":"test","trace_id":"0123456789abcdef0123456789abcdef"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - assert_eq!(log_record.trace_id.len(), 16); - } - - #[test] - fn test_parse_json_with_span_id() { - let json_msg = r#"{"msg":"test","span_id":"0123456789abcdef"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - assert_eq!(log_record.span_id.len(), 8); - } - - #[test] - fn test_parse_json_with_both_trace_and_span_id() { - let json_msg = r#"{ - "msg":"test", - "trace_id":"0123456789abcdef0123456789abcdef", - "span_id":"0123456789abcdef" - }"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - assert_eq!(log_record.trace_id.len(), 16); - assert_eq!(log_record.span_id.len(), 8); - } - - #[test] - fn test_parse_json_with_invalid_trace_id_length() { - let json_msg = r#"{"msg":"test","trace_id":"0123456789abcdef"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - // trace_id should be empty because it's not 16 bytes - assert_eq!(log_record.trace_id.len(), 0); - } - - #[test] - fn test_parse_json_with_invalid_span_id_length() { - let json_msg = r#"{"msg":"test","span_id":"0123"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - // span_id should be empty because it's not 8 bytes - assert_eq!(log_record.span_id.len(), 0); - } - - #[test] - fn test_parse_json_with_non_hex_trace_id() { - let json_msg = r#"{"msg":"test","trace_id":"not-a-hex-string-xxxxxxx"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown); - - assert_eq!(log_record.trace_id.len(), 0); - } + use crate::parse::record_parser::map_log_level; + use opentelemetry_proto::tonic::logs::v1::SeverityNumber; #[test] fn test_map_log_level() { @@ -299,70 +155,4 @@ mod tests { let any_val = json_value_to_any_value(json_val); assert!(matches!(any_val.value, Some(Value::ArrayValue(_)))); } - - #[test] - fn test_parse_cloudtrail_aws_vpce_event() { - let json_msg = r#"{ - "eventVersion": "1.09", - "userIdentity": { - "type": "AssumedRole", - "principalId": "ASIAIOSFODNN7EXAMPLE:role-name", - "arn": "arn:aws:sts::123456789012:assumed-role/Admin/role-name", - "accountId": "123456789012", - "accessKeyId": "ASIAIOSFODNN7EXAMPLE", - "sessionContext": { - "sessionIssuer": { - "type": "Role", - "principalId": "ASIAIOSFODNN7EXAMPLE", - "arn": "arn:aws:iam::123456789012:role/Admin", - "accountId": "123456789012", - "userName": "Admin" - }, - "attributes": { - "creationDate": "2024-06-04T23:10:46Z", - "mfaAuthenticated": "false" - } - } - }, - "eventTime": "2024-06-04T23:12:50Z", - "eventSource": "kms.amazonaws.com", - "eventName": "ListKeys", - "awsRegion": "us-east-1", - "sourceIPAddress": "192.0.2.0", - "requestID": "16bcc089-ac49-43f1-9177-EXAMPLE23731", - "eventID": "228ca3c8-5f95-4a8a-9732-EXAMPLE60ed9", - "eventType": "AwsVpceEvent", - "recipientAccountId": "123456789012", - "sharedEventID": "a1f3720c-ef19-47e9-a5d5-EXAMPLE8099f", - "vpcEndpointId": "vpce-EXAMPLE08c1b6b9b7", - "vpcEndpointAccountId": "123456789012", - "eventCategory": "NetworkActivity" -}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Cloudtrail); - - // Verify the body is set to "AwsVpceEvent::ListKeys" - assert!(log_record.body.is_some()); - if let Some(body) = &log_record.body { - if let Some(Value::StringValue(s)) = &body.value { - assert_eq!(s, "AwsVpceEvent::ListKeys"); - } else { - panic!("Expected StringValue in body"); - } - } - - // Verify that eventType and eventName are still in attributes - let has_event_type = log_record.attributes.iter().any(|kv| kv.key == "eventType"); - let has_event_name = log_record.attributes.iter().any(|kv| kv.key == "eventName"); - assert!(has_event_type); - assert!(has_event_name); - - // Verify other fields are present as attributes - let has_event_source = log_record - .attributes - .iter() - .any(|kv| kv.key == "eventSource"); - let has_aws_region = log_record.attributes.iter().any(|kv| kv.key == "awsRegion"); - assert!(has_event_source); - assert!(has_aws_region); - } } diff --git a/src/parse/keyvalue.rs b/src/parse/keyvalue.rs index e50c754..a6463c3 100644 --- a/src/parse/keyvalue.rs +++ b/src/parse/keyvalue.rs @@ -8,7 +8,7 @@ use serde_json::Value as JsonValue; use tracing::debug; -use crate::parse::{cwlogs::ParserError, record_parser::RecordParserError}; +use crate::parse::{platform::ParserError, record_parser::RecordParserError}; /// Parse key-value pairs from a string and return as a serde_json::Map /// All values are stored as JsonValue::String @@ -146,8 +146,9 @@ fn parse_keyvalue_pairs(input: &str) -> Vec<(String, String)> { #[cfg(test)] mod tests { use super::*; - use crate::parse::cwlogs::{LogPlatform, ParserType}; - use crate::parse::record_parser::RecordParser; + use crate::cwlogs::ParserType; + use crate::cwlogs::record_parser::RecordParser; + use crate::parse::platform::LogPlatform; use aws_lambda_events::cloudwatch_logs::LogEntry; use opentelemetry_proto::tonic::{ common::v1::any_value::Value, diff --git a/src/parse/mod.rs b/src/parse/mod.rs index 73a347b..91a9677 100644 --- a/src/parse/mod.rs +++ b/src/parse/mod.rs @@ -1,6 +1,7 @@ -pub mod cwlogs; pub mod field_stripper; pub mod json; pub mod keyvalue; +pub mod platform; pub mod record_parser; +pub mod utils; pub mod vpclog; diff --git a/src/parse/platform.rs b/src/parse/platform.rs new file mode 100644 index 0000000..ee32ce6 --- /dev/null +++ b/src/parse/platform.rs @@ -0,0 +1,47 @@ +/// Represents the AWS platform/service that generated the logs +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogPlatform { + Eks, + Ecs, + Rds, + Lambda, + Codebuild, + Cloudtrail, + VpcFlowLog, + Unknown, +} + +impl LogPlatform { + /// Returns the platform string used in cloud.platform attribute + pub fn as_str(&self) -> &'static str { + match self { + LogPlatform::Eks => "aws_eks", + LogPlatform::Ecs => "aws_ecs", + LogPlatform::Rds => "aws_rds", + LogPlatform::Lambda => "aws_lambda", + LogPlatform::Codebuild => "aws_codebuild", + LogPlatform::Cloudtrail => "aws_cloudtrail", + LogPlatform::VpcFlowLog => "aws_vpc_flow_log", + LogPlatform::Unknown => "aws_unknown", + } + } +} + +/// Errors that can occur during parsing +#[derive(Debug, thiserror::Error)] +pub enum ParserError { + #[error("Failed to decode CloudWatch Logs data: {0}")] + DecodeError(String), + + #[error("Failed to decompress CloudWatch Logs data: {0}")] + DecompressionError(String), + + #[error("Failed to parse JSON: {0}")] + JsonParseError(String), + + #[error("Invalid log format: {0}")] + FormatParseError(String), + + #[error("Unable to parse EC2 flow log format")] + FlowLogFormatError, +} diff --git a/src/parse/record_parser.rs b/src/parse/record_parser.rs index 5e77146..eb11fe1 100644 --- a/src/parse/record_parser.rs +++ b/src/parse/record_parser.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, OnceLock}; +use std::sync::OnceLock; use opentelemetry_proto::tonic::{ common::v1::{AnyValue, KeyValue, any_value::Value}, @@ -6,213 +6,153 @@ use opentelemetry_proto::tonic::{ }; use regex::Regex; use serde_json::Value as JsonValue; -use tracing::{debug, warn}; +use tracing::debug; -use crate::flowlogs::ParsedFields; use crate::parse::{ - cwlogs::{LogPlatform, ParserError, ParserType}, field_stripper::FieldStripper, - json::{json_value_to_any_value, parse_json_to_map}, - keyvalue::parse_keyvalue_to_map, - vpclog::parse_vpclog_to_map, + json::json_value_to_any_value, + platform::{LogPlatform, ParserError}, }; -/// Parser for CloudWatch log entries that converts them into OpenTelemetry LogRecords. +/// Wraps a parse error together with the original raw message so that callers can fall back +/// to treating the message as plain text. +#[derive(Debug)] +pub(crate) struct RecordParserError(pub(crate) ParserError, pub(crate) String); + +// ── LogBuilder ─────────────────────────────────────────────────────────────────────────────────── + +/// Platform-aware factory for building OpenTelemetry [`LogRecord`]s. /// -/// The `RecordParser` handles format detection, parsing, field extraction, and -/// platform-specific transformations in a unified way for both JSON and key-value logs. -pub(crate) struct RecordParser { +/// `LogBuilder` is the long-lived, reusable part: it holds the platform identity and the +/// [`FieldStripper`] configuration that are shared across all records produced from the same +/// log source. Call [`LogBuilder::start`] to obtain a per-record [`LogRecordBuilder`] that +/// accumulates state and ultimately returns the finished [`LogRecord`]. +pub(crate) struct LogBuilder { platform: LogPlatform, - parser_type: ParserType, field_stripper: FieldStripper, - flow_log_parsed_fields: Option>, -} - -pub(crate) struct RecordLogEntry { - cloudwatch_id: Option, - timestamp: i64, - message: String, -} - -impl RecordLogEntry { - pub(crate) fn new(cloudwatch_id: Option, timestamp: i64, message: String) -> Self { - Self { - cloudwatch_id, - timestamp, - message, - } - } } -#[derive(Debug)] -pub(crate) struct RecordParserError(pub(crate) ParserError, pub(crate) String); - -impl RecordParser { - /// Create a new RecordParser for a specific platform and parser type. - pub(crate) fn new( - platform: LogPlatform, - parser_type: ParserType, - flow_log_parsed_fields: Option>, - ) -> Self { +impl LogBuilder { + /// Create a new `LogBuilder` for the given platform. + pub(crate) fn new(platform: LogPlatform) -> Self { Self { platform, - parser_type, field_stripper: FieldStripper::new(platform), - flow_log_parsed_fields, } } - /// Parse a CloudWatch LogEntry into an OpenTelemetry LogRecord. - /// If parsing fails, the message is treated as plain text. - pub(crate) fn parse(&self, now_nanos: u64, log_entry: RecordLogEntry) -> LogRecord { - let mut lr = LogRecord { - time_unix_nano: timestamp_ms_to_ns(log_entry.timestamp, now_nanos), + /// Begin building a new [`LogRecord`]. + /// + /// * `now_nanos` — wall-clock nanoseconds used as `observed_time_unix_nano` and as a + /// fallback `time_unix_nano` when `timestamp_ms` would overflow. + /// * `timestamp_ms` — millisecond-precision event timestamp converted to `time_unix_nano`. + /// * `initial_attributes` — zero or more [`KeyValue`] pairs prepended to the record's + /// attribute list before any map-derived attributes are added (e.g. `cloudwatch.id`). + pub(crate) fn start( + &self, + now_nanos: u64, + timestamp_ms: i64, + initial_attributes: Vec, + ) -> LogRecordBuilder<'_> { + let lr = LogRecord { + time_unix_nano: timestamp_ms_to_ns(timestamp_ms, now_nanos), observed_time_unix_nano: now_nanos, + attributes: initial_attributes, ..Default::default() }; - - if let Some(cloudwatch_id) = log_entry.cloudwatch_id { - lr.attributes.push(KeyValue { - key: "cloudwatch.id".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(cloudwatch_id)), - }), - }) - } - - let message = log_entry.message; - - // Try to parse the message into a map - let json_map = self.parse_message_to_map(message, &mut lr); - - match json_map { - Ok(None) => return lr, - Ok(Some(mut map)) => { - self.field_stripper.strip_fields(&mut map); - - self.populate_log_record_from_map(map, &mut lr); - } - Err(RecordParserError(err, msg)) => { - warn!(error = ?err, "Failed to parse log entry, using raw text as body"); - - // If parsing failed, treat as plain text - lr.body = Some(AnyValue { - value: Some(Value::StringValue(msg)), - }); - } + LogRecordBuilder { + lr, + platform: self.platform, + field_stripper: &self.field_stripper, } - - lr } - /// Parse a pre-parsed JSON map into an OpenTelemetry LogRecord. - /// This is used for S3 logs where the entire file is a JSON blob with a Records array. + /// Convenience: build a [`LogRecord`] directly from a pre-parsed JSON map. + /// + /// Used by `s3logs` for JSON-blob files where the entire file is a `Records` array + /// and each element has already been deserialised. pub(crate) fn parse_from_map( &self, now_nanos: u64, - timestamp: i64, - mut json_map: serde_json::Map, + timestamp_ms: i64, + json_map: serde_json::Map, ) -> LogRecord { - let mut lr = LogRecord { - time_unix_nano: timestamp_ms_to_ns(timestamp, now_nanos), - observed_time_unix_nano: now_nanos, - ..Default::default() - }; + let mut builder = self.start(now_nanos, timestamp_ms, vec![]); + builder.populate_from_map(json_map); + builder.finish() + } +} - // Strip sensitive fields - self.field_stripper.strip_fields(&mut json_map); +// ── LogRecordBuilder ───────────────────────────────────────────────────────────────────────────── - // Populate the log record from the map - self.populate_log_record_from_map(json_map, &mut lr); +/// Per-record builder returned by [`LogBuilder::start`]. +/// +/// Holds the partially-constructed [`LogRecord`] and a reference back to the parent +/// [`LogBuilder`]'s platform configuration and field stripper. Obtain a finished record by +/// calling [`LogRecordBuilder::finish`]. +pub(crate) struct LogRecordBuilder<'a> { + lr: LogRecord, + platform: LogPlatform, + field_stripper: &'a FieldStripper, +} - lr +impl<'a> LogRecordBuilder<'a> { + /// Set the record body to a plain-text string value. + pub(crate) fn set_body_text(mut self, text: String) -> Self { + self.lr.body = Some(AnyValue { + value: Some(Value::StringValue(text)), + }); + self } - /// Parse a message string into a JSON map based on the parser type. - /// This is the first phase of parsing that converts a string message into a structured map. - fn parse_message_to_map( - &self, - message: String, - lr: &mut LogRecord, - ) -> Result>, RecordParserError> { - match self.parser_type { - ParserType::Json => parse_json_to_map(message).map(|r| Some(r)), - ParserType::KeyValue => parse_keyvalue_to_map(message).map(|r| Some(r)), - ParserType::VpcLog => { - // Always keep log in Body - lr.body = Some(AnyValue { - value: Some(Value::StringValue(message.clone())), - }); - - match self.flow_log_parsed_fields.as_ref() { - Some(parsed_fields) => { - parse_vpclog_to_map(message, parsed_fields.clone()).map(|r| Some(r)) - } - None => Ok(None), - } - } - ParserType::Unknown => { - // Auto-detect: try JSON first, otherwise try keyvalue, otherwise plain text - if message.len() > 2 && message.starts_with("{") { - parse_json_to_map(message).map(|r| Some(r)) - } else { - // Otherwise, just use the string as the body - lr.body = Some(AnyValue { - value: Some(Value::StringValue(message)), - }); - Ok(None) - } - } - } - } + /// Strip sensitive fields from `json_map` then extract well-known fields (severity, body, + /// timestamp, trace/span IDs) and platform-specific fallbacks into the in-progress record. + /// Any remaining map entries become log record attributes. + pub(crate) fn populate_from_map(&mut self, mut json_map: serde_json::Map) { + // Strip sensitive fields before anything else. + self.field_stripper.strip_fields(&mut json_map); - /// Populate a LogRecord from a parsed map (applies to both JSON and keyvalue formats) - fn populate_log_record_from_map( - &self, - mut json_map: serde_json::Map, - lr: &mut LogRecord, - ) { - // Handle "level" field for severity + // ── Severity ──────────────────────────────────────────────────────────────── if let Some(level_value) = json_map.remove("level") { - if let JsonValue::String(level_str) = &level_value { + if let JsonValue::String(ref level_str) = level_value { if let Some((severity_number, severity_text)) = map_log_level(level_str) { - lr.severity_number = severity_number as i32; - lr.severity_text = severity_text; + self.lr.severity_number = severity_number as i32; + self.lr.severity_text = severity_text; } else { - // put back if not matched + // Not a recognised level string — keep as an attribute. json_map.insert("level".to_string(), level_value); } } else { - // put back if not a string + // Not a string — keep as an attribute. json_map.insert("level".to_string(), level_value); } } - // Check for body fields in order: 'msg', 'log', 'message' - let body_keys = ["msg", "log", "message"]; - for key in body_keys { + // ── Body ──────────────────────────────────────────────────────────────────── + // Priority order: 'msg', 'log', 'message'. + for key in ["msg", "log", "message"] { if let Some(body_value) = json_map.remove(key) { - lr.body = Some(json_value_to_any_value(body_value)); + self.lr.body = Some(json_value_to_any_value(body_value)); break; } } - // Check for timestamp fields: 'ts', 'time', or 'timestamp' - let timestamp_keys = ["ts", "time", "timestamp"]; - for key in timestamp_keys { + // ── Timestamp ─────────────────────────────────────────────────────────────── + // Priority order: 'ts', 'time', 'timestamp'. + for key in ["ts", "time", "timestamp"] { if let Some(ts_value) = json_map.remove(key) { if let Some(parsed_nanos) = parse_timestamp(&ts_value) { - lr.time_unix_nano = parsed_nanos; + self.lr.time_unix_nano = parsed_nanos; } break; } } - // Handle trace_id and span_id if they exist + // ── Trace / Span IDs ──────────────────────────────────────────────────────── if let Some(JsonValue::String(trace_id_str)) = json_map.remove("trace_id") { if let Ok(trace_bytes) = decode_hex(&trace_id_str) { if trace_bytes.len() == 16 { - lr.trace_id = trace_bytes; + self.lr.trace_id = trace_bytes; } else { debug!( "trace_id has wrong length: expected 16 bytes, got {}", @@ -227,7 +167,7 @@ impl RecordParser { if let Some(JsonValue::String(span_id_str)) = json_map.remove("span_id") { if let Ok(span_bytes) = decode_hex(&span_id_str) { if span_bytes.len() == 8 { - lr.span_id = span_bytes; + self.lr.span_id = span_bytes; } else { debug!( "span_id has wrong length: expected 8 bytes, got {}", @@ -239,35 +179,41 @@ impl RecordParser { } } - // Platform-specific body handling - // CloudTrail: Use eventType::eventName as body if no body field was found - if self.platform == LogPlatform::Cloudtrail && lr.body.is_none() { + // ── Platform-specific body fallback ───────────────────────────────────────── + // CloudTrail: synthesise "eventType::eventName" when no body field was found. + if self.platform == LogPlatform::Cloudtrail && self.lr.body.is_none() { let event_type = json_map.get("eventType"); let event_name = json_map.get("eventName"); - lr.body = match (event_type, event_name) { + self.lr.body = match (event_type, event_name) { (Some(JsonValue::String(t)), Some(JsonValue::String(n))) => Some(AnyValue { value: Some(Value::StringValue(format!("{}::{}", t, n))), }), (Some(JsonValue::String(t)), None) => Some(AnyValue { value: Some(Value::StringValue(t.to_string())), }), - (_, _) => None, + _ => None, }; } - // Convert remaining fields to attributes + // ── Remaining fields → attributes ──────────────────────────────────────────── for (key, value) in json_map { - lr.attributes.push(KeyValue { + self.lr.attributes.push(KeyValue { key, value: Some(json_value_to_any_value(value)), }); } } + + /// Consume the builder and return the finished [`LogRecord`]. + pub(crate) fn finish(self) -> LogRecord { + self.lr + } } -/// Convert a millisecond timestamp to nanoseconds, falling back on overflow or -/// negative values. -fn timestamp_ms_to_ns(timestamp_ms: i64, fallback_ns: u64) -> u64 { +// ── Free helper functions ──────────────────────────────────────────────────────────────────────── + +/// Convert a millisecond timestamp to nanoseconds, falling back on overflow or negative values. +pub(crate) fn timestamp_ms_to_ns(timestamp_ms: i64, fallback_ns: u64) -> u64 { timestamp_ms .checked_mul(1_000_000) .filter(|&ns| ns >= 0) @@ -275,25 +221,28 @@ fn timestamp_ms_to_ns(timestamp_ms: i64, fallback_ns: u64) -> u64 { .unwrap_or(fallback_ns) } -/// Hex decode utilities -fn decode_hex(s: &str) -> Result, String> { +/// Hex-decode a string, returning an error message on failure. +pub(crate) fn decode_hex(s: &str) -> Result, String> { hex::decode(s).map_err(|e| format!("Hex decode error: {}", e)) } -/// Parse a timestamp value (supports RFC3339 strings and Unix timestamps) -fn parse_timestamp(value: &JsonValue) -> Option { +/// Parse a timestamp value (supports RFC 3339 strings and numeric Unix timestamps). +/// +/// Numbers ≤ 1 × 10¹² are treated as seconds; larger numbers as milliseconds. +/// Returns the timestamp as nanoseconds, or `None` on overflow / invalid input. +pub(crate) fn parse_timestamp(value: &JsonValue) -> Option { match value { JsonValue::Number(n) => { - // Try as Unix timestamp (seconds or milliseconds) if let Some(ts_float) = n.as_f64() { - // Heuristic: if > 1e12, assume milliseconds, otherwise seconds if ts_float > 1e12 { + // Milliseconds → nanoseconds let millis = ts_float * 1_000.0; if millis < 0.0 || millis > u64::MAX as f64 { return None; } Some(millis as u64) } else { + // Seconds → nanoseconds let nanos = ts_float * 1_000_000_000.0; if nanos < 0.0 || nanos > u64::MAX as f64 { return None; @@ -304,14 +253,9 @@ fn parse_timestamp(value: &JsonValue) -> Option { None } } - JsonValue::String(s) => { - // Try as RFC3339 - if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(s) { - Some(dt.timestamp_nanos_opt().unwrap_or(0) as u64) - } else { - None - } - } + JsonValue::String(s) => chrono::DateTime::parse_from_rfc3339(s) + .ok() + .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64), _ => None, } } @@ -361,41 +305,30 @@ fn get_level_regexes() -> &'static [(&'static str, SeverityNumber, &'static str, }) } -/// Map log level string to OpenTelemetry severity number and text using regex +/// Map a log level string to an OpenTelemetry `SeverityNumber` and canonical text. +/// +/// Returns `None` if the string does not match any known level pattern. pub fn map_log_level(level: &str) -> Option<(SeverityNumber, String)> { let level_lower = level.to_lowercase(); - - // Try to match against known patterns for (_name, severity_number, severity_text, regex) in get_level_regexes() { if regex.is_match(&level_lower) { return Some((*severity_number, severity_text.to_string())); } } - - // No match found None } +// ── Tests ──────────────────────────────────────────────────────────────────────────────────────── + #[cfg(test)] mod tests { use super::*; - use aws_lambda_events::cloudwatch_logs::LogEntry; use opentelemetry_proto::tonic::logs::v1::SeverityNumber; - /// Test utility: Create a LogEntry from a message string - fn create_log_entry(message: &str) -> LogEntry { - let mut log_entry = LogEntry::default(); - log_entry.id = "test-id".to_string(); - log_entry.timestamp = 1000; - log_entry.message = message.to_string(); - log_entry - } - - /// Test utility: Parse a log message and return the LogRecord - fn parse_log_msg(message: &str, platform: LogPlatform, parser_type: ParserType) -> LogRecord { - let log_entry = create_log_entry(message); - let parser = RecordParser::new(platform, parser_type, None); - parser.parse(123456789, log_entry.into()) + fn build_from_json(json_msg: &str, platform: LogPlatform) -> LogRecord { + let map: serde_json::Map = + serde_json::from_str(json_msg).expect("valid JSON object"); + LogBuilder::new(platform).parse_from_map(123_456_789, 1000, map) } #[test] @@ -409,15 +342,13 @@ mod tests { "user": "john", "action": "login" }"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown, ParserType::Json); + let lr = build_from_json(json_msg, LogPlatform::Unknown); - // Check severity - assert_eq!(log_record.severity_number, SeverityNumber::Info as i32); - assert_eq!(log_record.severity_text, "INFO"); + assert_eq!(lr.severity_number, SeverityNumber::Info as i32); + assert_eq!(lr.severity_text, "INFO"); - // Check body - assert!(log_record.body.is_some()); - if let Some(body) = &log_record.body { + assert!(lr.body.is_some()); + if let Some(body) = &lr.body { if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = &body.value { @@ -427,46 +358,32 @@ mod tests { } } - // Check timestamp - assert!(log_record.time_unix_nano > 0); - - // Check trace and span IDs - assert_eq!(log_record.trace_id.len(), 16); - assert_eq!(log_record.span_id.len(), 8); - - // Check attributes - assert!(log_record.attributes.iter().any(|kv| kv.key == "user")); - assert!(log_record.attributes.iter().any(|kv| kv.key == "action")); + assert!(lr.time_unix_nano > 0); + assert_eq!(lr.trace_id.len(), 16); + assert_eq!(lr.span_id.len(), 8); + assert!(lr.attributes.iter().any(|kv| kv.key == "user")); + assert!(lr.attributes.iter().any(|kv| kv.key == "action")); } #[test] - fn test_parse_keyvalue_record() { - let kv_msg = - r#"time="2025-12-24T19:48:32Z" level=info msg="access granted" user=john action=login"#; - let log_record = parse_log_msg(kv_msg, LogPlatform::Eks, ParserType::KeyValue); - - // Check severity - assert_eq!(log_record.severity_number, SeverityNumber::Info as i32); - assert_eq!(log_record.severity_text, "INFO"); - - // Check body - assert!(log_record.body.is_some()); - if let Some(body) = &log_record.body { + fn test_parse_cloudtrail_body_synthesis() { + let json_msg = r#"{ + "eventType": "AwsApiCall", + "eventName": "AssumeRole", + "eventSource": "sts.amazonaws.com" + }"#; + let lr = build_from_json(json_msg, LogPlatform::Cloudtrail); + + assert!(lr.body.is_some()); + if let Some(body) = &lr.body { if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = &body.value { - assert_eq!(s, "access granted"); + assert_eq!(s, "AwsApiCall::AssumeRole"); } else { panic!("Expected StringValue in body"); } } - - // Check timestamp was parsed - assert!(log_record.time_unix_nano > 0); - - // Check attributes - assert!(log_record.attributes.iter().any(|kv| kv.key == "user")); - assert!(log_record.attributes.iter().any(|kv| kv.key == "action")); } #[test] @@ -494,11 +411,10 @@ mod tests { }, "recipientAccountId": "123456789012" }"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Cloudtrail, ParserType::Json); + let lr = build_from_json(json_msg, LogPlatform::Cloudtrail); - // Check body is set to eventType::eventName - assert!(log_record.body.is_some()); - if let Some(body) = &log_record.body { + assert!(lr.body.is_some()); + if let Some(body) = &lr.body { if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = &body.value { @@ -508,44 +424,28 @@ mod tests { } } - // Find the responseElements attribute - let response_elements_attr = log_record - .attributes - .iter() - .find(|kv| kv.key == "responseElements"); + let response_elements_attr = lr.attributes.iter().find(|kv| kv.key == "responseElements"); assert!( response_elements_attr.is_some(), "responseElements should be present" ); - // Verify that sensitive credentials were stripped if let Some(attr) = response_elements_attr { if let Some(any_value) = &attr.value { if let Some( opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(json_str), ) = &any_value.value { - // The responseElements is stored as a JSON string in attributes - // Parse it to verify credentials were stripped let response_obj: serde_json::Value = serde_json::from_str(json_str).expect("Should parse JSON"); if let Some(credentials) = response_obj.get("credentials").and_then(|v| v.as_object()) { - assert!( - !credentials.contains_key("accessKeyId"), - "accessKeyId should be stripped from credentials" - ); - assert!( - !credentials.contains_key("secretAccessKey"), - "secretAccessKey should be stripped from credentials" - ); assert!( !credentials.contains_key("sessionToken"), - "sessionToken should be stripped from credentials" + "sessionToken should be stripped" ); - // expiration is not in the strip list, so it should remain assert!( credentials.contains_key("expiration"), "expiration should remain" @@ -553,107 +453,90 @@ mod tests { } else { panic!("credentials object not found in responseElements"); } - - // Verify assumedRoleUser is still present - assert!( - response_obj.get("assumedRoleUser").is_some(), - "assumedRoleUser should still be present" - ); + assert!(response_obj.get("assumedRoleUser").is_some()); } } } - // Verify other fields are present as attributes - assert!( - log_record - .attributes - .iter() - .any(|kv| kv.key == "eventSource") - ); - assert!(log_record.attributes.iter().any(|kv| kv.key == "awsRegion")); + assert!(lr.attributes.iter().any(|kv| kv.key == "eventSource")); + assert!(lr.attributes.iter().any(|kv| kv.key == "awsRegion")); } #[test] - fn test_parse_unknown_type_json_autodetect() { - let json_msg = r#"{"msg":"test message","level":"debug"}"#; - let log_record = parse_log_msg(json_msg, LogPlatform::Unknown, ParserType::Unknown); - - // Should auto-detect as JSON and parse correctly - assert_eq!(log_record.severity_number, SeverityNumber::Debug as i32); - assert!(log_record.body.is_some()); + fn test_start_sets_timestamps_and_initial_attributes() { + let builder = LogBuilder::new(LogPlatform::Unknown); + let initial_attrs = vec![KeyValue { + key: "source".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("test".to_string())), + }), + }]; + let lr = builder.start(999, 500, initial_attrs).finish(); + + assert_eq!(lr.observed_time_unix_nano, 999); + assert_eq!(lr.time_unix_nano, 500_000_000); // 500ms → ns + assert!(lr.attributes.iter().any(|kv| kv.key == "source")); } #[test] - fn test_parse_unknown_type_plain_text() { - let plain_msg = "This is just a plain text message"; - let log_record = parse_log_msg(plain_msg, LogPlatform::Unknown, ParserType::Unknown); - - // Should treat as plain text - assert!(log_record.body.is_some()); - if let Some(body) = &log_record.body { + fn test_set_body_text() { + let builder = LogBuilder::new(LogPlatform::Unknown); + let lr = builder + .start(0, 0, vec![]) + .set_body_text("hello world".to_string()) + .finish(); + + if let Some(body) = &lr.body { if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = &body.value { - assert_eq!(s, plain_msg); + assert_eq!(s, "hello world"); } else { - panic!("Expected StringValue in body"); + panic!("expected StringValue body"); } + } else { + panic!("body should be set"); } } #[test] fn test_parse_timestamp_seconds() { - // A normal Unix timestamp in seconds (< 1e12) should be converted to nanoseconds let value = JsonValue::Number(serde_json::Number::from_f64(1704110400.0).unwrap()); - let result = parse_timestamp(&value); - assert_eq!(result, Some(1704110400_000_000_000u64)); + assert_eq!(parse_timestamp(&value), Some(1704110400_000_000_000u64)); } #[test] fn test_parse_timestamp_milliseconds() { - // A Unix timestamp in milliseconds (> 1e12) should be converted to nanoseconds let value = JsonValue::Number(serde_json::Number::from_f64(1704110400_000.0).unwrap()); - let result = parse_timestamp(&value); - assert_eq!(result, Some(1704110400_000_000u64)); + assert_eq!(parse_timestamp(&value), Some(1704110400_000_000u64)); } #[test] fn test_parse_timestamp_seconds_overflow() { - // A seconds value so large that multiplying by 1_000_000_000 overflows u64 - // u64::MAX is ~1.84e19, so 1e11 * 1e9 = 1e20 overflows let value = JsonValue::Number(serde_json::Number::from_f64(1e11).unwrap()); - let result = parse_timestamp(&value); - assert_eq!(result, None); + assert_eq!(parse_timestamp(&value), None); } #[test] fn test_parse_timestamp_milliseconds_overflow() { - // A milliseconds value so large that multiplying by 1_000 overflows u64 - // u64::MAX is ~1.84e19, so 1e19 * 1e3 = 1e22 overflows let value = JsonValue::Number(serde_json::Number::from_f64(1e19).unwrap()); - let result = parse_timestamp(&value); - assert_eq!(result, None); + assert_eq!(parse_timestamp(&value), None); } #[test] fn test_parse_timestamp_negative_seconds() { - // Negative timestamps should return None let value = JsonValue::Number(serde_json::Number::from_f64(-1.0).unwrap()); - let result = parse_timestamp(&value); - assert_eq!(result, None); + assert_eq!(parse_timestamp(&value), None); } #[test] fn test_parse_timestamp_negative_milliseconds() { - // Negative millisecond timestamps (< -1e12) should return None let value = JsonValue::Number(serde_json::Number::from_f64(-2e12).unwrap()); - let result = parse_timestamp(&value); - assert_eq!(result, None); + assert_eq!(parse_timestamp(&value), None); } #[test] fn test_parse_timestamp_rfc3339_string() { - // A valid RFC3339 string should be parsed correctly let value = JsonValue::String("2024-01-01T12:00:00Z".to_string()); let result = parse_timestamp(&value); assert!(result.is_some()); @@ -662,17 +545,13 @@ mod tests { #[test] fn test_parse_timestamp_invalid_string() { - // An invalid string should return None let value = JsonValue::String("not-a-timestamp".to_string()); - let result = parse_timestamp(&value); - assert_eq!(result, None); + assert_eq!(parse_timestamp(&value), None); } #[test] fn test_parse_timestamp_non_number() { - // A boolean JSON value should return None let value = JsonValue::Bool(true); - let result = parse_timestamp(&value); - assert_eq!(result, None); + assert_eq!(parse_timestamp(&value), None); } } diff --git a/src/parse/utils.rs b/src/parse/utils.rs new file mode 100644 index 0000000..d627fe9 --- /dev/null +++ b/src/parse/utils.rs @@ -0,0 +1,35 @@ +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value::Value}; + +/// Construct a [`KeyValue`] with a [`Value::StringValue`] from a key and any value +/// that can be converted into a [`String`]. +/// +/// # Example +/// +/// ```ignore +/// let kv = string_kv("cloud.provider", "aws"); +/// let kv = string_kv("http.status_code", 200.to_string()); +/// ``` +pub(crate) fn string_kv(key: &str, value: impl Into) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue(value.into())), + }), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_string_kv_str_value() { + let kv = string_kv("cloud.provider", "aws"); + assert_eq!(kv.key, "cloud.provider"); + let inner = kv.value.expect("value should be Some"); + match inner.value { + Some(Value::StringValue(s)) => assert_eq!(s, "aws"), + other => panic!("expected StringValue, got {:?}", other), + } + } +} diff --git a/src/parse/vpclog.rs b/src/parse/vpclog.rs index c607e68..5dfcff5 100644 --- a/src/parse/vpclog.rs +++ b/src/parse/vpclog.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use serde_json::Value as JsonValue; use crate::flowlogs::{ParsedFieldType, ParsedFields}; -use crate::parse::{cwlogs::ParserError, record_parser::RecordParserError}; +use crate::parse::{platform::ParserError, record_parser::RecordParserError}; /// Parse an EC2 Flow Log record from a string using pre-parsed field names /// Fields with value "-" are excluded from the result @@ -90,9 +90,10 @@ fn parse_vpclog_fields(input: &str) -> Vec { #[cfg(test)] mod tests { use super::*; + use crate::cwlogs::ParserType; + use crate::cwlogs::record_parser::RecordParser; use crate::flowlogs::parse_log_format; - use crate::parse::cwlogs::{LogPlatform, ParserType}; - use crate::parse::record_parser::RecordParser; + use crate::parse::platform::LogPlatform; use aws_lambda_events::cloudwatch_logs::LogEntry; use opentelemetry_proto::tonic::{common::v1::any_value::Value, logs::v1::LogRecord}; diff --git a/src/s3logs/mod.rs b/src/s3logs/mod.rs index ccd641d..7d93dcc 100644 --- a/src/s3logs/mod.rs +++ b/src/s3logs/mod.rs @@ -171,6 +171,12 @@ impl Parser { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ParserType { + Json, + Unknown, +} + /// Errors that can occur during S3 log parsing #[derive(Debug, thiserror::Error)] pub enum ParserError { diff --git a/src/s3logs/s3record.rs b/src/s3logs/s3record.rs index fb17cad..ca7bff3 100644 --- a/src/s3logs/s3record.rs +++ b/src/s3logs/s3record.rs @@ -1,19 +1,24 @@ use std::io::Read; +use serde_json::Value as JsonValue; +use tracing::warn; + use aws_lambda_events::s3::S3EventRecord; use aws_sdk_s3::Client as S3Client; use chrono::{DateTime, Utc}; use flate2::read::GzDecoder; use opentelemetry_proto::tonic::{ - common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value::Value}, + common::v1::{InstrumentationScope, KeyValue}, logs::v1::{ResourceLogs, ScopeLogs}, resource::v1::Resource, }; use tracing::debug; -use crate::aws_attributes::AwsAttributes; -use crate::parse::cwlogs::{LogPlatform, ParserType}; -use crate::parse::record_parser::{RecordLogEntry, RecordParser}; +use crate::parse::json::parse_json_to_map; +use crate::parse::platform::LogPlatform; +use crate::parse::record_parser::LogBuilder; +use crate::parse::utils::string_kv; +use crate::{aws_attributes::AwsAttributes, s3logs::ParserType}; use super::{JsonLogRecords, ParserError}; @@ -241,8 +246,7 @@ fn parse_log_lines( // Create base resource attributes let base_attributes = create_base_attributes(bucket, key, aws_attributes, log_platform); - // Parse lines into log records using RecordParser - let rec_parser = RecordParser::new(log_platform, parser_type, None); + let builder = LogBuilder::new(log_platform); let event_timestamp = event_time.timestamp_millis() as i64; @@ -252,9 +256,13 @@ fn parse_log_lines( let lines_count = lines.len(); for line in lines.into_iter() { - let log_entry = RecordLogEntry::new(None, event_timestamp, String::from(line)); - - let log_record = rec_parser.parse(now_nanos, log_entry); + let log_record = parse_line( + &builder, + now_nanos, + event_timestamp, + String::from(line), + parser_type, + ); current_batch.push(log_record); // Create a ResourceLogs when we hit the batch size @@ -285,6 +293,51 @@ fn parse_log_lines( Ok(resource_logs_list) } +/// Parse a single log line string into a [`LogRecord`]. +/// +/// Dispatches based on `parser_type`: +/// * `Json` — parse as a JSON object; on failure the raw string becomes the body. +/// * `Unknown` — auto-detect: attempt JSON for `{`-prefixed lines, otherwise plain text. +/// * Any other type — treat as plain text (S3 log detection never produces `VpcLog` or +/// `KeyValue`). +fn parse_line( + builder: &LogBuilder, + now_nanos: u64, + timestamp_ms: i64, + line: String, + parser_type: ParserType, +) -> opentelemetry_proto::tonic::logs::v1::LogRecord { + let mut record_builder = builder.start(now_nanos, timestamp_ms, vec![]); + + let map_result: Result>, _> = match parser_type { + ParserType::Json => parse_json_to_map(line.clone()) + .map(Some) + .map_err(|e| (e, line.clone())), + ParserType::Unknown => { + if line.len() > 2 && line.starts_with('{') { + parse_json_to_map(line.clone()) + .map(Some) + .map_err(|e| (e, line.clone())) + } else { + return record_builder.set_body_text(line).finish(); + } + } + }; + + match map_result { + Ok(None) => {} + Ok(Some(map)) => { + record_builder.populate_from_map(map); + } + Err((err, raw)) => { + warn!(error = ?err, "Failed to parse log line, using raw text as body"); + return record_builder.set_body_text(raw).finish(); + } + } + + record_builder.finish() +} + /// Parse a JSON blob containing a Records array fn parse_json_blob( json_blob: JsonLogRecords, @@ -312,8 +365,7 @@ fn parse_json_blob( "Detected log platform for JSON blob" ); - // Create RecordParser with Json parser type and detected platform - let rec_parser = RecordParser::new(log_platform, ParserType::Json, None); + let builder = LogBuilder::new(log_platform); let event_timestamp = event_time.timestamp_millis() as i64; @@ -323,8 +375,7 @@ fn parse_json_blob( let records_count = json_blob.len(); for record_map in json_blob.records { - // Parse each record map directly using the new parse_from_map method - let log_record = rec_parser.parse_from_map(now_nanos, event_timestamp, record_map); + let log_record = builder.parse_from_map(now_nanos, event_timestamp, record_map); current_batch.push(log_record); // Create a ResourceLogs when we hit the batch size @@ -362,46 +413,16 @@ fn create_base_attributes( log_platform: LogPlatform, ) -> Vec { let mut attributes = vec![ - KeyValue { - key: "cloud.provider".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue("aws".to_string())), - }), - }, - KeyValue { - key: "cloud.region".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(aws_attributes.region.clone())), - }), - }, - KeyValue { - key: "cloud.account.id".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(aws_attributes.account_id.clone())), - }), - }, - KeyValue { - key: "aws.s3.bucket".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(bucket.to_string())), - }), - }, - KeyValue { - key: "aws.s3.key".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(key.to_string())), - }), - }, + string_kv("cloud.provider", "aws"), + string_kv("cloud.region", aws_attributes.region.clone()), + string_kv("cloud.account.id", aws_attributes.account_id.clone()), + string_kv("aws.s3.bucket", bucket), + string_kv("aws.s3.key", key), ]; // Add cloud.platform attribute based on detected platform if log_platform != LogPlatform::Unknown { - attributes.push(KeyValue { - key: "cloud.platform".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue(log_platform.as_str().to_string())), - }), - }); + attributes.push(string_kv("cloud.platform", log_platform.as_str())); } attributes @@ -421,20 +442,16 @@ fn create_resource_logs( }), scope_logs: vec![ScopeLogs { scope: Some(InstrumentationScope { - name: "rotel-lambda-forwarder".to_string(), + name: env!("CARGO_PKG_NAME").to_string(), version: env!("CARGO_PKG_VERSION").to_string(), - attributes: vec![KeyValue { - key: "aws.lambda.invoked_arn".to_string(), - value: Some(AnyValue { - value: Some(Value::StringValue( - aws_attributes.invoked_function_arn.clone(), - )), - }), - }], + attributes: vec![string_kv( + "aws.lambda.invoked_arn", + aws_attributes.invoked_function_arn.clone(), + )], dropped_attributes_count: 0, }), log_records, - schema_url: "".to_string(), + schema_url: String::new(), }], schema_url: String::new(), } @@ -497,6 +514,7 @@ fn detect_log_platform_from_key(key: &str) -> LogPlatform { #[cfg(test)] mod tests { use super::*; + use opentelemetry_proto::tonic::logs::v1::SeverityNumber; #[test] fn test_detect_log_format_json_from_suffix() { @@ -664,6 +682,70 @@ mod tests { ); } + #[test] + fn test_parse_line_json() { + let builder = LogBuilder::new(LogPlatform::Unknown); + let lr = parse_line( + &builder, + 123_456_789, + 1000, + r#"{"level":"info","msg":"hello"}"#.to_string(), + ParserType::Json, + ); + assert_eq!(lr.severity_number, SeverityNumber::Info as i32); + assert!(lr.body.is_some()); + } + + #[test] + fn test_parse_line_unknown_auto_detects_json() { + let builder = LogBuilder::new(LogPlatform::Unknown); + let lr = parse_line( + &builder, + 123_456_789, + 1000, + r#"{"level":"debug","msg":"auto"}"#.to_string(), + ParserType::Unknown, + ); + assert_eq!(lr.severity_number, SeverityNumber::Debug as i32); + assert!(lr.body.is_some()); + } + + #[test] + fn test_parse_line_plain_text_fallback() { + let builder = LogBuilder::new(LogPlatform::Unknown); + let msg = "just plain text"; + let lr = parse_line(&builder, 0, 0, msg.to_string(), ParserType::Unknown); + if let Some(body) = &lr.body { + if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = + &body.value + { + assert_eq!(s, msg); + } else { + panic!("expected string body"); + } + } else { + panic!("body should be set"); + } + } + + #[test] + fn test_parse_line_invalid_json_falls_back_to_plain_text() { + let builder = LogBuilder::new(LogPlatform::Unknown); + let bad = r#"{ not valid json }"#; + let lr = parse_line(&builder, 0, 0, bad.to_string(), ParserType::Json); + if let Some(body) = &lr.body { + if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = + &body.value + { + assert_eq!(s, bad); + } else { + panic!("expected string body on parse failure"); + } + } else { + panic!("body should be set on parse failure"); + } + } + #[tokio::test] async fn test_parse_real_cloudtrail_file() { // Test with real CloudTrail file structure