diff --git a/README.md b/README.md index 337e00f..e566795 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ be expanded as we verify support for additional services. | CloudTrail Logs | CloudWatch, S3 | | EKS Control Plane Logs | CloudWatch | | Lambda Logs | CloudWatch | -| VPC Flow Logs | CloudWatch | +| VPC Flow Logs | CloudWatch, S3 | ## Deploying @@ -569,7 +569,7 @@ from. We plan to make this configurable in the future. ## VPC Flow Logs -The forwarder includes comprehensive support for AWS VPC Flow Logs with the following features: +The forwarder includes support for ingesting AWS VPC Flow Logs written to CloudWatch or S3. ### Automatic Format Detection diff --git a/src/cwlogs/mod.rs b/src/cwlogs/mod.rs index 9369b81..b1d6f4f 100644 --- a/src/cwlogs/mod.rs +++ b/src/cwlogs/mod.rs @@ -27,7 +27,7 @@ pub struct Parser<'a> { aws_attributes: &'a AwsAttributes, request_id: &'a String, tag_manager: &'a mut TagManager, - flow_log_manager: &'a mut FlowLogManager, + flow_log_manager: &'a FlowLogManager, } impl<'a> Parser<'a> { @@ -35,7 +35,7 @@ impl<'a> Parser<'a> { aws_attributes: &'a AwsAttributes, request_id: &'a String, tag_manager: &'a mut TagManager, - flow_log_manager: &'a mut FlowLogManager, + flow_log_manager: &'a FlowLogManager, ) -> Self { Self { aws_attributes, @@ -192,7 +192,7 @@ impl<'a> Parser<'a> { /// Detects the log platform and parser type based on log group and stream names. 
/// Returns (platform, parser_type, optional_flow_log_parsed_fields, flow_log_tags) async fn detect_log_type( - &mut self, + &self, log_group_name: &str, log_stream_name: &str, ) -> ( @@ -257,13 +257,13 @@ mod tests { let mut tag_manager = crate::tags::TagManager::new(cw_client, None, None); let ec2_client = aws_sdk_ec2::Client::new(&config); - let mut flow_log_manager = crate::flowlogs::FlowLogManager::new(ec2_client, None, None); + let flow_log_manager = crate::flowlogs::FlowLogManager::new(ec2_client, None, None); let mut parser = Parser::new( &aws_attributes, &request_id, &mut tag_manager, - &mut flow_log_manager, + &flow_log_manager, ); let result = parser.parse(logs_event).await; diff --git a/src/cwlogs/record_parser.rs b/src/cwlogs/record_parser.rs index f76e97a..dcd5002 100644 --- a/src/cwlogs/record_parser.rs +++ b/src/cwlogs/record_parser.rs @@ -67,16 +67,11 @@ impl RecordParser { /// /// On parse failure the raw message is preserved as the log body. pub(crate) fn parse(&self, now_nanos: u64, log_entry: LogEntry) -> LogRecord { - // Seed the record with the CW timestamp and, when non-empty, the entry ID. - let initial_attributes = if !log_entry.id.is_empty() { - vec![string_kv("cloudwatch.id", log_entry.id)] - } else { - vec![] - }; - - let mut record_builder = - self.builder - .start(now_nanos, log_entry.timestamp, initial_attributes); + let mut record_builder = self.builder.start( + now_nanos, + log_entry.timestamp, + vec![string_kv("cloudwatch.id", log_entry.id)], + ); match self.parse_message(log_entry.message) { ParsedMessage::Map(map) => { @@ -101,12 +96,12 @@ impl RecordParser { /// should do next — no mutation of external state as a side-effect. 
fn parse_message(&self, message: String) -> ParsedMessage { match self.parser_type { - ParserType::Json => match parse_json_to_map(message) { + ParserType::Json => match parse_json_to_map(&message) { Ok(map) => ParsedMessage::Map(map), Err(e) => ParsedMessage::Error(e), }, - ParserType::KeyValue => match parse_keyvalue_to_map(message) { + ParserType::KeyValue => match parse_keyvalue_to_map(&message) { Ok(map) => ParsedMessage::Map(map), Err(e) => ParsedMessage::Error(e), }, @@ -117,7 +112,7 @@ impl RecordParser { // individual flow-log fields are emitted as attributes. match self.flow_log_parsed_fields.as_ref() { Some(parsed_fields) => { - match parse_vpclog_to_map(message.clone(), parsed_fields.clone()) { + match parse_vpclog_to_map(&message, parsed_fields.clone()) { Ok(map) => ParsedMessage::PlainText(message, Some(map)), Err(e) => ParsedMessage::Error(e), } @@ -131,7 +126,7 @@ impl RecordParser { // Auto-detect: attempt JSON for messages that look like objects; // otherwise treat as opaque plain text. 
if message.len() > 2 && message.starts_with('{') { - match parse_json_to_map(message) { + match parse_json_to_map(&message) { Ok(map) => ParsedMessage::Map(map), Err(e) => ParsedMessage::Error(e), } @@ -189,16 +184,6 @@ mod tests { } } - #[test] - fn test_no_cloudwatch_id_when_empty() { - let parser = RecordParser::new(LogPlatform::Unknown, ParserType::Json, None); - let lr = parser.parse(123_456_789, make_entry("", 1000, r#"{"msg":"hi"}"#)); - assert!( - !lr.attributes.iter().any(|kv| kv.key == "cloudwatch.id"), - "cloudwatch.id should be absent when id is empty" - ); - } - #[test] fn test_timestamp_set_from_entry() { // 1000 ms → 1_000_000_000 ns diff --git a/src/flowlogs/cache.rs b/src/flowlogs/cache.rs index ede9712..d80c412 100644 --- a/src/flowlogs/cache.rs +++ b/src/flowlogs/cache.rs @@ -75,7 +75,7 @@ pub enum ParsedFields { Error(String), } -/// Flow log configuration for a specific log group +/// Flow log configuration for a specific destination (log group or S3 bucket) #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowLogConfig { /// The log format string (e.g., "${version} ${account-id} ${interface-id} ...") @@ -83,22 +83,34 @@ pub struct FlowLogConfig { pub flow_log_id: String, pub tags: std::collections::HashMap, + /// Optional S3 folder prefix from the flow log destination ARN + /// (e.g., "vpc-a/" from + /// "arn:aws:s3:::vpc-a/"). + /// `None` for CloudWatch destinations or S3 destinations with no prefix. + #[serde(default)] + pub folder_prefix: Option, + /// Parsed field names from the log format (lazily computed, not serialized) #[serde(skip)] pub parsed_fields: Option>, // Use an Arc to reduce clone costs } -/// In-memory cache for flow log configurations +/// In-memory cache for flow log configurations, partitioned by destination type. +/// +/// Two independent look-up maps are maintained: +/// - `by_log_group`: CloudWatch Logs destinations, keyed by log group name. 
+/// - `by_bucket`: S3 destinations, keyed by bucket name. Each bucket may have +/// multiple flow log configurations with different folder prefixes. /// -/// Cache is refreshed when configurations are fetched from EC2 API. -/// Reading from the cache does not extend the TTL - the cache expires 30 minutes -/// after it was last refreshed from the API. +/// Both maps share a single TTL timestamp. The cache expires 30 minutes after it +/// was last refreshed from the EC2 API — reading does not extend the TTL. /// /// Since all flow logs are queried together with a single DescribeFlowLogs call, -/// the last_seen timestamp applies to the entire cache, not individual entries. +/// the last_refreshed timestamp applies to the entire cache, not individual entries. #[derive(Debug, Clone)] pub struct FlowLogCache { - inner: HashMap, + by_log_group: HashMap, + by_bucket: HashMap>, /// Unix timestamp in seconds when the cache was last refreshed from EC2 API last_refreshed_secs: u64, } @@ -106,51 +118,115 @@ pub struct FlowLogCache { impl FlowLogCache { pub fn new() -> Self { Self { - inner: HashMap::new(), + by_log_group: HashMap::new(), + by_bucket: HashMap::new(), last_refreshed_secs: 0, } } - /// Get flow log configuration for a log group if it exists and cache is not expired - /// Returns None if not found or cache is expired - /// - /// Note: This does not update the timestamp - cache expires 30 minutes - /// after it was last refreshed from the API. - pub fn get(&self, log_group: &str) -> Option { + // ----------------------------------------------------------------------- + // CloudWatch look-ups (keyed by log group name) + // ----------------------------------------------------------------------- + + /// Get a clone of the flow log configuration for a CloudWatch log group. + /// Returns `None` if not found or the cache is expired. 
+ pub fn get_by_log_group(&self, log_group: &str) -> Option { if self.is_expired() { debug!("Cache expired"); return None; } - if let Some(config) = self.inner.get(log_group) { - trace!(log_group = %log_group, "Cache hit"); + if let Some(config) = self.by_log_group.get(log_group) { + trace!(log_group = %log_group, "Cache hit (by_log_group)"); Some(config.clone()) } else { - trace!(log_group = %log_group, "Cache miss"); + trace!(log_group = %log_group, "Cache miss (by_log_group)"); None } } - /// Get mutable reference to flow log configuration for a log group - /// Returns None if not found or cache is expired + /// Write back lazily-computed `parsed_fields` for a CloudWatch log group entry. /// - /// This is used for lazy initialization of parsed fields. - pub fn get_mut(&mut self, log_group: &str) -> Option<&mut FlowLogConfig> { + /// This is the counterpart to [`get_by_log_group`]: callers that compute + /// `parsed_fields` after a cache read can persist the result here so that + /// subsequent reads avoid re-parsing. No-ops if the entry no longer exists. + pub fn set_parsed_fields_by_log_group( + &mut self, + log_group: &str, + parsed_fields: Arc, + ) { + if let Some(config) = self.by_log_group.get_mut(log_group) { + config.parsed_fields = Some(parsed_fields); + } + } + + /// Insert or update a CloudWatch flow log configuration. + pub fn insert_by_log_group(&mut self, log_group: String, config: FlowLogConfig) { + debug!( + log_group = %log_group, + flow_log_id = %config.flow_log_id, + "Inserting flow log config into cache (by_log_group)" + ); + self.by_log_group.insert(log_group, config); + } + + // ----------------------------------------------------------------------- + // S3 look-ups (keyed by bucket name + object key prefix) + // ----------------------------------------------------------------------- + + /// Get a clone of the flow log configuration for an S3 object. 
+ /// + /// Because multiple flow logs may share the same bucket (differentiated by a + /// folder prefix in their destination ARN), this method accepts the full S3 + /// object key and returns the first `FlowLogConfig` whose `folder_prefix` + /// matches the start of that key. A config with `folder_prefix: None` acts + /// as a catch-all and matches any key. + /// + /// Returns `None` if the cache is expired or no matching config is found. + pub fn get_by_bucket(&self, bucket: &str, object_key: &str) -> Option { if self.is_expired() { debug!("Cache expired"); return None; } - if let Some(config) = self.inner.get_mut(log_group) { - trace!(log_group = %log_group, "Cache hit (mutable)"); - Some(config) + if let Some(configs) = self.by_bucket.get(bucket) { + let matched = configs.iter().find(|c| match &c.folder_prefix { + Some(prefix) => object_key.starts_with(prefix.as_str()), + None => true, + }); + if let Some(config) = matched { + trace!(bucket = %bucket, object_key = %object_key, "Cache hit (by_bucket)"); + Some(config.clone()) + } else { + trace!(bucket = %bucket, object_key = %object_key, "Cache miss (by_bucket) — no prefix match"); + None + } } else { - trace!(log_group = %log_group, "Cache miss"); + trace!(bucket = %bucket, "Cache miss (by_bucket)"); None } } - /// Check if the entire cache is expired (older than 30 minutes) + /// Append an S3 flow log configuration for a bucket. + /// + /// Multiple configs can exist for the same bucket, each with a different + /// `folder_prefix`. Configs are stored in insertion order; the first + /// matching prefix wins during lookup. 
+ pub fn insert_by_bucket(&mut self, bucket: String, config: FlowLogConfig) { + debug!( + bucket = %bucket, + flow_log_id = %config.flow_log_id, + folder_prefix = ?config.folder_prefix, + "Inserting flow log config into cache (by_bucket)" + ); + self.by_bucket.entry(bucket).or_default().push(config); + } + + // ----------------------------------------------------------------------- + // TTL / lifecycle helpers + // ----------------------------------------------------------------------- + + /// Check if the entire cache is expired (older than 30 minutes). pub fn is_expired(&self) -> bool { if self.last_refreshed_secs == 0 { return true; // Never been refreshed @@ -165,18 +241,10 @@ impl FlowLogCache { age_secs > MAX_CACHE_AGE_SECS } - /// Insert or update flow log configuration for a log group - /// - /// Should only be called when configuration is freshly fetched from EC2 API. - pub fn insert(&mut self, log_group: String, config: FlowLogConfig) { - debug!(log_group = %log_group, flow_log_id = %config.flow_log_id, "Inserting flow log config into cache"); - self.inner.insert(log_group, config); - } - - /// Mark the cache as refreshed with the current timestamp + /// Mark the cache as refreshed with the current timestamp. /// - /// This should be called after successfully fetching flow logs from EC2 API. - /// It resets the 30-minute TTL for the entire cache. + /// Should be called after successfully fetching flow logs from the EC2 API. + /// Resets the 30-minute TTL for the entire cache. pub fn mark_refreshed(&mut self) { self.last_refreshed_secs = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -188,58 +256,72 @@ impl FlowLogCache { ); } - /// Get a snapshot of the current cache with timestamp + /// Get a serialisable snapshot of the current cache contents and timestamp. 
pub fn get_snapshot(&self) -> CacheSnapshot { CacheSnapshot { - flow_logs: self.inner.clone(), + by_log_group: self.by_log_group.clone(), + by_bucket: self.by_bucket.clone(), last_refreshed_secs: self.last_refreshed_secs, } } - /// Load entries from a snapshot (used when restoring from S3) + /// Restore cache contents from a snapshot (used when reloading from S3 persistence). + /// + /// Expired snapshots are silently ignored. pub fn load_snapshot(&mut self, snapshot: CacheSnapshot) { debug!( - entry_count = snapshot.flow_logs.len(), + log_group_count = snapshot.by_log_group.len(), + bucket_count = snapshot.by_bucket.len(), "Loading snapshot into cache" ); if !snapshot.is_expired() { - self.inner = snapshot.flow_logs; + self.by_log_group = snapshot.by_log_group; + self.by_bucket = snapshot.by_bucket; self.last_refreshed_secs = snapshot.last_refreshed_secs; } else { debug!("Snapshot is expired, not loading"); } } - /// Get the number of entries in the cache + /// Total number of cached entries across both destination maps. + /// + /// For the bucket map, counts the total number of individual `FlowLogConfig` + /// entries (summed across all per-bucket `Vec`s), not the number of buckets. pub fn len(&self) -> usize { - self.inner.len() + let bucket_total: usize = self.by_bucket.values().map(|v| v.len()).sum(); + self.by_log_group.len() + bucket_total } - /// Check if the cache is empty + /// Returns `true` if both destination maps are empty. pub fn is_empty(&self) -> bool { - self.inner.is_empty() + self.by_log_group.is_empty() && self.by_bucket.is_empty() } - /// Clear all entries from the cache and reset timestamp + /// Clear all entries from both maps and reset the timestamp. #[cfg(test)] pub fn clear(&mut self) { - self.inner.clear(); + self.by_log_group.clear(); + self.by_bucket.clear(); self.last_refreshed_secs = 0; } } -/// Snapshot of the flow log cache for serialization/deserialization +/// Serialisable snapshot of the flow log cache for persistence (e.g. S3). 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CacheSnapshot { - /// Map of log group names to their flow log configurations - pub flow_logs: HashMap, - /// Unix timestamp in seconds when the cache was last refreshed + /// CloudWatch flow logs: log_group_name → config + #[serde(default)] + pub by_log_group: HashMap, + /// S3 flow logs: bucket_name → list of configs (one per distinct folder prefix) + #[serde(default)] + pub by_bucket: HashMap>, + /// Unix timestamp (seconds) when the cache was last refreshed pub last_refreshed_secs: u64, } impl CacheSnapshot { - /// Check if this snapshot is expired (older than 30 minutes) + /// Check if this snapshot is expired (older than 30 minutes). pub fn is_expired(&self) -> bool { if self.last_refreshed_secs == 0 { return true; @@ -260,31 +342,199 @@ mod tests { use super::*; use std::collections::HashMap; + // ------------------------------------------------------------------ + // CloudWatch (by_log_group) tests + // ------------------------------------------------------------------ + #[test] - fn test_cache_insert_and_get() { + fn test_cache_insert_and_get_by_log_group() { let mut cache = FlowLogCache::new(); let config = FlowLogConfig { log_format: "${version} ${account-id} ${interface-id}".to_string(), flow_log_id: "fl-1234567890abcdef0".to_string(), - tags: std::collections::HashMap::new(), + tags: HashMap::new(), + folder_prefix: None, parsed_fields: None, }; - cache.insert("/aws/ec2/flowlogs".to_string(), config.clone()); + cache.insert_by_log_group("/aws/ec2/flowlogs".to_string(), config.clone()); cache.mark_refreshed(); - let retrieved = cache.get("/aws/ec2/flowlogs"); + let retrieved = cache.get_by_log_group("/aws/ec2/flowlogs"); assert!(retrieved.is_some()); assert_eq!(retrieved.unwrap(), config); } #[test] - fn test_cache_miss() { + fn test_cache_miss_by_log_group() { let cache = FlowLogCache::new(); - let retrieved = cache.get("non-existent"); + let retrieved = cache.get_by_log_group("non-existent"); 
assert!(retrieved.is_none()); } + // ------------------------------------------------------------------ + // S3 (by_bucket) tests + // ------------------------------------------------------------------ + + #[test] + fn test_cache_insert_and_get_by_bucket() { + let mut cache = FlowLogCache::new(); + let config = FlowLogConfig { + log_format: "${version} ${account-id} ${interface-id}".to_string(), + flow_log_id: "fl-s3-abc123".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + + cache.insert_by_bucket("my-flow-logs-bucket".to_string(), config.clone()); + cache.mark_refreshed(); + + let retrieved = cache.get_by_bucket("my-flow-logs-bucket", "some/object/key.log.gz"); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap(), config); + } + + #[test] + fn test_cache_miss_by_bucket() { + let cache = FlowLogCache::new(); + let retrieved = cache.get_by_bucket("non-existent-bucket", "some/key.log.gz"); + assert!(retrieved.is_none()); + } + + #[test] + fn test_cache_bucket_prefix_matching() { + let mut cache = FlowLogCache::new(); + + let config_a = FlowLogConfig { + log_format: "${version} ${account-id}".to_string(), + flow_log_id: "fl-s3-prefix-a".to_string(), + tags: HashMap::new(), + folder_prefix: Some("AWSLogs/111111111111/vpcflowlogs/us-east-1/".to_string()), + parsed_fields: None, + }; + let config_b = FlowLogConfig { + log_format: "${version} ${srcaddr}".to_string(), + flow_log_id: "fl-s3-prefix-b".to_string(), + tags: HashMap::new(), + folder_prefix: Some("AWSLogs/222222222222/vpcflowlogs/us-west-2/".to_string()), + parsed_fields: None, + }; + let config_catchall = FlowLogConfig { + log_format: "${version} ${dstaddr}".to_string(), + flow_log_id: "fl-s3-catchall".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + + cache.insert_by_bucket("shared-bucket".to_string(), config_a.clone()); + cache.insert_by_bucket("shared-bucket".to_string(), config_b.clone()); + 
cache.insert_by_bucket("shared-bucket".to_string(), config_catchall.clone()); + cache.mark_refreshed(); + + // Key matching prefix A + let key_a = "AWSLogs/111111111111/vpcflowlogs/us-east-1/2024/01/01/flow.log.gz"; + let result = cache.get_by_bucket("shared-bucket", key_a); + assert!(result.is_some()); + assert_eq!(result.unwrap().flow_log_id, "fl-s3-prefix-a"); + + // Key matching prefix B + let key_b = "AWSLogs/222222222222/vpcflowlogs/us-west-2/2024/01/01/flow.log.gz"; + let result = cache.get_by_bucket("shared-bucket", key_b); + assert!(result.is_some()); + assert_eq!(result.unwrap().flow_log_id, "fl-s3-prefix-b"); + + // Key that matches neither prefix A nor B — falls through to the catch-all + let key_other = "custom/path/flow.log.gz"; + let result = cache.get_by_bucket("shared-bucket", key_other); + assert!(result.is_some()); + assert_eq!(result.unwrap().flow_log_id, "fl-s3-catchall"); + } + + #[test] + fn test_cache_bucket_no_prefix_match_returns_none() { + let mut cache = FlowLogCache::new(); + + let config = FlowLogConfig { + log_format: "${version} ${account-id}".to_string(), + flow_log_id: "fl-s3-specific".to_string(), + tags: HashMap::new(), + folder_prefix: Some("AWSLogs/123456789012/vpcflowlogs/".to_string()), + parsed_fields: None, + }; + + cache.insert_by_bucket("my-bucket".to_string(), config); + cache.mark_refreshed(); + + // Object key does not start with the configured prefix → no match + let result = cache.get_by_bucket("my-bucket", "other-prefix/2024/01/01/flow.log.gz"); + assert!(result.is_none()); + } + + #[test] + fn test_cache_len_counts_both_maps() { + let mut cache = FlowLogCache::new(); + + let config = FlowLogConfig { + log_format: "${version} ${account-id}".to_string(), + flow_log_id: "fl-xxx".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + + cache.insert_by_log_group("/aws/ec2/flowlogs".to_string(), config.clone()); + cache.insert_by_bucket("my-bucket".to_string(), config.clone()); + 
cache.mark_refreshed(); + + assert_eq!(cache.len(), 2); + assert!(!cache.is_empty()); + } + + #[test] + fn test_cache_len_counts_multiple_configs_per_bucket() { + let mut cache = FlowLogCache::new(); + + let make_config = |id: &str, prefix: Option<&str>| FlowLogConfig { + log_format: "${version}".to_string(), + flow_log_id: id.to_string(), + tags: HashMap::new(), + folder_prefix: prefix.map(|s| s.to_string()), + parsed_fields: None, + }; + + cache.insert_by_bucket("bucket".to_string(), make_config("fl-1", Some("prefix-a/"))); + cache.insert_by_bucket("bucket".to_string(), make_config("fl-2", Some("prefix-b/"))); + cache.mark_refreshed(); + + // Two configs in one bucket → len should be 2 + assert_eq!(cache.len(), 2); + } + + #[test] + fn test_cache_clear() { + let mut cache = FlowLogCache::new(); + let config = FlowLogConfig { + log_format: "${version}".to_string(), + flow_log_id: "fl-yyy".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + cache.insert_by_log_group("group".to_string(), config.clone()); + cache.insert_by_bucket("bucket".to_string(), config); + cache.mark_refreshed(); + + cache.clear(); + assert!(cache.is_empty()); + assert!(cache.is_expired()); + } + + // ------------------------------------------------------------------ + // Expiration tests + // ------------------------------------------------------------------ + #[test] fn test_cache_expiration() { let mut cache = FlowLogCache::new(); @@ -305,36 +555,44 @@ mod tests { assert!(cache.is_expired()); } + // ------------------------------------------------------------------ + // Snapshot tests + // ------------------------------------------------------------------ + #[test] - fn test_snapshot() { + fn test_snapshot_round_trip() { let mut cache = FlowLogCache::new(); - let config1 = FlowLogConfig { + let cw_config = FlowLogConfig { log_format: "${version} ${account-id}".to_string(), - flow_log_id: "fl-111".to_string(), - tags: std::collections::HashMap::new(), + 
flow_log_id: "fl-cw-111".to_string(), + tags: HashMap::new(), + folder_prefix: None, parsed_fields: None, }; - cache.insert("/aws/ec2/flowlogs1".to_string(), config1.clone()); - - let config2 = FlowLogConfig { + let s3_config = FlowLogConfig { log_format: "${version} ${interface-id}".to_string(), - flow_log_id: "fl-222".to_string(), - tags: std::collections::HashMap::new(), + flow_log_id: "fl-s3-222".to_string(), + tags: HashMap::new(), + folder_prefix: Some("AWSLogs/".to_string()), parsed_fields: None, }; - cache.insert("/aws/ec2/flowlogs2".to_string(), config2.clone()); + + cache.insert_by_log_group("/aws/ec2/flowlogs".to_string(), cw_config.clone()); + cache.insert_by_bucket("my-bucket".to_string(), s3_config.clone()); cache.mark_refreshed(); let snapshot = cache.get_snapshot(); - assert_eq!(snapshot.flow_logs.len(), 2); + // 1 CloudWatch bucket, 1 S3 bucket key (with 1 config inside) + assert_eq!(snapshot.by_log_group.len(), 1); + assert_eq!(snapshot.by_bucket.len(), 1); assert_eq!( - snapshot.flow_logs.get("/aws/ec2/flowlogs1").unwrap(), - &config1 + snapshot.by_log_group.get("/aws/ec2/flowlogs").unwrap(), + &cw_config ); assert_eq!( - snapshot.flow_logs.get("/aws/ec2/flowlogs2").unwrap(), - &config2 + snapshot.by_bucket.get("my-bucket").unwrap(), + &vec![s3_config] ); } @@ -342,17 +600,31 @@ mod tests { fn test_load_snapshot() { let mut cache = FlowLogCache::new(); - let config = FlowLogConfig { + let cw_config = FlowLogConfig { log_format: "${version} ${account-id}".to_string(), - flow_log_id: "fl-123".to_string(), - tags: std::collections::HashMap::new(), + flow_log_id: "fl-cw-123".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + let s3_config = FlowLogConfig { + log_format: "${version} ${srcaddr}".to_string(), + flow_log_id: "fl-s3-456".to_string(), + tags: HashMap::new(), + folder_prefix: Some("AWSLogs/".to_string()), parsed_fields: None, }; + let snapshot = CacheSnapshot { - flow_logs: { - let mut map = 
HashMap::new(); - map.insert("/aws/ec2/flowlogs".to_string(), config.clone()); - map + by_log_group: { + let mut m = HashMap::new(); + m.insert("/aws/ec2/flowlogs".to_string(), cw_config.clone()); + m + }, + by_bucket: { + let mut m = HashMap::new(); + m.insert("my-bucket".to_string(), vec![s3_config.clone()]); + m }, last_refreshed_secs: SystemTime::now() .duration_since(UNIX_EPOCH) @@ -362,17 +634,60 @@ mod tests { cache.load_snapshot(snapshot); - let retrieved = cache.get("/aws/ec2/flowlogs"); - assert!(retrieved.is_some()); - assert_eq!(retrieved.unwrap(), config); + assert_eq!( + cache.get_by_log_group("/aws/ec2/flowlogs").unwrap(), + cw_config + ); + assert_eq!( + cache + .get_by_bucket("my-bucket", "AWSLogs/2024/01/01/flow.log.gz") + .unwrap(), + s3_config + ); + } + + #[test] + fn test_load_expired_snapshot_is_ignored() { + let mut cache = FlowLogCache::new(); + + let config = FlowLogConfig { + log_format: "${version}".to_string(), + flow_log_id: "fl-old".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + + let snapshot = CacheSnapshot { + by_log_group: { + let mut m = HashMap::new(); + m.insert("/old/group".to_string(), config); + m + }, + by_bucket: HashMap::new(), + last_refreshed_secs: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .saturating_sub(31 * 60), // expired + }; + + cache.load_snapshot(snapshot); + + // Cache should remain empty / expired + assert!(cache.is_expired()); + assert!(cache.is_empty()); } + // ------------------------------------------------------------------ + // Tags test + // ------------------------------------------------------------------ + #[test] fn test_flow_log_config_with_tags() { let mut cache = FlowLogCache::new(); - // Create config with tags - let mut tags = std::collections::HashMap::new(); + let mut tags = HashMap::new(); tags.insert("Environment".to_string(), "production".to_string()); tags.insert("Team".to_string(), "platform".to_string()); 
tags.insert("Application".to_string(), "vpc-monitoring".to_string()); @@ -381,39 +696,84 @@ mod tests { log_format: "${version} ${account-id} ${interface-id}".to_string(), flow_log_id: "fl-1234567890abcdef0".to_string(), tags: tags.clone(), + folder_prefix: None, parsed_fields: None, }; - cache.insert("/aws/ec2/flowlogs".to_string(), config.clone()); + cache.insert_by_log_group("/aws/ec2/flowlogs".to_string(), config.clone()); cache.mark_refreshed(); - let retrieved = cache.get("/aws/ec2/flowlogs"); - assert!(retrieved.is_some()); + let retrieved = cache.get_by_log_group("/aws/ec2/flowlogs").unwrap(); + assert_eq!(retrieved.tags.len(), 3); + assert_eq!(retrieved.tags.get("Environment").unwrap(), "production"); + assert_eq!(retrieved.tags.get("Team").unwrap(), "platform"); + assert_eq!(retrieved.tags.get("Application").unwrap(), "vpc-monitoring"); + } - let retrieved_config = retrieved.unwrap(); - assert_eq!(retrieved_config.tags.len(), 3); - assert_eq!( - retrieved_config.tags.get("Environment").unwrap(), - "production" - ); - assert_eq!(retrieved_config.tags.get("Team").unwrap(), "platform"); - assert_eq!( - retrieved_config.tags.get("Application").unwrap(), - "vpc-monitoring" - ); + // ------------------------------------------------------------------ + // set_parsed_fields write-back tests + // ------------------------------------------------------------------ + + #[test] + fn test_set_parsed_fields_by_log_group() { + let mut cache = FlowLogCache::new(); + + let config = FlowLogConfig { + log_format: "${version} ${account-id}".to_string(), + flow_log_id: "fl-123".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + cache.insert_by_log_group("/aws/ec2/flowlogs".to_string(), config); + cache.mark_refreshed(); + + // Initially no parsed fields + let retrieved = cache.get_by_log_group("/aws/ec2/flowlogs").unwrap(); + assert!(retrieved.parsed_fields.is_none()); + + // Write back parsed fields + let fields = 
Arc::new(ParsedFields::Success(vec![ + ParsedField::new("version".to_string(), ParsedFieldType::Int32), + ParsedField::new("account-id".to_string(), ParsedFieldType::String), + ])); + cache.set_parsed_fields_by_log_group("/aws/ec2/flowlogs", fields.clone()); + + // Now should be present + let retrieved = cache.get_by_log_group("/aws/ec2/flowlogs").unwrap(); + assert!(retrieved.parsed_fields.is_some()); + if let ParsedFields::Success(fs) = retrieved.parsed_fields.unwrap().as_ref() { + assert_eq!(fs.len(), 2); + assert_eq!(fs[0].field_name, "version"); + } else { + panic!("Expected Success"); + } + } + + #[test] + fn test_set_parsed_fields_by_log_group_noop_on_missing_key() { + let mut cache = FlowLogCache::new(); + cache.mark_refreshed(); + + // Should not panic when key does not exist + let fields = Arc::new(ParsedFields::Success(vec![])); + cache.set_parsed_fields_by_log_group("/does/not/exist", fields); + // Cache stays empty + assert!(cache.is_empty()); } + // ------------------------------------------------------------------ + // Field-type tests (unchanged from original) + // ------------------------------------------------------------------ + #[test] fn test_parsed_field_type_mapping() { - // Test some known fields assert_eq!(get_field_type("version"), ParsedFieldType::Int32); assert_eq!(get_field_type("account-id"), ParsedFieldType::String); assert_eq!(get_field_type("srcport"), ParsedFieldType::Int32); assert_eq!(get_field_type("packets"), ParsedFieldType::Int64); assert_eq!(get_field_type("bytes"), ParsedFieldType::Int64); assert_eq!(get_field_type("action"), ParsedFieldType::String); - - // Test unknown field defaults to String assert_eq!(get_field_type("unknown-field"), ParsedFieldType::String); } @@ -431,7 +791,6 @@ mod tests { ParsedField::new("account-id".to_string(), ParsedFieldType::String), ]; let parsed = ParsedFields::Success(fields.clone()); - assert_eq!(parsed, ParsedFields::Success(fields)); } @@ -439,7 +798,6 @@ mod tests { fn 
test_parsed_fields_error() { let error_msg = "Invalid format string".to_string(); let parsed = ParsedFields::Error(error_msg.clone()); - assert_eq!(parsed, ParsedFields::Error(error_msg)); } @@ -450,22 +808,20 @@ mod tests { let config = FlowLogConfig { log_format: "${version} ${account-id}".to_string(), flow_log_id: "fl-123".to_string(), - tags: std::collections::HashMap::new(), + tags: HashMap::new(), + folder_prefix: None, parsed_fields: Some(Arc::new(ParsedFields::Success(vec![ ParsedField::new("version".to_string(), ParsedFieldType::Int32), ParsedField::new("account-id".to_string(), ParsedFieldType::String), ]))), }; - cache.insert("/aws/ec2/flowlogs".to_string(), config.clone()); + cache.insert_by_log_group("/aws/ec2/flowlogs".to_string(), config.clone()); cache.mark_refreshed(); - let retrieved = cache.get("/aws/ec2/flowlogs"); - assert!(retrieved.is_some()); - - let retrieved_config = retrieved.unwrap(); - assert!(retrieved_config.parsed_fields.is_some()); - if let Some(parsed_fields) = &retrieved_config.parsed_fields { + let retrieved = cache.get_by_log_group("/aws/ec2/flowlogs").unwrap(); + assert!(retrieved.parsed_fields.is_some()); + if let Some(parsed_fields) = &retrieved.parsed_fields { if let ParsedFields::Success(fields) = parsed_fields.as_ref() { assert_eq!(fields.len(), 2); assert_eq!(fields[0].field_name, "version"); @@ -475,8 +831,6 @@ mod tests { } else { panic!("Expected ParsedFields::Success"); } - } else { - panic!("Expected Some(parsed_fields)"); } } @@ -487,24 +841,20 @@ mod tests { let config = FlowLogConfig { log_format: "invalid format".to_string(), flow_log_id: "fl-123".to_string(), - tags: std::collections::HashMap::new(), + tags: HashMap::new(), + folder_prefix: None, parsed_fields: Some(Arc::new(ParsedFields::Error("Parse failed".to_string()))), }; - cache.insert("/aws/ec2/flowlogs".to_string(), config.clone()); + cache.insert_by_log_group("/aws/ec2/flowlogs".to_string(), config); cache.mark_refreshed(); - let retrieved = 
cache.get("/aws/ec2/flowlogs"); - assert!(retrieved.is_some()); - - let retrieved_config = retrieved.unwrap(); - assert!(retrieved_config.parsed_fields.is_some()); - - if let Some(parsed_fields) = &retrieved_config.parsed_fields { + let retrieved = cache.get_by_log_group("/aws/ec2/flowlogs").unwrap(); + if let Some(parsed_fields) = &retrieved.parsed_fields { if let ParsedFields::Error(msg) = parsed_fields.as_ref() { assert_eq!(msg, "Parse failed"); } else { - panic!("Expected ParsedFields::Error") + panic!("Expected ParsedFields::Error"); } } else { panic!("Expected Some(parsed_fields)"); diff --git a/src/flowlogs/ec2.rs b/src/flowlogs/ec2.rs index 6ddcc32..58d7d32 100644 --- a/src/flowlogs/ec2.rs +++ b/src/flowlogs/ec2.rs @@ -18,6 +18,20 @@ pub enum Ec2Error { InvalidFormat(String), } +/// The result of fetching all EC2 Flow Log configurations. +/// +/// Flow logs are partitioned by their delivery destination type: +/// - `by_log_group`: keyed by CloudWatch log group name +/// - `by_bucket`: keyed by S3 bucket name (extracted from the `log-destination` ARN). +/// Multiple flow logs may share a bucket, differentiated by folder prefix. +#[derive(Debug, Default)] +pub struct FetchedFlowLogs { + /// CloudWatch-delivered flow logs: log_group_name → config + pub by_log_group: HashMap, + /// S3-delivered flow logs: bucket_name → list of configs (one per distinct folder prefix) + pub by_bucket: HashMap>, +} + /// Fetcher for EC2 Flow Log configurations from EC2 API /// /// This fetcher queries the EC2 DescribeFlowLogs API to retrieve EC2 Flow Log @@ -34,12 +48,15 @@ impl Ec2FlowLogFetcher { Self { client } } - /// Fetch all EC2 Flow Logs that are delivered to CloudWatch Logs + /// Fetch all EC2 Flow Logs, partitioned by destination type. /// - /// Returns a map of log group name to flow log configuration. - /// Only includes flow logs with destination type "cloud-watch-logs". + /// - CloudWatch Logs destinations are keyed by log group name. 
+ /// - S3 destinations are keyed by bucket name extracted from the destination ARN + /// (`arn:aws:s3:::bucket-name[/optional-prefix]`). /// - pub async fn fetch_all_flow_logs(&self) -> Result, Ec2Error> { + /// Flow logs with missing or unrecognised destination information are skipped with a + /// warning so that a single misconfigured flow log cannot block the others. + pub async fn fetch_all_flow_logs(&self) -> Result { debug!("Fetching VPC flow logs from EC2 API"); let result = self.client.describe_flow_logs().send().await; @@ -56,23 +73,11 @@ impl Ec2FlowLogFetcher { } }; - let mut flow_log_configs = HashMap::new(); + let mut fetched = FetchedFlowLogs::default(); - // flow_logs() returns a slice, not an Option for flow_log in response.flow_logs() { - // Only process CloudWatch Logs destinations - match flow_log.log_destination_type() { - Some(LogDestinationType::CloudWatchLogs) => { - // Continue processing this flow log - } - Some(v) => { - debug!( - flow_log_id = ?flow_log.flow_log_id(), - log_destination_type = v.to_string(), - "Skipping flow log with non-CloudWatch destination" - ); - continue; - } + let destination_type = match flow_log.log_destination_type() { + Some(t) => t, None => { debug!( flow_log_id = ?flow_log.flow_log_id(), @@ -80,37 +85,22 @@ impl Ec2FlowLogFetcher { ); continue; } - } - - // Get the log group name - let log_group_name = match flow_log.log_group_name() { - Some(name) => name, - None => { - warn!( - flow_log_id = ?flow_log.flow_log_id(), - "Flow log missing log group name, skipping" - ); - continue; - } }; let log_format = match flow_log.log_format().map(|s| s.to_string()) { - Some(log_format) => log_format, + Some(f) => f, None => { warn!( flow_log_id = ?flow_log.flow_log_id(), "Flow log missing log format, skipping" ); - continue; } }; let flow_log_id = flow_log.flow_log_id().unwrap_or("unknown").to_string(); - // Extract tags from the flow log - // These tags will be applied to logs as resource attributes - // with the prefix 
"ec2.flow-logs.tags." + // Extract tags let mut tags = HashMap::new(); for tag in flow_log.tags() { if let (Some(key), Some(value)) = (tag.key(), tag.value()) { @@ -122,25 +112,136 @@ impl Ec2FlowLogFetcher { log_format, flow_log_id: flow_log_id.clone(), tags, + folder_prefix: None, // set below for S3 destinations parsed_fields: None, }; - debug!( - log_group = %log_group_name, - flow_log_id = %flow_log_id, - tag_count = config.tags.len(), - "Found EC2 flow log configuration with tags" - ); + match destination_type { + LogDestinationType::CloudWatchLogs => { + let log_group_name = match flow_log.log_group_name() { + Some(name) => name, + None => { + warn!( + flow_log_id = %flow_log_id, + "CloudWatch flow log missing log group name, skipping" + ); + continue; + } + }; + + debug!( + log_group = %log_group_name, + flow_log_id = %flow_log_id, + tag_count = config.tags.len(), + "Found CloudWatch EC2 flow log configuration" + ); + + fetched + .by_log_group + .insert(log_group_name.to_string(), config); + } + + LogDestinationType::S3 => { + let destination_arn = match flow_log.log_destination() { + Some(arn) => arn, + None => { + warn!( + flow_log_id = %flow_log_id, + "S3 flow log missing log_destination ARN, skipping" + ); + continue; + } + }; - flow_log_configs.insert(log_group_name.to_string(), config); + let (bucket_name, folder_prefix) = match extract_bucket_and_prefix_from_arn( + destination_arn, + ) { + Some(parts) => parts, + None => { + warn!( + flow_log_id = %flow_log_id, + destination_arn = %destination_arn, + "Unable to extract bucket name from S3 flow log destination ARN, skipping" + ); + continue; + } + }; + + debug!( + bucket = %bucket_name, + folder_prefix = ?folder_prefix, + flow_log_id = %flow_log_id, + tag_count = config.tags.len(), + "Found S3 EC2 flow log configuration" + ); + + let s3_config = FlowLogConfig { + folder_prefix, + ..config + }; + fetched + .by_bucket + .entry(bucket_name) + .or_default() + .push(s3_config); + } + + other => { + debug!( 
+ flow_log_id = %flow_log_id, + log_destination_type = other.to_string(), + "Skipping flow log with unsupported destination type" + ); + } + } } + let s3_config_count: usize = fetched.by_bucket.values().map(|v| v.len()).sum(); info!( - count = flow_log_configs.len(), + cloudwatch_count = fetched.by_log_group.len(), + s3_bucket_count = fetched.by_bucket.len(), + s3_config_count, "Fetched EC2 flow log configurations from EC2" ); - Ok(flow_log_configs) + Ok(fetched) + } +} + +/// Extract the bucket name and optional folder prefix from an S3 ARN of the form +/// `arn:aws:s3:::bucket-name` or `arn:aws:s3:::bucket-name/optional/prefix`. +/// +/// Returns `Some((bucket, folder_prefix))` where `folder_prefix` is `None` when no +/// prefix path is present in the ARN, or `Some(prefix)` otherwise. +/// Returns `None` if the ARN does not have the expected structure. +pub(crate) fn extract_bucket_and_prefix_from_arn(arn: &str) -> Option<(String, Option)> { + // ARN format: arn:partition:s3:region:account-id:resource + // For S3, region and account-id may be empty (e.g. "arn:aws:s3:::bucket-name") + // or populated (e.g. "arn:aws:s3:us-east-1:123456789012:bucket-name"). + // We always split on ":" and take the 6th field (index 5) onward as the resource. + let resource = arn.splitn(6, ':').nth(5)?; + if resource.is_empty() { + return None; + } + // The bucket name is the first path component; anything after the first '/' is the prefix. 
+ match resource.splitn(2, '/').collect::>().as_slice() { + [bucket] => { + if bucket.is_empty() { + None + } else { + Some((bucket.to_string(), None)) + } + } + [bucket, prefix] => { + if bucket.is_empty() { + None + } else if prefix.is_empty() { + Some((bucket.to_string(), None)) + } else { + Some((bucket.to_string(), Some(prefix.to_string()))) + } + } + _ => None, } } @@ -158,4 +259,75 @@ mod tests { // Just verify we can create the fetcher assert!(std::mem::size_of_val(&fetcher) > 0); } + + #[test] + fn test_extract_bucket_and_prefix_from_arn_simple() { + let arn = "arn:aws:s3:::my-flow-logs-bucket"; + assert_eq!( + extract_bucket_and_prefix_from_arn(arn), + Some(("my-flow-logs-bucket".to_string(), None)) + ); + } + + #[test] + fn test_extract_bucket_and_prefix_from_arn_with_prefix() { + let arn = "arn:aws:s3:::my-flow-logs-bucket/vpc-flow-logs/"; + assert_eq!( + extract_bucket_and_prefix_from_arn(arn), + Some(( + "my-flow-logs-bucket".to_string(), + Some("vpc-flow-logs/".to_string()) + )) + ); + } + + #[test] + fn test_extract_bucket_and_prefix_from_arn_with_deep_prefix() { + let arn = "arn:aws:s3:::my-bucket/AWSLogs/123456789012/vpcflowlogs/us-east-1/"; + assert_eq!( + extract_bucket_and_prefix_from_arn(arn), + Some(( + "my-bucket".to_string(), + Some("AWSLogs/123456789012/vpcflowlogs/us-east-1/".to_string()) + )) + ); + } + + #[test] + fn test_extract_bucket_and_prefix_from_arn_trailing_slash_only() { + // arn:aws:s3:::bucket/ — slash present but prefix is empty string + let arn = "arn:aws:s3:::my-bucket/"; + assert_eq!( + extract_bucket_and_prefix_from_arn(arn), + Some(("my-bucket".to_string(), None)) + ); + } + + #[test] + fn test_extract_bucket_and_prefix_from_arn_with_region_and_account() { + let arn = "arn:aws:s3:us-east-1:123456789012:my-flow-logs-bucket"; + assert_eq!( + extract_bucket_and_prefix_from_arn(arn), + Some(("my-flow-logs-bucket".to_string(), None)) + ); + } + + #[test] + fn 
test_extract_bucket_and_prefix_from_arn_with_region_and_account_and_prefix() { + let arn = "arn:aws:s3:us-east-1:123456789012:my-flow-logs-bucket/vpc-flow-logs/"; + assert_eq!( + extract_bucket_and_prefix_from_arn(arn), + Some(( + "my-flow-logs-bucket".to_string(), + Some("vpc-flow-logs/".to_string()) + )) + ); + } + + #[test] + fn test_extract_bucket_and_prefix_from_arn_invalid() { + assert_eq!(extract_bucket_and_prefix_from_arn("not-an-arn"), None); + assert_eq!(extract_bucket_and_prefix_from_arn("arn:aws:s3:::"), None); + assert_eq!(extract_bucket_and_prefix_from_arn(""), None); + } } diff --git a/src/flowlogs/mod.rs b/src/flowlogs/mod.rs index 6aa88a0..6d3d7fa 100644 --- a/src/flowlogs/mod.rs +++ b/src/flowlogs/mod.rs @@ -7,12 +7,23 @@ //! - Caching configurations in-memory with 30-minute TTL //! - Persisting configurations to S3 for durability across Lambda cold starts //! +//! Flow logs are indexed by destination type: +//! - CloudWatch Logs destinations: looked up by log group name via `get_config` +//! - S3 destinations: looked up by bucket name + object key via `get_config_by_bucket` +//! +//! ## Thread Safety +//! +//! `FlowLogManager` is cheaply cloneable and safe to share across concurrent tokio tasks. +//! All mutable state lives behind an `Arc>`, so +//! callers can clone the handle and call `get_config` / `get_config_by_bucket` from +//! multiple tasks simultaneously without any external synchronization. 
mod cache; mod ec2; pub use cache::{ CacheSnapshot, FlowLogCache, FlowLogConfig, ParsedField, ParsedFieldType, ParsedFields, + get_field_type, }; pub use ec2::{Ec2Error, Ec2FlowLogFetcher}; @@ -24,12 +35,10 @@ use std::{ time::{Duration, Instant}, }; use thiserror::Error; +use tokio::sync::Mutex; use tracing::{debug, error, info, warn}; -use crate::{ - flowlogs::cache::get_field_type, - s3_cache::{S3Cache, S3CacheError}, -}; +use crate::s3_cache::{S3Cache, S3CacheError}; /// S3 cache key for storing flow log configurations const FLOW_LOG_CACHE_KEY: &str = "rotel-lambda-forwarder/cache/flow-logs/configs.json.gz"; @@ -63,14 +72,15 @@ impl FlowLogCacheFile { } } -/// Main flow log manager that coordinates cache, S3, and EC2 operations -/// -/// The `FlowLogManager` provides automatic caching and persistence of EC2 Flow Log -/// configurations. It fetches configurations from EC2 API on startup and caches them -/// in memory with a 30-minute TTL. Optionally, configs can be persisted to S3 for -/// durability across Lambda cold starts. +// --------------------------------------------------------------------------- +// Inner state (lives behind the shared mutex) +// --------------------------------------------------------------------------- + +/// All mutable state owned by `FlowLogManager`. /// -pub struct FlowLogManager { +/// Callers never access this type directly; it is always accessed through the +/// `Arc>` held by `FlowLogManager`. 
+struct FlowLogManagerInner { cache: FlowLogCache, s3_cache: Option>, ec2_fetcher: Ec2FlowLogFetcher, @@ -79,13 +89,8 @@ pub struct FlowLogManager { cooldown_duration: Duration, } -impl FlowLogManager { - /// Create a new flow log manager - pub fn new( - ec2_client: Ec2Client, - s3_client: Option, - s3_bucket: Option, - ) -> Self { +impl FlowLogManagerInner { + fn new(ec2_client: Ec2Client, s3_client: Option, s3_bucket: Option) -> Self { let persist_enabled = s3_client.is_some() && s3_bucket.is_some(); let s3_cache = match (s3_client, s3_bucket) { @@ -105,25 +110,14 @@ impl FlowLogManager { } } - /// Initialize the flow log manager by loading the cache from S3 and fetching from EC2 - pub async fn initialize(&mut self) -> Result<(), FlowLogError> { - match self.reload_cache_if_needed().await { - Ok(_) => { - info!("Flow log manager initialization complete"); - Ok(()) - } - Err(e) => { - error!(error = %e, "Failed to fetch flow logs from EC2 during initialization"); - Err(FlowLogError::Ec2(e)) - } - } - } + // ----------------------------------------------------------------------- + // Reload / fetch helpers + // ----------------------------------------------------------------------- - /// Reload cache from S3 or EC2 if needed + /// Reload cache from S3 or EC2 if needed. /// /// First attempts to load a valid (non-expired) cache from S3. /// If S3 cache doesn't exist or is expired, fetches from EC2. - /// Returns Ok(true) if cache was successfully loaded/refreshed, Ok(false) if using existing cache. 
async fn reload_cache_if_needed(&mut self) -> Result<(), Ec2Error> { // Check if fetching is currently disabled due to AccessDenied if let Some(disabled_until) = self.fetch_disabled_until { @@ -148,7 +142,8 @@ impl FlowLogManager { Ok(Some(cache_file)) => { let is_expired = cache_file.snapshot.is_expired(); info!( - entry_count = cache_file.snapshot.flow_logs.len(), + log_group_count = cache_file.snapshot.by_log_group.len(), + bucket_count = cache_file.snapshot.by_bucket.len(), expired = is_expired, "Loaded flow log cache from S3" ); @@ -191,86 +186,26 @@ impl FlowLogManager { Ok(()) // Don't fail, just use cached data (if any) } - Err(e) => { - // For other errors, propagate them - Err(e) - } + Err(e) => Err(e), } } - /// Get flow log configuration for a log group - /// First checks the in-memory cache. Returns None if not found or expired. - /// Lazily parses the log format fields on first access and caches the result. - pub async fn get_config(&mut self, log_group_name: &str) -> Option { - // Check if cache is expired - if self.cache.is_expired() { - debug!( - log_group = %log_group_name, - "Cache expired, attempting to reload" - ); - - // Attempt to reload cache from S3 or EC2 - match self.reload_cache_if_needed().await { - Ok(_) => { - debug!("Successfully reloaded flow log cache"); - } - Err(e) => { - warn!( - error = %e, - log_group = %log_group_name, - "Cache expired and could not be reloaded" - ); - return None; - } - } - } - - // Get a mutable reference to the config so we can parse fields if needed - let config = self.cache.get_mut(log_group_name)?; + /// Fetch all flow logs from EC2 and update both cache maps. 
+ async fn fetch_and_update_all(&mut self) -> Result<(), Ec2Error> { + debug!("Fetching flow logs from EC2"); - // Parse fields if not already attempted - if config.parsed_fields.is_none() { - let fields = parse_log_format(&config.log_format); + let fetched = self.ec2_fetcher.fetch_all_flow_logs().await?; - if fields.is_empty() { - // Parsing failed - cache the error - config.parsed_fields = Some(Arc::new(ParsedFields::Error( - "Failed to parse log format or no fields found".to_string(), - ))); - warn!( - log_group = %log_group_name, - log_format = %config.log_format, - "Failed to parse flow log format fields" - ); - } else { - // Parsing succeeded - cache the result - let field_count = fields.len(); - config.parsed_fields = Some(Arc::new(ParsedFields::Success(fields))); - debug!( - log_group = %log_group_name, - field_count = field_count, - "Parsed log format fields with types for flow log" - ); - } + // Update CloudWatch cache entries + for (log_group, config) in fetched.by_log_group { + self.cache.insert_by_log_group(log_group, config); } - Some(config.clone()) - } - - /// Fetch all flow logs from EC2 and update the cache - async fn fetch_and_update_all(&mut self) -> Result<(), Ec2Error> { - debug!("Fetching flow logs from EC2"); - - let flow_log_configs = match self.ec2_fetcher.fetch_all_flow_logs().await { - Ok(configs) => configs, - Err(e) => { - return Err(e); + // Update S3 cache entries + for (bucket, configs) in fetched.by_bucket { + for config in configs { + self.cache.insert_by_bucket(bucket.clone(), config); } - }; - - // Update cache with fetched configurations - for (log_group, config) in flow_log_configs { - self.cache.insert(log_group, config); } // Mark the cache as refreshed @@ -287,7 +222,7 @@ impl FlowLogManager { Ok(()) } - /// Persist the current cache to S3 + /// Persist the current cache to S3. 
async fn persist_cache(&mut self) -> Result<(), FlowLogError> { if let Some(s3_cache) = &mut self.s3_cache { let snapshot = self.cache.get_snapshot(); @@ -322,7 +257,7 @@ impl FlowLogManager { }) .await?; - // Replace out local cache with the most recent snapshot + // Replace our local cache with the most recent snapshot self.cache.load_snapshot(merged.snapshot.clone()); // Try to save again with the merged data. It is possible this @@ -339,17 +274,50 @@ impl FlowLogManager { } } - /// Check if flow log fetching is currently disabled due to the circuit breaker - pub fn is_fetch_disabled(&self) -> bool { + // ----------------------------------------------------------------------- + // Cache freshness helpers + // ----------------------------------------------------------------------- + + /// Ensure the cache is not expired, reloading if necessary. + /// + /// Returns `true` if the cache is usable (or was successfully reloaded), + /// and `false` if it could not be refreshed. + async fn ensure_cache_fresh(&mut self, key: &str, key_type: &str) -> bool { + if self.cache.is_expired() { + debug!( + key = %key, + key_type = %key_type, + "Cache expired, attempting to reload" + ); + + match self.reload_cache_if_needed().await { + Ok(_) => { + debug!("Successfully reloaded flow log cache"); + } + Err(e) => { + warn!( + error = %e, + key = %key, + key_type = %key_type, + "Cache expired and could not be reloaded" + ); + return false; + } + } + } + true + } + + // ----------------------------------------------------------------------- + // Status helpers + // ----------------------------------------------------------------------- + + fn is_fetch_disabled(&self) -> bool { self.fetch_disabled_until .is_some_and(|disabled_until| Instant::now() < disabled_until) } - /// Get the remaining cooldown time if fetching is disabled - /// - /// Returns `Some(Duration)` with the remaining time if the circuit breaker is active, - /// or `None` if fetching is currently enabled. 
- pub fn remaining_cooldown(&self) -> Option { + fn remaining_cooldown(&self) -> Option { self.fetch_disabled_until.and_then(|disabled_until| { let now = Instant::now(); if now < disabled_until { @@ -359,25 +327,218 @@ impl FlowLogManager { } }) } +} + +// --------------------------------------------------------------------------- +// Public handle +// --------------------------------------------------------------------------- + +/// Main flow log manager that coordinates cache, S3, and EC2 operations. +/// +/// `FlowLogManager` is a cheap-to-clone, `Send + Sync` handle backed by a shared +/// `Arc>`. Multiple tokio tasks may hold clones of the +/// same handle and call [`get_config`] / [`get_config_by_bucket`] concurrently — +/// the internal mutex serialises cache reads, lazy field parsing, and cache refreshes. +/// +/// ## Lookup methods +/// - [`get_config`]: look up by CloudWatch log group name (CloudWatch Logs destinations) +/// - [`get_config_by_bucket`]: look up by S3 bucket name + object key (S3 destinations) +/// +/// [`get_config`]: FlowLogManager::get_config +/// [`get_config_by_bucket`]: FlowLogManager::get_config_by_bucket +#[derive(Clone)] +pub struct FlowLogManager { + inner: Arc>, +} + +impl FlowLogManager { + /// Create a new flow log manager. + pub fn new( + ec2_client: Ec2Client, + s3_client: Option, + s3_bucket: Option, + ) -> Self { + Self { + inner: Arc::new(Mutex::new(FlowLogManagerInner::new( + ec2_client, s3_client, s3_bucket, + ))), + } + } + + /// Initialize the flow log manager by loading the cache from S3 and fetching from EC2. 
+ pub async fn initialize(&self) -> Result<(), FlowLogError> { + let mut inner = self.inner.lock().await; + match inner.reload_cache_if_needed().await { + Ok(_) => { + info!("Flow log manager initialization complete"); + Ok(()) + } + Err(e) => { + error!(error = %e, "Failed to fetch flow logs from EC2 during initialization"); + Err(FlowLogError::Ec2(e)) + } + } + } + + /// Get flow log configuration for a CloudWatch log group name. + /// + /// Checks the in-memory cache first. If the cache is expired it attempts a reload + /// from S3 / EC2 before returning. Returns `None` if no matching configuration is + /// found or if the cache cannot be refreshed. + /// + /// Parsed fields are lazily initialised on first access and written back into the + /// cache to avoid re-parsing on subsequent calls. + pub async fn get_config(&self, log_group_name: &str) -> Option { + let mut inner = self.inner.lock().await; + + if !inner.ensure_cache_fresh(log_group_name, "log_group").await { + return None; + } + + let mut config = inner.cache.get_by_log_group(log_group_name)?; + + // Lazily compute parsed fields and write them back into the cache so + // that subsequent callers find them already populated. + if config.parsed_fields.is_none() { + let parsed = Arc::new(Self::compute_parsed_fields( + &config.log_format, + log_group_name, + "log_group", + )); + inner + .cache + .set_parsed_fields_by_log_group(log_group_name, Arc::clone(&parsed)); + config.parsed_fields = Some(parsed); + } + + Some(config) + } - /// Get cache statistics + /// Get flow log configuration for an S3 object. /// - /// Returns information about the current state of the cache including - /// the number of cached entries and whether fetching is currently disabled. + /// Mirrors [`get_config`] but looks up by S3 bucket name + object key rather than a + /// CloudWatch log group name. 
Because multiple flow logs may target the same bucket + /// (differentiated by a folder prefix), the full object key is required to select + /// the correct configuration. This is used when processing VPC flow log files + /// delivered to S3. + /// + /// Note: `parsed_fields` is intentionally **not** populated on the returned config. + /// S3-delivered VPC flow log files carry their own column-header line, which the + /// S3 record parser reads directly via [`parse_vpclog_header`]. The `log_format` + /// string stored in the cache is therefore unused on the S3 path. + /// + /// [`get_config`]: FlowLogManager::get_config + /// [`parse_vpclog_header`]: crate::parse::vpclog::parse_vpclog_header + pub async fn get_config_by_bucket( + &self, + bucket_name: &str, + object_key: &str, + ) -> Option { + let mut inner = self.inner.lock().await; + + if !inner.ensure_cache_fresh(bucket_name, "bucket").await { + return None; + } + + inner.cache.get_by_bucket(bucket_name, object_key) + } + + // ----------------------------------------------------------------------- + // Status accessors (read-only, acquire lock briefly) + // ----------------------------------------------------------------------- + + /// Check if flow log fetching is currently disabled due to the circuit breaker. + pub async fn is_fetch_disabled(&self) -> bool { + self.inner.lock().await.is_fetch_disabled() + } + + /// Get the remaining cooldown time if fetching is disabled. + /// + /// Returns `Some(Duration)` with the remaining time if the circuit breaker is active, + /// or `None` if fetching is currently enabled. + pub async fn remaining_cooldown(&self) -> Option { + self.inner.lock().await.remaining_cooldown() + } + + // ----------------------------------------------------------------------- + // Private helpers + // ----------------------------------------------------------------------- + + /// Parse the log format string into typed fields, returning a `ParsedFields` value. 
+ fn compute_parsed_fields(log_format: &str, key: &str, key_type: &str) -> ParsedFields { + let fields = parse_log_format(log_format); + + if fields.is_empty() { + warn!( + key = %key, + key_type = %key_type, + log_format = %log_format, + "Failed to parse flow log format fields" + ); + ParsedFields::Error("Failed to parse log format or no fields found".to_string()) + } else { + debug!( + key = %key, + key_type = %key_type, + field_count = fields.len(), + "Parsed log format fields with types for flow log" + ); + ParsedFields::Success(fields) + } + } + + // ----------------------------------------------------------------------- + // Test helpers (only compiled in test builds) + // ----------------------------------------------------------------------- + + /// Get cache statistics. #[cfg(test)] - pub fn cache_stats(&self) -> CacheStats { + pub async fn cache_stats(&self) -> CacheStats { + let inner = self.inner.lock().await; CacheStats { - entry_count: self.cache.len(), - persist_enabled: self.persist_enabled, - fetch_disabled: self.is_fetch_disabled(), + entry_count: inner.cache.len(), + persist_enabled: inner.persist_enabled, + fetch_disabled: inner.is_fetch_disabled(), } } + + /// Expose inner cache length for tests. + #[cfg(test)] + pub async fn cache_len(&self) -> usize { + self.inner.lock().await.cache.len() + } + + /// Expose inner cache expiry status for tests. + #[cfg(test)] + pub async fn cache_is_expired(&self) -> bool { + self.inner.lock().await.cache.is_expired() + } + + /// Load a snapshot directly into the inner cache (test helper). + #[cfg(test)] + pub async fn load_cache_snapshot(&self, snapshot: CacheSnapshot) { + self.inner.lock().await.cache.load_snapshot(snapshot); + } + + /// Override the cooldown duration (test helper). + #[cfg(test)] + pub async fn set_cooldown_duration(&self, duration: Duration) { + self.inner.lock().await.cooldown_duration = duration; + } + + /// Simulate an AccessDenied circuit-breaker trip (test helper). 
+ #[cfg(test)] + pub async fn trigger_fetch_disabled(&self) { + let mut inner = self.inner.lock().await; + let cooldown = inner.cooldown_duration; + inner.fetch_disabled_until = Some(Instant::now() + cooldown); + } } -/// Parse the LogFormat string to extract field names and their types +/// Parse the LogFormat string to extract field names and their types. /// -/// LogFormat strings look like: "${version} ${account-id} ${interface-id} ..." -/// This function extracts the field names between ${ and } and assigns types based on +/// LogFormat strings look like: `"${version} ${account-id} ${interface-id} ..."` +/// This function extracts the field names between `${` and `}` and assigns types based on /// the AWS VPC Flow Logs documentation. pub fn parse_log_format(log_format: &str) -> Vec { let mut fields = Vec::new(); @@ -413,9 +574,9 @@ pub fn parse_log_format(log_format: &str) -> Vec { fields } -/// Cache statistics -#[derive(Debug, Clone)] +/// Cache statistics (test builds only). #[cfg(test)] +#[derive(Debug, Clone)] pub struct CacheStats { /// Number of entries currently in the cache pub entry_count: usize, @@ -435,67 +596,144 @@ mod tests { let config = aws_config::defaults(BehaviorVersion::latest()).load().await; let ec2_client = Ec2Client::new(&config); - let mut manager = FlowLogManager::new(ec2_client, None, None); + let manager = FlowLogManager::new(ec2_client, None, None); // Set a short cooldown for testing - manager.cooldown_duration = Duration::from_millis(100); + manager + .set_cooldown_duration(Duration::from_millis(100)) + .await; - // Simulate AccessDenied by setting the disabled time - manager.fetch_disabled_until = Some(Instant::now() + manager.cooldown_duration); + // Simulate AccessDenied by tripping the circuit breaker + manager.trigger_fetch_disabled().await; // Initially should be disabled - let stats = manager.cache_stats(); + let stats = manager.cache_stats().await; assert!(stats.fetch_disabled); // Wait for cooldown to elapse 
tokio::time::sleep(Duration::from_millis(150)).await; // Should now be enabled again - let stats = manager.cache_stats(); + let stats = manager.cache_stats().await; assert!(!stats.fetch_disabled); } #[tokio::test] - async fn test_cache_reuse_optimization() { + async fn test_cache_reuse_optimization_by_log_group() { use crate::flowlogs::cache::{CacheSnapshot, FlowLogConfig}; use std::collections::HashMap; let config = aws_config::defaults(BehaviorVersion::latest()).load().await; let ec2_client = Ec2Client::new(&config); - let mut manager = FlowLogManager::new(ec2_client, None, None); + let manager = FlowLogManager::new(ec2_client, None, None); - // Manually insert a valid cache entry - let mut flow_logs = HashMap::new(); - flow_logs.insert( + // Manually insert a valid cache entry via a snapshot + let mut by_log_group = HashMap::new(); + by_log_group.insert( "/aws/ec2/test-flowlogs".to_string(), FlowLogConfig { log_format: "${version} ${account-id}".to_string(), flow_log_id: "fl-test123".to_string(), tags: HashMap::new(), + folder_prefix: None, parsed_fields: None, }, ); let snapshot = CacheSnapshot { - flow_logs, + by_log_group, + by_bucket: HashMap::new(), last_refreshed_secs: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(), }; - manager.cache.load_snapshot(snapshot); + manager.load_cache_snapshot(snapshot).await; - // Verify cache has the entry and is not expired - assert!(!manager.cache.is_expired()); - assert_eq!(manager.cache.len(), 1); + assert!(!manager.cache_is_expired().await); + assert_eq!(manager.cache_len().await, 1); let config = manager.get_config("/aws/ec2/test-flowlogs").await; assert!(config.is_some()); assert_eq!(config.unwrap().flow_log_id, "fl-test123"); } + #[tokio::test] + async fn test_cache_reuse_optimization_by_bucket() { + use crate::flowlogs::cache::{CacheSnapshot, FlowLogConfig}; + use std::collections::HashMap; + + let config = aws_config::defaults(BehaviorVersion::latest()).load().await; + let 
ec2_client = Ec2Client::new(&config); + + let manager = FlowLogManager::new(ec2_client, None, None); + + let mut by_bucket = HashMap::new(); + by_bucket.insert( + "my-flow-logs-bucket".to_string(), + vec![FlowLogConfig { + log_format: "${version} ${account-id} ${srcaddr}".to_string(), + flow_log_id: "fl-s3-abc456".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }], + ); + + let snapshot = CacheSnapshot { + by_log_group: HashMap::new(), + by_bucket, + last_refreshed_secs: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + + manager.load_cache_snapshot(snapshot).await; + + assert!(!manager.cache_is_expired().await); + assert_eq!(manager.cache_len().await, 1); + + let config = manager + .get_config_by_bucket("my-flow-logs-bucket", "AWSLogs/2024/01/01/flow.log.gz") + .await; + assert!(config.is_some()); + let config = config.unwrap(); + assert_eq!(config.flow_log_id, "fl-s3-abc456"); + // parsed_fields is intentionally not populated on the S3 path — S3-delivered + // VPC flow log files carry their own column-header line which is parsed at + // read time, so the cached log_format is unused on this path. 
+ assert!(config.parsed_fields.is_none()); + } + + #[tokio::test] + async fn test_get_config_by_bucket_miss() { + use std::collections::HashMap; + + let config = aws_config::defaults(BehaviorVersion::latest()).load().await; + let ec2_client = Ec2Client::new(&config); + + let manager = FlowLogManager::new(ec2_client, None, None); + + // Seed with a fresh (non-expired) empty cache so we don't attempt an EC2 fetch + let snapshot = crate::flowlogs::cache::CacheSnapshot { + by_log_group: HashMap::new(), + by_bucket: HashMap::new(), + last_refreshed_secs: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + manager.load_cache_snapshot(snapshot).await; + + let result = manager + .get_config_by_bucket("non-existent-bucket", "some/key.log.gz") + .await; + assert!(result.is_none()); + } + #[test] fn test_parse_log_format_default() { let log_format = "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}"; @@ -541,16 +779,17 @@ mod tests { let config = aws_config::defaults(BehaviorVersion::latest()).load().await; let ec2_client = Ec2Client::new(&config); - let mut manager = FlowLogManager::new(ec2_client, None, None); + let manager = FlowLogManager::new(ec2_client, None, None); // Manually insert a cache entry with an expired timestamp - let mut flow_logs = HashMap::new(); - flow_logs.insert( + let mut by_log_group = HashMap::new(); + by_log_group.insert( "/aws/ec2/expired-flowlogs".to_string(), FlowLogConfig { log_format: "${version} ${account-id}".to_string(), flow_log_id: "fl-expired123".to_string(), tags: HashMap::new(), + folder_prefix: None, parsed_fields: None, }, ); @@ -563,20 +802,110 @@ mod tests { - 1900; // 31+ minutes ago let snapshot = CacheSnapshot { - flow_logs, + by_log_group, + by_bucket: HashMap::new(), last_refreshed_secs: expired_time, }; - manager.cache.load_snapshot(snapshot); + 
manager.load_cache_snapshot(snapshot).await; - // Verify cache is expired but entries are still present - assert!(manager.cache.is_expired()); - // Note: cache.len() still returns 1 because expired cache doesn't clear entries, - // it just refuses to serve them + // load_snapshot ignores expired snapshots, so cache should be empty & expired + assert!(manager.cache_is_expired().await); // Attempt to get config - should return None since cache is expired // and we can't reload from EC2 (no permissions) or S3 (not configured) let config = manager.get_config("/aws/ec2/expired-flowlogs").await; assert!(config.is_none()); } + + /// Verify that a `FlowLogManager` clone shares the same underlying state. + #[tokio::test] + async fn test_clone_shares_state() { + use crate::flowlogs::cache::{CacheSnapshot, FlowLogConfig}; + use std::collections::HashMap; + + let config = aws_config::defaults(BehaviorVersion::latest()).load().await; + let ec2_client = Ec2Client::new(&config); + + let manager = FlowLogManager::new(ec2_client, None, None); + let manager_clone = manager.clone(); + + // Seed via the original handle + let mut by_log_group = HashMap::new(); + by_log_group.insert( + "/aws/ec2/shared-flowlogs".to_string(), + FlowLogConfig { + log_format: "${version} ${account-id}".to_string(), + flow_log_id: "fl-shared".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }, + ); + let snapshot = CacheSnapshot { + by_log_group, + by_bucket: HashMap::new(), + last_refreshed_secs: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + manager.load_cache_snapshot(snapshot).await; + + // Clone should see the same data + let result = manager_clone.get_config("/aws/ec2/shared-flowlogs").await; + assert!(result.is_some()); + assert_eq!(result.unwrap().flow_log_id, "fl-shared"); + } + + /// Verify that two concurrent tasks can call get_config simultaneously. 
+ #[tokio::test] + async fn test_concurrent_get_config() { + use crate::flowlogs::cache::{CacheSnapshot, FlowLogConfig}; + use std::collections::HashMap; + + let config = aws_config::defaults(BehaviorVersion::latest()).load().await; + let ec2_client = Ec2Client::new(&config); + + let manager = FlowLogManager::new(ec2_client, None, None); + + // Seed the cache + let mut by_log_group = HashMap::new(); + for i in 0..5 { + by_log_group.insert( + format!("/aws/ec2/flowlogs-{}", i), + FlowLogConfig { + log_format: "${version} ${account-id}".to_string(), + flow_log_id: format!("fl-concurrent-{}", i), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }, + ); + } + let snapshot = CacheSnapshot { + by_log_group, + by_bucket: HashMap::new(), + last_refreshed_secs: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + manager.load_cache_snapshot(snapshot).await; + + // Spawn five concurrent lookups on cloned handles + let mut handles = Vec::new(); + for i in 0..5 { + let m = manager.clone(); + handles.push(tokio::spawn(async move { + m.get_config(&format!("/aws/ec2/flowlogs-{}", i)).await + })); + } + + for (i, handle) in handles.into_iter().enumerate() { + let result = handle.await.expect("task panicked"); + assert!(result.is_some(), "task {} got None", i); + assert_eq!(result.unwrap().flow_log_id, format!("fl-concurrent-{}", i)); + } + } } diff --git a/src/forward/forwarder.rs b/src/forward/forwarder.rs index 5af953f..33828d0 100644 --- a/src/forward/forwarder.rs +++ b/src/forward/forwarder.rs @@ -72,7 +72,7 @@ impl Forwarder { aws_attributes, &context.request_id, &mut self.tag_manager, - &mut self.flow_log_manager, + &self.flow_log_manager, ); // Parse the logs @@ -141,23 +141,25 @@ impl Forwarder { lambda_runtime::Error::from("S3 client not initialized for S3 event processing") })?; - let parser = s3logs::Parser::new(aws_attributes, &context.request_id, s3_client); + let parser = s3logs::Parser::new( + 
aws_attributes, + &context.request_id, + s3_client, + &self.flow_log_manager, + ); - // Create a channel so the parse task can stream batches of ResourceLogs to us as each - // S3 object completes, rather than waiting for all objects before forwarding anything. + // Create a channel so per-object tasks can stream batches of ResourceLogs. // Buffer depth matches typical max-parallel-objects (default 5) with some headroom. let (result_tx, mut result_rx) = tokio::sync::mpsc::channel::>(32); - // Spawn the parse task so it runs concurrently with the forwarding loop below. - let parse_handle = tokio::spawn(async move { parser.parse(s3_event, result_tx).await }); + let parse_task = tokio::spawn(parser.parse(s3_event, result_tx)); - // Split into counter + waiter. The drain task starts immediately inside into_parts(), - // so ack senders are never blocked regardless of how many messages are in flight. + // Split into counter + waiter before draining so the drain task starts immediately + // and ack senders are never blocked. let (mut counter, waiter) = AckerBuilder::new(context.request_id.clone()).into_parts(); let mut count: usize = 0; - // Forward each batch to logs_tx as it arrives — this runs concurrently with the parse - // task, giving downstream batching / exporting a head-start before parsing is complete. + // Drain completed batches as they arrive from the spawned parse task. while let Some(logs) = result_rx.recv().await { for log in logs { count += 1; @@ -180,9 +182,8 @@ impl Forwarder { } } - // The result_rx loop above ends only after result_tx is dropped, which happens when the - // parse task exits. Join here to surface any parse errors. - match parse_handle.await { + // result_rx is now closed; collect the parse result. 
+ match parse_task.await { Ok(Ok(())) => {} Ok(Err(e)) => { error!( @@ -198,7 +199,10 @@ impl Forwarder { error = %e, "S3 parse task panicked" ); - return Err(lambda_runtime::Error::from(e.to_string())); + return Err(lambda_runtime::Error::from(format!( + "S3 parse task panicked: {}", + e + ))); } } diff --git a/src/main.rs b/src/main.rs index 4b3302b..d60b8ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -213,7 +213,7 @@ async fn run_forwarder( // Initialize FlowLogManager let ec2_client = aws_sdk_ec2::Client::new(&aws_config); - let mut flow_log_manager = rotel_lambda_forwarder::flowlogs::FlowLogManager::new( + let flow_log_manager = rotel_lambda_forwarder::flowlogs::FlowLogManager::new( ec2_client, s3_client_clone, s3_bucket_clone, diff --git a/src/parse/json.rs b/src/parse/json.rs index 5e5aa07..f41ad04 100644 --- a/src/parse/json.rs +++ b/src/parse/json.rs @@ -13,23 +13,23 @@ use crate::parse::{platform::ParserError, record_parser::RecordParserError}; /// Parse a JSON log entry and return the parsed map /// Returns an error if the message is not valid JSON or not an object pub(crate) fn parse_json_to_map( - msg: String, + msg: &str, ) -> Result, RecordParserError> { // Parse the message as JSON - let json_map: serde_json::Map = match serde_json::from_str(&msg) { + let json_map: serde_json::Map = match serde_json::from_str(msg) { Ok(JsonValue::Object(map)) => map, Ok(_) => { // Not an object return Err(RecordParserError( ParserError::FormatParseError("JSON log entry is not an object".to_string()), - msg, + msg.to_string(), )); } Err(e) => { // Failed to parse JSON return Err(RecordParserError( ParserError::JsonParseError(e.to_string()), - msg, + msg.to_string(), )); } }; diff --git a/src/parse/keyvalue.rs b/src/parse/keyvalue.rs index a6463c3..865afb2 100644 --- a/src/parse/keyvalue.rs +++ b/src/parse/keyvalue.rs @@ -14,14 +14,14 @@ use crate::parse::{platform::ParserError, record_parser::RecordParserError}; /// All values are stored as JsonValue::String /// 
Returns an error if no valid key-value pairs are found pub(crate) fn parse_keyvalue_to_map( - input: String, + input: &str, ) -> Result, RecordParserError> { - let pairs = parse_keyvalue_pairs(&input); + let pairs = parse_keyvalue_pairs(input); if pairs.is_empty() { return Err(RecordParserError( ParserError::FormatParseError("No valid key-value pairs found".to_string()), - input, + input.to_string(), )); } diff --git a/src/parse/vpclog.rs b/src/parse/vpclog.rs index 5dfcff5..4e2da90 100644 --- a/src/parse/vpclog.rs +++ b/src/parse/vpclog.rs @@ -13,19 +13,38 @@ use std::sync::Arc; use serde_json::Value as JsonValue; -use crate::flowlogs::{ParsedFieldType, ParsedFields}; +use crate::flowlogs::{ParsedField, ParsedFieldType, ParsedFields, get_field_type}; use crate::parse::{platform::ParserError, record_parser::RecordParserError}; +/// Parse a VPC flow log header line into a list of typed fields. +/// +/// The header is the first line of an S3-delivered VPC flow log file and contains +/// space-separated field names, e.g.: +/// +/// ```text +/// version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status +/// ``` +/// +/// Each name is looked up in the field-type map so that numeric fields are given the +/// correct `Int32` / `Int64` type, matching the behaviour of `parse_log_format` for +/// CloudWatch-delivered flow logs. 
+pub(crate) fn parse_vpclog_header(header: &str) -> Vec { + header + .split_whitespace() + .map(|name| ParsedField::new(name.to_string(), get_field_type(name))) + .collect() +} + /// Parse an EC2 Flow Log record from a string using pre-parsed field names /// Fields with value "-" are excluded from the result /// Returns an error if the record doesn't have the expected number of fields pub(crate) fn parse_vpclog_to_map( - input: String, + input: &str, parsed_fields: Arc, ) -> Result, RecordParserError> { match parsed_fields.as_ref() { ParsedFields::Success(parsed_fields) => { - let field_values = parse_vpclog_fields(&input); + let field_values = parse_vpclog_fields(input); if field_values.len() != parsed_fields.len() { return Err(RecordParserError( @@ -34,7 +53,7 @@ pub(crate) fn parse_vpclog_to_map( parsed_fields.len(), field_values.len() )), - input, + input.to_string(), )); } @@ -92,11 +111,68 @@ mod tests { use super::*; use crate::cwlogs::ParserType; use crate::cwlogs::record_parser::RecordParser; - use crate::flowlogs::parse_log_format; + use crate::flowlogs::{ParsedFieldType, parse_log_format}; use crate::parse::platform::LogPlatform; use aws_lambda_events::cloudwatch_logs::LogEntry; use opentelemetry_proto::tonic::{common::v1::any_value::Value, logs::v1::LogRecord}; + #[test] + fn test_parse_vpclog_header_types() { + let header = "version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status"; + let fields = parse_vpclog_header(header); + + assert_eq!(fields.len(), 14); + + // Spot-check names + assert_eq!(fields[0].field_name, "version"); + assert_eq!(fields[1].field_name, "account-id"); + assert_eq!(fields[13].field_name, "log-status"); + + // Numeric types + assert_eq!(fields[0].field_type, ParsedFieldType::Int32); // version + assert_eq!(fields[5].field_type, ParsedFieldType::Int32); // srcport + assert_eq!(fields[6].field_type, ParsedFieldType::Int32); // dstport + assert_eq!(fields[7].field_type, 
ParsedFieldType::Int32); // protocol + assert_eq!(fields[8].field_type, ParsedFieldType::Int64); // packets + assert_eq!(fields[9].field_type, ParsedFieldType::Int64); // bytes + assert_eq!(fields[10].field_type, ParsedFieldType::Int64); // start + assert_eq!(fields[11].field_type, ParsedFieldType::Int64); // end + + // String types + assert_eq!(fields[1].field_type, ParsedFieldType::String); // account-id + assert_eq!(fields[2].field_type, ParsedFieldType::String); // interface-id + assert_eq!(fields[3].field_type, ParsedFieldType::String); // srcaddr + assert_eq!(fields[4].field_type, ParsedFieldType::String); // dstaddr + assert_eq!(fields[12].field_type, ParsedFieldType::String); // action + assert_eq!(fields[13].field_type, ParsedFieldType::String); // log-status + } + + #[test] + fn test_parse_vpclog_header_matches_parse_log_format() { + // The header line and the ${field} format string must produce identical ParsedField lists. + let header = "version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status"; + let format = "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}"; + + let from_header = parse_vpclog_header(header); + let from_format = parse_log_format(format); + + assert_eq!(from_header, from_format); + } + + #[test] + fn test_parse_vpclog_header_unknown_field_defaults_to_string() { + let fields = parse_vpclog_header("some-new-field another-new-field"); + assert_eq!(fields.len(), 2); + assert_eq!(fields[0].field_type, ParsedFieldType::String); + assert_eq!(fields[1].field_type, ParsedFieldType::String); + } + + #[test] + fn test_parse_vpclog_header_empty() { + let fields = parse_vpclog_header(""); + assert!(fields.is_empty()); + } + const DEFAULT_FORMAT: &'static str = "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} 
${end} ${action} ${log-status}"; /// Test utility: Create a LogEntry from a message string @@ -121,7 +197,7 @@ mod tests { } fn parse_vpclog_to_map_default( - input: String, + input: &str, ) -> Result, RecordParserError> { let dflt_fields = parse_log_format(DEFAULT_FORMAT); parse_vpclog_to_map(input, Arc::new(ParsedFields::Success(dflt_fields))) @@ -157,7 +233,7 @@ mod tests { #[test] fn test_parse_vpclog_to_map_basic_tcp() { let input = "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK"; - let result = parse_vpclog_to_map_default(input.to_string()); + let result = parse_vpclog_to_map_default(input); assert!(result.is_ok()); let map = result.unwrap(); @@ -226,10 +302,7 @@ mod tests { let log_format = "${version} ${account-id} ${srcaddr} ${dstaddr}"; let input = "2 123456789010 172.31.16.139 172.31.16.21"; let parsed_fields = parse_log_format(log_format); - let result = parse_vpclog_to_map( - input.to_string(), - Arc::new(ParsedFields::Success(parsed_fields)), - ); + let result = parse_vpclog_to_map(input, Arc::new(ParsedFields::Success(parsed_fields))); assert!(result.is_ok()); let map = result.unwrap(); @@ -258,10 +331,7 @@ mod tests { let log_format = "${version} ${account-id} ${srcaddr} ${dstaddr}"; let input = "2 123456789010 - 172.31.16.21"; let parsed_fields = parse_log_format(log_format); - let result = parse_vpclog_to_map( - input.to_string(), - Arc::new(ParsedFields::Success(parsed_fields)), - ); + let result = parse_vpclog_to_map(input, Arc::new(ParsedFields::Success(parsed_fields))); assert!(result.is_ok()); let map = result.unwrap(); @@ -285,7 +355,7 @@ mod tests { #[test] fn test_parse_vpclog_to_map_reject() { let input = "2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK"; - let result = parse_vpclog_to_map_default(input.to_string()); + let result = parse_vpclog_to_map_default(input); assert!(result.is_ok()); let map = 
result.unwrap(); @@ -308,7 +378,7 @@ mod tests { fn test_parse_vpclog_to_map_with_dashes() { let input = "2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA"; - let result = parse_vpclog_to_map_default(input.to_string()); + let result = parse_vpclog_to_map_default(input); assert!(result.is_ok()); let map = result.unwrap(); @@ -349,7 +419,7 @@ mod tests { #[test] fn test_parse_vpclog_to_map_icmp() { let input = "2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK"; - let result = parse_vpclog_to_map_default(input.to_string()); + let result = parse_vpclog_to_map_default(input); assert!(result.is_ok()); let map = result.unwrap(); @@ -375,7 +445,7 @@ mod tests { #[test] fn test_parse_vpclog_to_map_ipv6() { let input = "2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK"; - let result = parse_vpclog_to_map_default(input.to_string()); + let result = parse_vpclog_to_map_default(input); assert!(result.is_ok()); let map = result.unwrap(); @@ -405,7 +475,7 @@ mod tests { #[test] fn test_parse_vpclog_to_map_invalid_field_count() { let input = "2 123456789010 eni-1235b8ca123456789 172.31.16.139"; - let result = parse_vpclog_to_map_default(input.to_string()); + let result = parse_vpclog_to_map_default(input); assert!(result.is_err()); } @@ -413,7 +483,7 @@ mod tests { #[test] fn test_parse_vpclog_to_map_empty() { let input = ""; - let result = parse_vpclog_to_map_default(input.to_string()); + let result = parse_vpclog_to_map_default(input); assert!(result.is_err()); } @@ -600,7 +670,7 @@ mod tests { ]; for example in examples { - let result = parse_vpclog_to_map_default(example.to_string()); + let result = parse_vpclog_to_map_default(example); assert!(result.is_ok(), "Failed to parse: {}", example); let log_record = parse_log_msg(example); @@ -618,10 +688,7 @@ mod tests { let 
log_format = "${version} ${account-id} ${srcport} ${dstport}"; let input = "2 123456789010 invalid-port 22"; let parsed_fields = parse_log_format(log_format); - let result = parse_vpclog_to_map( - input.to_string(), - Arc::new(ParsedFields::Success(parsed_fields)), - ); + let result = parse_vpclog_to_map(input, Arc::new(ParsedFields::Success(parsed_fields))); // Should return an error because srcport (Int32) cannot parse "invalid-port" assert!(result.is_err()); @@ -638,10 +705,7 @@ mod tests { let log_format = "${version} ${account-id} ${bytes} ${packets}"; let input = "2 123456789010 not-a-number 100"; let parsed_fields = parse_log_format(log_format); - let result = parse_vpclog_to_map( - input.to_string(), - Arc::new(ParsedFields::Success(parsed_fields)), - ); + let result = parse_vpclog_to_map(input, Arc::new(ParsedFields::Success(parsed_fields))); // Should return an error because bytes (Int64) cannot parse "not-a-number" assert!(result.is_err()); diff --git a/src/s3logs/mod.rs b/src/s3logs/mod.rs index 7d93dcc..058b4ac 100644 --- a/src/s3logs/mod.rs +++ b/src/s3logs/mod.rs @@ -6,6 +6,7 @@ use tokio::task::JoinSet; use tracing::{debug, error}; use crate::aws_attributes::AwsAttributes; +use crate::flowlogs::FlowLogManager; mod json_blob; mod s3record; @@ -37,16 +38,25 @@ pub struct Parser { request_id: String, s3_client: S3Client, config: S3LogsConfig, + /// Shared, cheaply-cloneable flow log manager. No `&mut` needed — the manager + /// serialises access to its internal state with a `tokio::sync::Mutex`. 
+ flow_log_manager: FlowLogManager, } impl Parser { - pub fn new(aws_attributes: &AwsAttributes, request_id: &String, s3_client: &S3Client) -> Self { + pub fn new( + aws_attributes: &AwsAttributes, + request_id: &String, + s3_client: &S3Client, + flow_log_manager: &FlowLogManager, + ) -> Self { let config = Self::load_config(); Self { aws_attributes: aws_attributes.clone(), request_id: request_id.clone(), s3_client: s3_client.clone(), config, + flow_log_manager: flow_log_manager.clone(), } } @@ -72,11 +82,51 @@ impl Parser { /// completes. This enables the caller to pipeline downstream processing (batching, exporting) /// concurrently with ongoing S3 reads. /// + /// Because `FlowLogManager` is now `Clone + Send + Sync`, each spawned task receives its own + /// cloned handle and resolves the VPC flow log configuration concurrently with the actual S3 + /// object download. This removes the serialisation point that previously forced config + /// look-ups to happen before task spawning. + /// /// Returns `Ok(())` once all S3 objects have been processed. Individual object errors are /// logged but do not abort processing of remaining objects. A send error (receiver dropped) /// causes an early return with `Err`. 
+ async fn drain_one_task( + tasks: &mut JoinSet, ParserError>>, + result_tx: &mpsc::Sender>, + total: &mut usize, + request_id: &str, + ) -> Result { + match tasks.join_next().await { + None => Ok(false), + Some(Ok(Ok(logs))) => { + *total += logs.len(); + if result_tx.send(logs).await.is_err() { + error!( + request_id = %request_id, + "Result receiver dropped; aborting S3 parse" + ); + Err(ParserError::ParseError( + "Result receiver dropped".to_string(), + )) + } else { + Ok(true) + } + } + Some(Ok(Err(e))) => { + error!(error = %e, "Failed to process S3 object"); + Err(e) + } + Some(Err(e)) => { + error!(error = %e, "Task panicked while processing S3 object"); + Err(ParserError::ParseError( + "Task panicked while processing S3 object".to_string(), + )) + } + } + } + pub async fn parse( - &self, + self, s3_event: S3Event, result_tx: mpsc::Sender>, ) -> Result<(), ParserError> { @@ -88,78 +138,78 @@ impl Parser { let mut total_resource_logs: usize = 0; - // Process records in parallel with controlled concurrency + // Process records in parallel with controlled concurrency. let mut tasks: JoinSet, ParserError>> = JoinSet::new(); let max_concurrent = self.config.max_parallel_objects; for (idx, record) in s3_event.records.into_iter().enumerate() { + let bucket_name = match record.s3.bucket.name.as_deref() { + Some(b) => b.to_string(), + None => { + return Err(ParserError::ParseError( + "Invalid S3 record - no bucket name".to_string(), + )); + } + }; + let object_key = match record.s3.object.key.as_deref() { + Some(k) => k.to_string(), + None => { + return Err(ParserError::ParseError( + "Invalid S3 record - no object key".to_string(), + )); + } + }; + let s3_client = self.s3_client.clone(); let aws_attributes = self.aws_attributes.clone(); let request_id = self.request_id.clone(); let batch_size = self.config.batch_size; + // Clone the manager handle — cheap (Arc bump), safe to send across tasks. 
+ let flow_log_manager = self.flow_log_manager.clone(); - // Wait for the first task to finish if we've hit the concurrency limit, then - // stream its results immediately rather than accumulating them. + // Wait for a slot to open before spawning the next task. while tasks.len() >= max_concurrent { - match tasks.join_next().await { - Some(Ok(Ok(logs))) => { - total_resource_logs += logs.len(); - if result_tx.send(logs).await.is_err() { - // Receiver was dropped; the caller has given up — stop processing. - error!( - request_id = %self.request_id, - "Result receiver dropped; aborting S3 parse" - ); - return Err(ParserError::ParseError( - "Result receiver dropped".to_string(), - )); - } - } - Some(Ok(Err(e))) => { - error!(error = %e, "Failed to process S3 object"); - } - Some(Err(e)) => { - error!(error = %e, "Task panicked while processing S3 object"); - } - None => break, + if !Self::drain_one_task( + &mut tasks, + &result_tx, + &mut total_resource_logs, + &self.request_id, + ) + .await? + { + break; } } - let s3_record = S3Record::new( - record, - idx, - s3_client, - aws_attributes, - request_id, - batch_size, - ); - - tasks.spawn(async move { s3_record.process().await }); + // Lookup the flow log config + let flow_log_config = flow_log_manager + .get_config_by_bucket(&bucket_name, &object_key) + .await; + + tasks.spawn(async move { + let s3_record = S3Record::new( + record, + idx, + s3_client, + aws_attributes, + request_id, + batch_size, + flow_log_config, + ); + + s3_record.process().await + }); } // Drain remaining tasks, streaming each result as it completes. 
- while let Some(result) = tasks.join_next().await { - match result { - Ok(Ok(logs)) => { - total_resource_logs += logs.len(); - if result_tx.send(logs).await.is_err() { - error!( - request_id = %self.request_id, - "Result receiver dropped; aborting S3 parse" - ); - return Err(ParserError::ParseError( - "Result receiver dropped".to_string(), - )); - } - } - Ok(Err(e)) => { - error!(error = %e, "Failed to process S3 object"); - } - Err(e) => { - error!(error = %e, "Task panicked while processing S3 object"); - } - } - } + while Self::drain_one_task( + &mut tasks, + &result_tx, + &mut total_resource_logs, + &self.request_id, + ) + .await? + {} debug!( request_id = %self.request_id, @@ -174,6 +224,7 @@ impl Parser { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum ParserType { Json, + VpcLog, Unknown, } diff --git a/src/s3logs/s3record.rs b/src/s3logs/s3record.rs index ca7bff3..f992410 100644 --- a/src/s3logs/s3record.rs +++ b/src/s3logs/s3record.rs @@ -1,10 +1,13 @@ +use std::collections::HashMap; use std::io::Read; +use std::sync::Arc; -use serde_json::Value as JsonValue; use tracing::warn; use aws_lambda_events::s3::S3EventRecord; use aws_sdk_s3::Client as S3Client; +use aws_sdk_s3::error::SdkError; +use aws_sdk_s3::operation::get_object::GetObjectError; use chrono::{DateTime, Utc}; use flate2::read::GzDecoder; use opentelemetry_proto::tonic::{ @@ -14,10 +17,12 @@ use opentelemetry_proto::tonic::{ }; use tracing::debug; +use crate::flowlogs::{FlowLogConfig, ParsedFields}; use crate::parse::json::parse_json_to_map; use crate::parse::platform::LogPlatform; use crate::parse::record_parser::LogBuilder; use crate::parse::utils::string_kv; +use crate::parse::vpclog::{parse_vpclog_header, parse_vpclog_to_map}; use crate::{aws_attributes::AwsAttributes, s3logs::ParserType}; use super::{JsonLogRecords, ParserError}; @@ -30,6 +35,8 @@ pub struct S3Record { aws_attributes: AwsAttributes, request_id: String, batch_size: usize, + /// VPC flow log configuration for 
the S3 bucket, if applicable. + flow_log_config: Option, } impl S3Record { @@ -41,6 +48,7 @@ impl S3Record { aws_attributes: AwsAttributes, request_id: String, batch_size: usize, + flow_log_config: Option, ) -> Self { Self { record, @@ -49,6 +57,7 @@ impl S3Record { aws_attributes, request_id, batch_size, + flow_log_config, } } @@ -113,6 +122,7 @@ impl S3Record { &self.aws_attributes, &self.request_id, self.batch_size, + self.flow_log_config.as_ref(), )?; Ok(resource_logs) @@ -139,7 +149,33 @@ async fn load_s3_object( .key(key) .send() .await - .map_err(|e| ParserError::S3Error(format!("Failed to get S3 object: {}", e)))?; + .map_err(|e| { + let detail = match &e { + SdkError::ServiceError(svc) => { + let err = svc.err(); + let code = match err { + GetObjectError::NoSuchKey(_) => "NoSuchKey", + _ => err.meta().code().unwrap_or("Unknown"), + }; + format!( + "service error: code={}, message={}", + code, + err.meta().message().unwrap_or("(none)") + ) + } + SdkError::ConstructionFailure(_) => "construction failure".to_string(), + SdkError::TimeoutError(_) => "timeout".to_string(), + SdkError::DispatchFailure(_) => "dispatch failure".to_string(), + SdkError::ResponseError(re) => { + format!("response error: http status={}", re.raw().status()) + } + _ => format!("{}", e), + }; + ParserError::S3Error(format!( + "Failed to get S3 object: bucket={}, key={}, {}", + bucket, key, detail + )) + })?; let body_bytes = response .body @@ -172,6 +208,7 @@ fn decompress_if_needed(data: &[u8], key: &str) -> Result, ParserError> } } +#[allow(clippy::too_many_arguments)] fn parse_log_lines( data: &[u8], event_time: DateTime, @@ -180,6 +217,7 @@ fn parse_log_lines( aws_attributes: &AwsAttributes, request_id: &str, batch_size: usize, + flow_log_config: Option<&FlowLogConfig>, ) -> Result, ParserError> { let content = std::str::from_utf8(data) .map_err(|e| ParserError::EncodingError(format!("Invalid UTF-8: {}", e)))?; @@ -228,23 +266,58 @@ fn parse_log_lines( "Parsing log lines" ); - // 
Detect log format from key name and content - let (log_platform, parser_type) = detect_log_format(key, &lines); - - debug!( - request_id = %request_id, - platform = ?log_platform, - parser_type = ?parser_type, - "Detected log format" - ); + // If we have a VPC flow log configuration for this bucket, use it directly. + // Otherwise detect format from the key and content. + let (log_platform, parser_type) = if flow_log_config.is_some() && key.contains("vpcflowlogs") { + debug!( + request_id = %request_id, + bucket = %bucket, + "Using VPC flow log parser for S3 object" + ); + (LogPlatform::VpcFlowLog, ParserType::VpcLog) + } else { + let (platform, pt) = detect_log_format(key, &lines); + debug!( + request_id = %request_id, + platform = ?platform, + parser_type = ?pt, + "Detected log format" + ); + (platform, pt) + }; let now_nanos = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_nanos() as u64; + // Build the extra flow-log tags that will be added to resource attributes. + let flow_log_tags: HashMap = + flow_log_config.map(|c| c.tags.clone()).unwrap_or_default(); + + // For VPC flow logs the first line is the column header, e.g.: + // "version account-id interface-id srcaddr dstaddr ..." + // Parse it into typed ParsedFields and treat the remainder as data lines. 
+ let (flow_log_parsed_fields, data_lines): (Option>, &[&str]) = + if parser_type == ParserType::VpcLog { + if let Some((header, rest)) = lines.split_first() { + let fields = parse_vpclog_header(header); + debug!( + request_id = %request_id, + field_count = fields.len(), + "Parsed VPC flow log column header from S3 file" + ); + (Some(Arc::new(ParsedFields::Success(fields))), rest) + } else { + (None, &[]) + } + } else { + (None, &lines) + }; + // Create base resource attributes - let base_attributes = create_base_attributes(bucket, key, aws_attributes, log_platform); + let base_attributes = + create_base_attributes(bucket, key, aws_attributes, log_platform, &flow_log_tags); let builder = LogBuilder::new(log_platform); @@ -254,14 +327,20 @@ fn parse_log_lines( let mut resource_logs_list = Vec::new(); let mut current_batch = Vec::new(); - let lines_count = lines.len(); - for line in lines.into_iter() { + let lines_count = data_lines.len(); + for line in data_lines { + let line = line.trim(); + if line.is_empty() { + continue; + } + let log_record = parse_line( &builder, now_nanos, event_timestamp, - String::from(line), + line, parser_type, + flow_log_parsed_fields.clone(), ); current_batch.push(log_record); @@ -296,46 +375,67 @@ fn parse_log_lines( /// Parse a single log line string into a [`LogRecord`]. /// /// Dispatches based on `parser_type`: -/// * `Json` — parse as a JSON object; on failure the raw string becomes the body. -/// * `Unknown` — auto-detect: attempt JSON for `{`-prefixed lines, otherwise plain text. -/// * Any other type — treat as plain text (S3 log detection never produces `VpcLog` or -/// `KeyValue`). +/// * `VpcLog` — parse as space-separated VPC flow log fields; raw line becomes the body. +/// * `Json` — parse as a JSON object; on failure the raw string becomes the body. +/// * `Unknown` — auto-detect: attempt JSON for `{`-prefixed lines, otherwise plain text. 
fn parse_line( builder: &LogBuilder, now_nanos: u64, timestamp_ms: i64, - line: String, + line: &str, parser_type: ParserType, + flow_log_parsed_fields: Option<Arc<ParsedFields>>, ) -> opentelemetry_proto::tonic::logs::v1::LogRecord { let mut record_builder = builder.start(now_nanos, timestamp_ms, vec![]); - let map_result: Result>, _> = match parser_type { - ParserType::Json => parse_json_to_map(line.clone()) - .map(Some) - .map_err(|e| (e, line.clone())), - ParserType::Unknown => { - if line.len() > 2 && line.starts_with('{') { - parse_json_to_map(line.clone()) - .map(Some) - .map_err(|e| (e, line.clone())) - } else { - return record_builder.set_body_text(line).finish(); + match parser_type { + ParserType::VpcLog => { + // VPC Flow Logs always preserve the raw line as the body. + // When parsed fields are available, also emit individual fields as attributes. + record_builder = record_builder.set_body_text(line.to_string()); + if let Some(parsed_fields) = flow_log_parsed_fields { + match parse_vpclog_to_map(line, parsed_fields) { + Ok(map) => { + record_builder.populate_from_map(map); + } + Err(e) => { + warn!(error = ?e, "Failed to parse VPC flow log line as structured map"); + } + } + } + record_builder.finish() } - }; - match map_result { - Ok(None) => {} - Ok(Some(map)) => { - record_builder.populate_from_map(map); + ParserType::Json => { + match parse_json_to_map(line) { + Ok(map) => { + record_builder.populate_from_map(map); + } + Err(err) => { + warn!(error = ?err, "Failed to parse log line as JSON, using raw text as body"); + return record_builder.set_body_text(line.to_string()).finish(); + } + } + record_builder.finish() } - Err((err, raw)) => { - warn!(error = ?err, "Failed to parse log line, using raw text as body"); - return record_builder.set_body_text(raw).finish(); + + ParserType::Unknown => { + if line.len() > 2 && line.starts_with('{') { + match parse_json_to_map(line) { + Ok(map) => { + record_builder.populate_from_map(map); + } + Err(err) => { + warn!(error = ?err, 
"Failed to parse log line, using raw text as body"); + return record_builder.set_body_text(line.to_string()).finish(); + } + } + record_builder.finish() + } else { + record_builder.set_body_text(line.to_string()).finish() + } } } - - record_builder.finish() } /// Parse a JSON blob containing a Records array @@ -356,8 +456,9 @@ fn parse_json_blob( // Detect log platform from key (e.g., CloudTrail files) let log_platform = detect_log_platform_from_key(key); - // Create base resource attributes - let base_attributes = create_base_attributes(bucket, key, aws_attributes, log_platform); + // Create base resource attributes (no flow log tags for JSON blob paths) + let base_attributes = + create_base_attributes(bucket, key, aws_attributes, log_platform, &HashMap::new()); debug!( request_id = %request_id, @@ -405,12 +506,16 @@ fn parse_json_blob( Ok(resource_logs_list) } -/// Create base resource attributes for S3 logs +/// Create base resource attributes for S3 logs. +/// +/// When `flow_log_tags` is non-empty the tags are emitted as +/// `ec2.flow-logs.tags.` resource attributes, mirroring the CloudWatch path. 
fn create_base_attributes( bucket: &str, key: &str, aws_attributes: &AwsAttributes, log_platform: LogPlatform, + flow_log_tags: &HashMap<String, String>, ) -> Vec<KeyValue> { let mut attributes = vec![ string_kv("cloud.provider", "aws"), @@ -425,6 +530,14 @@ fn create_base_attributes( attributes.push(string_kv("cloud.platform", log_platform.as_str())); } + // Add EC2 Flow Log tags as resource attributes (matches CloudWatch behaviour) + for (tag_key, tag_value) in flow_log_tags { + attributes.push(string_kv( + &format!("ec2.flow-logs.tags.{}", tag_key), + tag_value.clone(), + )); + } + attributes } @@ -514,7 +627,18 @@ fn detect_log_platform_from_key(key: &str) -> LogPlatform { #[cfg(test)] mod tests { use super::*; - use opentelemetry_proto::tonic::logs::v1::SeverityNumber; + + fn make_aws_attributes() -> AwsAttributes { + AwsAttributes { + region: "us-east-1".to_string(), + account_id: "123456789012".to_string(), + invoked_function_arn: "arn:aws:lambda:us-east-1:123456789012:function:test".to_string(), + } + } + + // ------------------------------------------------------------------ + // detect_log_format + // ------------------------------------------------------------------ #[test] fn test_detect_log_format_json_from_suffix() { @@ -554,18 +678,17 @@ mod tests { assert_eq!(parser_type, ParserType::Unknown); } + // ------------------------------------------------------------------ + // parse_log_lines (non-VPC) + // ------------------------------------------------------------------ + #[tokio::test] async fn test_parse_log_lines_json() { let log_data = r#"{"level":"info","msg":"test message 1","service":"test"} {"level":"error","msg":"test message 2","service":"test"} {"level":"debug","msg":"test message 3","service":"test"}"#; - let aws_attributes = AwsAttributes { - region: "us-east-1".to_string(), - account_id: "123456789012".to_string(), - invoked_function_arn: "arn:aws:lambda:us-east-1:123456789012:function:test".to_string(), - }; - + let aws_attributes = make_aws_attributes(); let 
event_time = Utc::now(); let result = parse_log_lines( log_data.as_bytes(), @@ -575,20 +698,17 @@ mod tests { &aws_attributes, "test-request-id", 1000, + None, ); assert!(result.is_ok()); let resource_logs = result.unwrap(); assert_eq!(resource_logs.len(), 1); - - let logs = &resource_logs[0]; - assert_eq!(logs.scope_logs.len(), 1); - assert_eq!(logs.scope_logs[0].log_records.len(), 3); + assert_eq!(resource_logs[0].scope_logs[0].log_records.len(), 3); } #[tokio::test] async fn test_parse_log_lines_batching() { - // Create more lines than batch size to test batching let mut log_lines = Vec::new(); for i in 0..2500 { log_lines.push(format!( @@ -598,12 +718,7 @@ mod tests { } let log_data = log_lines.join("\n"); - let aws_attributes = AwsAttributes { - region: "us-east-1".to_string(), - account_id: "123456789012".to_string(), - invoked_function_arn: "arn:aws:lambda:us-east-1:123456789012:function:test".to_string(), - }; - + let aws_attributes = make_aws_attributes(); let event_time = Utc::now(); let result = parse_log_lines( log_data.as_bytes(), @@ -612,14 +727,13 @@ mod tests { "test-key.log", &aws_attributes, "test-request-id", - 1000, // batch size + 1000, + None, ); assert!(result.is_ok()); let resource_logs = result.unwrap(); - // Should create 3 ResourceLogs: 1000 + 1000 + 500 assert_eq!(resource_logs.len(), 3); - assert_eq!(resource_logs[0].scope_logs[0].log_records.len(), 1000); assert_eq!(resource_logs[1].scope_logs[0].log_records.len(), 1000); assert_eq!(resource_logs[2].scope_logs[0].log_records.len(), 500); @@ -634,12 +748,7 @@ mod tests { ] }"#; - let aws_attributes = AwsAttributes { - region: "us-east-1".to_string(), - account_id: "123456789012".to_string(), - invoked_function_arn: "arn:aws:lambda:us-east-1:123456789012:function:test".to_string(), - }; - + let aws_attributes = make_aws_attributes(); let event_time = Utc::now(); let result = parse_log_lines( json_data.as_bytes(), @@ -649,6 +758,7 @@ mod tests { &aws_attributes, "test-request-id", 1000, + 
None, ); assert!(result.is_ok()); @@ -656,12 +766,15 @@ mod tests { assert_eq!(resource_logs.len(), 1); assert_eq!(resource_logs[0].scope_logs[0].log_records.len(), 2); - // Verify attributes contain eventName let first_log = &resource_logs[0].scope_logs[0].log_records[0]; let has_event_name = first_log.attributes.iter().any(|kv| kv.key == "eventName"); assert!(has_event_name); } + // ------------------------------------------------------------------ + // detect_log_platform_from_key + // ------------------------------------------------------------------ + #[tokio::test] async fn test_detect_log_platform_from_key() { assert_eq!( @@ -682,17 +795,21 @@ mod tests { ); } + // ------------------------------------------------------------------ + // parse_line (non-VPC) + // ------------------------------------------------------------------ + #[test] fn test_parse_line_json() { let builder = LogBuilder::new(LogPlatform::Unknown); let lr = parse_line( &builder, 123_456_789, - 1000, - r#"{"level":"info","msg":"hello"}"#.to_string(), + 1_000, + r#"{"level":"info","msg":"hello"}"#, ParserType::Json, + None, ); - assert_eq!(lr.severity_number, SeverityNumber::Info as i32); assert!(lr.body.is_some()); } @@ -702,161 +819,414 @@ mod tests { let lr = parse_line( &builder, 123_456_789, - 1000, - r#"{"level":"debug","msg":"auto"}"#.to_string(), + 1_000, + r#"{"level":"info","msg":"hello"}"#, ParserType::Unknown, + None, ); - assert_eq!(lr.severity_number, SeverityNumber::Debug as i32); assert!(lr.body.is_some()); } #[test] fn test_parse_line_plain_text_fallback() { + use opentelemetry_proto::tonic::common::v1::any_value::Value; + let builder = LogBuilder::new(LogPlatform::Unknown); - let msg = "just plain text"; - let lr = parse_line(&builder, 0, 0, msg.to_string(), ParserType::Unknown); + let lr = parse_line( + &builder, + 123_456_789, + 1_000, + "plain text log line", + ParserType::Unknown, + None, + ); if let Some(body) = &lr.body { - if let 
Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = - &body.value - { - assert_eq!(s, msg); + if let Some(Value::StringValue(s)) = &body.value { + assert_eq!(s, "plain text log line"); } else { - panic!("expected string body"); + panic!("Expected StringValue body"); } } else { - panic!("body should be set"); + panic!("Expected body"); } } #[test] fn test_parse_line_invalid_json_falls_back_to_plain_text() { + use opentelemetry_proto::tonic::common::v1::any_value::Value; + let builder = LogBuilder::new(LogPlatform::Unknown); - let bad = r#"{ not valid json }"#; - let lr = parse_line(&builder, 0, 0, bad.to_string(), ParserType::Json); + let raw = r#"{"broken json"#.to_string(); + let lr = parse_line(&builder, 123_456_789, 1_000, &raw, ParserType::Json, None); if let Some(body) = &lr.body { - if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = - &body.value - { - assert_eq!(s, bad); + if let Some(Value::StringValue(s)) = &body.value { + assert_eq!(s, &raw); } else { - panic!("expected string body on parse failure"); + panic!("Expected StringValue body"); } } else { - panic!("body should be set on parse failure"); + panic!("Expected body"); } } - #[tokio::test] - async fn test_parse_real_cloudtrail_file() { - // Test with real CloudTrail file structure - let json_data = r#"{"Records":[ - { - "eventVersion":"1.11", - "userIdentity":{ - "type":"AssumedRole", - "principalId":"AROAUCA5KE6IWTHVPZEOT:DatadogAWSIntegration", - "arn":"arn:aws:sts::279234357137:assumed-role/DatadogIntegrationRole/DatadogAWSIntegration", - "accountId":"279234357137", - "accessKeyId":"ASIAUCA5KE6I76GOZKLW" - }, - "eventTime":"2026-02-20T23:20:01Z", - "eventSource":"ecs.amazonaws.com", - "eventName":"ListClusters", - "awsRegion":"us-east-1", - "sourceIPAddress":"44.192.28.4", - "userAgent":"Datadog Botocore/1.40.61", - "requestParameters":null, - "responseElements":null, - "requestID":"3b7eb167-5db2-4cc3-a287-6fff6cbcbf01", - 
"eventID":"b4fa7f02-2133-4272-94ad-730c55694200", - "readOnly":true, - "eventType":"AwsApiCall", - "managementEvent":true, - "recipientAccountId":"279234357137", - "eventCategory":"Management" - }, - { - "eventVersion":"1.11", - "userIdentity":{ - "type":"AssumedRole", - "principalId":"AROAUCA5KE6IWTHVPZEOT:DatadogAWSIntegration" - }, - "eventTime":"2026-02-20T23:20:39Z", - "eventSource":"rds.amazonaws.com", - "eventName":"DescribeDBInstances", - "awsRegion":"us-east-1", - "sourceIPAddress":"44.192.28.123", - "userAgent":"Datadog Botocore/1.40.61", - "eventID":"6147784f-d134-4e7d-bd44-f0ca28dc5b7f", - "readOnly":true, - "eventType":"AwsApiCall", - "managementEvent":true, - "recipientAccountId":"279234357137", - "eventCategory":"Management" + // ------------------------------------------------------------------ + // VPC flow log via S3 + // ------------------------------------------------------------------ + + #[test] + fn test_parse_line_vpc_log_sets_body_and_attributes() { + use crate::flowlogs::{ParsedField, ParsedFieldType, ParsedFields}; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + use std::sync::Arc; + + let fields = vec![ + ParsedField::new("version".to_string(), ParsedFieldType::Int32), + ParsedField::new("account-id".to_string(), ParsedFieldType::String), + ParsedField::new("interface-id".to_string(), ParsedFieldType::String), + ParsedField::new("srcaddr".to_string(), ParsedFieldType::String), + ParsedField::new("dstaddr".to_string(), ParsedFieldType::String), + ParsedField::new("srcport".to_string(), ParsedFieldType::Int32), + ParsedField::new("dstport".to_string(), ParsedFieldType::Int32), + ParsedField::new("protocol".to_string(), ParsedFieldType::Int32), + ParsedField::new("packets".to_string(), ParsedFieldType::Int64), + ParsedField::new("bytes".to_string(), ParsedFieldType::Int64), + ParsedField::new("start".to_string(), ParsedFieldType::Int64), + ParsedField::new("end".to_string(), ParsedFieldType::Int64), + 
ParsedField::new("action".to_string(), ParsedFieldType::String), + ParsedField::new("log-status".to_string(), ParsedFieldType::String), + ]; + let parsed_fields = Arc::new(ParsedFields::Success(fields)); + + let line = + "2 123456789012 eni-abc123 10.0.0.1 10.0.0.2 443 52000 6 10 840 1620000000 1620000060 ACCEPT OK" + .to_string(); + + let builder = LogBuilder::new(LogPlatform::VpcFlowLog); + let lr = parse_line( + &builder, + 123_456_789, + 1_000, + &line, + ParserType::VpcLog, + Some(parsed_fields), + ); + + // Body should be the raw line + if let Some(body) = &lr.body { + if let Some(Value::StringValue(s)) = &body.value { + assert_eq!(s, &line); + } else { + panic!("Expected StringValue body"); } - ]}"#; + } else { + panic!("Expected body"); + } - let aws_attributes = AwsAttributes { - region: "us-east-1".to_string(), - account_id: "279234357137".to_string(), - invoked_function_arn: "arn:aws:lambda:us-east-1:279234357137:function:test".to_string(), + // Structured attributes should be present + assert!(lr.attributes.iter().any(|kv| kv.key == "srcaddr")); + assert!(lr.attributes.iter().any(|kv| kv.key == "dstaddr")); + assert!(lr.attributes.iter().any(|kv| kv.key == "action")); + } + + #[test] + fn test_parse_log_lines_vpc_flow_logs_with_tags() { + use crate::flowlogs::FlowLogConfig; + + let log_format = "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${start} ${end} ${action} ${log-status}"; + + let mut tags = HashMap::new(); + tags.insert("Environment".to_string(), "production".to_string()); + tags.insert("Team".to_string(), "networking".to_string()); + + let flow_log_config = FlowLogConfig { + log_format: log_format.to_string(), + flow_log_id: "fl-s3-test123".to_string(), + tags, + folder_prefix: None, + // No pre-parsed fields — the header line in the file is the source of truth. 
+ parsed_fields: None, }; + let vpc_data = "version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status\n\ + 2 123456789012 eni-abc123 10.0.0.1 10.0.0.2 443 52000 6 10 840 1620000000 1620000060 ACCEPT OK\n\ + 2 123456789012 eni-abc123 10.0.0.2 10.0.0.1 52000 443 6 8 620 1620000060 1620000120 ACCEPT OK\n"; + + let aws_attributes = make_aws_attributes(); let event_time = Utc::now(); + let result = parse_log_lines( - json_data.as_bytes(), + vpc_data.as_bytes(), event_time, - "aws-cloudtrail-logs-279234357137-test", - "AWSLogs/279234357137/CloudTrail/us-east-1/2026/02/20/279234357137_CloudTrail_us-east-1_20260220T2325Z_mdDrHV1NpfjtXf29.json.gz", + "my-flow-logs-bucket", + "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/01/flow.log.gz", &aws_attributes, "test-request-id", 1000, + Some(&flow_log_config), + ); + + assert!(result.is_ok(), "parse_log_lines failed: {:?}", result.err()); + let resource_logs = result.unwrap(); + assert_eq!(resource_logs.len(), 1); + + let rl = &resource_logs[0]; + + // cloud.platform should be set to VPC flow log value + let resource = rl.resource.as_ref().unwrap(); + let platform_attr = resource + .attributes + .iter() + .find(|kv| kv.key == "cloud.platform"); + assert!(platform_attr.is_some(), "cloud.platform attribute missing"); + if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = + &platform_attr.unwrap().value.as_ref().unwrap().value + { + assert_eq!(s, "aws_vpc_flow_log"); + } + + // Flow log tags should appear as resource attributes + let has_env_tag = resource + .attributes + .iter() + .any(|kv| kv.key == "ec2.flow-logs.tags.Environment"); + let has_team_tag = resource + .attributes + .iter() + .any(|kv| kv.key == "ec2.flow-logs.tags.Team"); + assert!(has_env_tag, "ec2.flow-logs.tags.Environment missing"); + assert!(has_team_tag, "ec2.flow-logs.tags.Team missing"); + + // Two log records for two data lines + 
assert_eq!(rl.scope_logs[0].log_records.len(), 2); + + // Each record should have structured attributes + let first_record = &rl.scope_logs[0].log_records[0]; + assert!( + first_record.attributes.iter().any(|kv| kv.key == "srcaddr"), + "srcaddr attribute missing" + ); + assert!( + first_record.attributes.iter().any(|kv| kv.key == "action"), + "action attribute missing" + ); + } + + #[test] + fn test_parse_log_lines_vpc_header_defines_columns() { + use crate::flowlogs::FlowLogConfig; + + // FlowLogConfig with no pre-parsed fields — the header line must be used instead. + let flow_log_config = FlowLogConfig { + log_format: "${version} ${account-id}".to_string(), + flow_log_id: "fl-hdr-test".to_string(), + tags: HashMap::new(), + folder_prefix: None, + parsed_fields: None, + }; + + // Header line followed by one data line. + let data = "version account-id\n2 123456789012\n"; + let aws_attributes = make_aws_attributes(); + let event_time = Utc::now(); + + let result = parse_log_lines( + data.as_bytes(), + event_time, + "my-bucket", + "AWSLogs/123456789012/vpcflowlogs/us-east-1/2024/01/01/flow.log", + &aws_attributes, + "req-id", + 1000, + Some(&flow_log_config), + ); + + assert!(result.is_ok()); + let resource_logs = result.unwrap(); + // Only the data line should produce a record; the header line is consumed as column info. + assert_eq!(resource_logs[0].scope_logs[0].log_records.len(), 1); + + // Structured attributes should be populated from the header-derived fields. 
+ let record = &resource_logs[0].scope_logs[0].log_records[0]; + assert!( + record.attributes.iter().any(|kv| kv.key == "account-id"), + "account-id attribute should be present" + ); + } + + #[test] + fn test_create_base_attributes_with_flow_log_tags() { + let aws_attributes = make_aws_attributes(); + let mut tags = HashMap::new(); + tags.insert("Env".to_string(), "prod".to_string()); + + let attrs = create_base_attributes( + "my-bucket", + "some/key.log", + &aws_attributes, + LogPlatform::VpcFlowLog, + &tags, + ); + + let has_platform = attrs.iter().any(|kv| { + kv.key == "cloud.platform" + && matches!( + &kv.value.as_ref().unwrap().value, + Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) + if s == "aws_vpc_flow_log" + ) + }); + assert!(has_platform); + + let has_tag = attrs.iter().any(|kv| kv.key == "ec2.flow-logs.tags.Env"); + assert!(has_tag); + } + + #[test] + fn test_create_base_attributes_no_platform_for_unknown() { + let aws_attributes = make_aws_attributes(); + let attrs = create_base_attributes( + "bucket", + "key", + &aws_attributes, + LogPlatform::Unknown, + &HashMap::new(), + ); + let has_platform = attrs.iter().any(|kv| kv.key == "cloud.platform"); + // Unknown platform should not add a cloud.platform attribute + assert!(!has_platform); + } + + // ------------------------------------------------------------------ + // CloudTrail real-file test (carried over from original) + // ------------------------------------------------------------------ + + #[tokio::test] + async fn test_parse_real_cloudtrail_file() { + let cloudtrail_json = r#"{ + "Records": [ + { + "eventVersion": "1.08", + "userIdentity": { + "type": "IAMUser", + "principalId": "AIDACKCEVSQ6C2EXAMPLE", + "arn": "arn:aws:iam::123456789012:user/Alice", + "accountId": "123456789012", + "userName": "Alice" + }, + "eventTime": "2024-01-15T10:30:00Z", + "eventSource": "s3.amazonaws.com", + "eventName": "GetObject", + "awsRegion": "us-east-1", + "sourceIPAddress": 
"198.51.100.1", + "userAgent": "aws-sdk-go/1.44.0", + "requestParameters": { + "bucketName": "my-bucket", + "key": "my-object" + }, + "responseElements": null, + "requestID": "EXAMPLE123456789", + "eventID": "EXAMPLE-1234-5678-abcd-123456789012", + "readOnly": true, + "resources": [ + { + "ARN": "arn:aws:s3:::my-bucket/my-object", + "accountId": "123456789012", + "type": "AWS::S3::Object" + } + ], + "eventType": "AwsApiCall", + "managementEvent": false, + "recipientAccountId": "123456789012" + }, + { + "eventVersion": "1.08", + "userIdentity": { + "type": "Root", + "principalId": "123456789012", + "arn": "arn:aws:iam::123456789012:root", + "accountId": "123456789012" + }, + "eventTime": "2024-01-15T11:00:00Z", + "eventSource": "signin.amazonaws.com", + "eventName": "ConsoleLogin", + "awsRegion": "us-east-1", + "sourceIPAddress": "203.0.113.5", + "userAgent": "Mozilla/5.0", + "requestParameters": null, + "responseElements": { + "ConsoleLogin": "Success" + }, + "requestID": "EXAMPLE987654321", + "eventID": "EXAMPLE-abcd-ef01-2345-678901234567", + "readOnly": false, + "eventType": "AwsApiCall", + "managementEvent": true, + "recipientAccountId": "123456789012" + } + ] +}"#; + + let aws_attributes = make_aws_attributes(); + let event_time = Utc::now(); + + let result = parse_log_lines( + cloudtrail_json.as_bytes(), + event_time, + "my-cloudtrail-bucket", + "AWSLogs/123456789012/CloudTrail/us-east-1/2024/01/15/123456789012_CloudTrail_us-east-1_20240115T1030Z_abc123.json.gz", + &aws_attributes, + "test-cloudtrail-request", + 1000, + None, ); assert!( result.is_ok(), - "Failed to parse CloudTrail JSON blob: {:?}", + "CloudTrail parse failed: {:?}", result.err() ); let resource_logs = result.unwrap(); assert_eq!(resource_logs.len(), 1); - assert_eq!(resource_logs[0].scope_logs[0].log_records.len(), 2); - // Verify CloudTrail platform was detected - let first_log = &resource_logs[0].scope_logs[0].log_records[0]; + let logs = &resource_logs[0]; + 
assert_eq!(logs.scope_logs[0].log_records.len(), 2); - // CloudTrail should have body set to "eventType::eventName" - assert!(first_log.body.is_some()); - if let Some(body) = &first_log.body { - if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = - &body.value - { - assert_eq!(s, "AwsApiCall::ListClusters"); - } - } + let resource = logs.resource.as_ref().unwrap(); - // Verify attributes are present - let has_event_source = first_log + // Verify S3 attributes are present + assert!( + resource + .attributes + .iter() + .any(|kv| kv.key == "aws.s3.bucket") + ); + assert!(resource.attributes.iter().any(|kv| kv.key == "aws.s3.key")); + + // Verify cloud.platform is set for CloudTrail + let platform_attr = resource .attributes .iter() - .any(|kv| kv.key == "eventSource"); - let has_event_name = first_log.attributes.iter().any(|kv| kv.key == "eventName"); - let has_event_type = first_log.attributes.iter().any(|kv| kv.key == "eventType"); - let has_aws_region = first_log.attributes.iter().any(|kv| kv.key == "awsRegion"); - - assert!(has_event_source, "Missing eventSource attribute"); - assert!(has_event_name, "Missing eventName attribute"); - assert!(has_event_type, "Missing eventType attribute"); - assert!(has_aws_region, "Missing awsRegion attribute"); - - // Verify second record - let second_log = &resource_logs[0].scope_logs[0].log_records[1]; - if let Some(body) = &second_log.body { - if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = - &body.value - { - assert_eq!(s, "AwsApiCall::DescribeDBInstances"); - } + .find(|kv| kv.key == "cloud.platform"); + assert!(platform_attr.is_some()); + if let Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(s)) = + &platform_attr.unwrap().value.as_ref().unwrap().value + { + assert_eq!(s, "aws_cloudtrail"); } + + // Verify first record has expected CloudTrail fields + let first_record = &logs.scope_logs[0].log_records[0]; + assert!( + 
first_record + .attributes + .iter() + .any(|kv| kv.key == "eventName") + ); + assert!( + first_record + .attributes + .iter() + .any(|kv| kv.key == "eventSource") + ); } }