Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ be expanded as we verify support for additional services.
| CloudTrail Logs | CloudWatch, S3 |
| EKS Control Plane Logs | CloudWatch |
| Lambda Logs | CloudWatch |
| VPC Flow Logs | CloudWatch |
| VPC Flow Logs | CloudWatch, S3 |

## Deploying

Expand Down Expand Up @@ -569,7 +569,7 @@ from. We plan to make this configurable in the future.

## VPC Flow Logs

The forwarder includes comprehensive support for AWS VPC Flow Logs with the following features:
The forwarder includes support for ingesting AWS VPC Flow Logs written to CloudWatch or S3.

### Automatic Format Detection

Expand Down
10 changes: 5 additions & 5 deletions src/cwlogs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ pub struct Parser<'a> {
aws_attributes: &'a AwsAttributes,
request_id: &'a String,
tag_manager: &'a mut TagManager,
flow_log_manager: &'a mut FlowLogManager,
flow_log_manager: &'a FlowLogManager,
}

impl<'a> Parser<'a> {
pub fn new(
aws_attributes: &'a AwsAttributes,
request_id: &'a String,
tag_manager: &'a mut TagManager,
flow_log_manager: &'a mut FlowLogManager,
flow_log_manager: &'a FlowLogManager,
) -> Self {
Self {
aws_attributes,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl<'a> Parser<'a> {
/// Detects the log platform and parser type based on log group and stream names.
/// Returns (platform, parser_type, optional_flow_log_parsed_fields, flow_log_tags)
async fn detect_log_type(
&mut self,
&self,
log_group_name: &str,
log_stream_name: &str,
) -> (
Expand Down Expand Up @@ -257,13 +257,13 @@ mod tests {
let mut tag_manager = crate::tags::TagManager::new(cw_client, None, None);

let ec2_client = aws_sdk_ec2::Client::new(&config);
let mut flow_log_manager = crate::flowlogs::FlowLogManager::new(ec2_client, None, None);
let flow_log_manager = crate::flowlogs::FlowLogManager::new(ec2_client, None, None);

let mut parser = Parser::new(
&aws_attributes,
&request_id,
&mut tag_manager,
&mut flow_log_manager,
&flow_log_manager,
);
let result = parser.parse(logs_event).await;

Expand Down
33 changes: 9 additions & 24 deletions src/cwlogs/record_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,11 @@ impl RecordParser {
///
/// On parse failure the raw message is preserved as the log body.
pub(crate) fn parse(&self, now_nanos: u64, log_entry: LogEntry) -> LogRecord {
// Seed the record with the CW timestamp and, when non-empty, the entry ID.
let initial_attributes = if !log_entry.id.is_empty() {
vec![string_kv("cloudwatch.id", log_entry.id)]
} else {
vec![]
};

let mut record_builder =
self.builder
.start(now_nanos, log_entry.timestamp, initial_attributes);
let mut record_builder = self.builder.start(
now_nanos,
log_entry.timestamp,
vec![string_kv("cloudwatch.id", log_entry.id)],
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhat unrelated, but cleanup I noticed since cloudwatch.id is never empty in this path.


match self.parse_message(log_entry.message) {
ParsedMessage::Map(map) => {
Expand All @@ -101,12 +96,12 @@ impl RecordParser {
/// should do next — no mutation of external state as a side-effect.
fn parse_message(&self, message: String) -> ParsedMessage {
match self.parser_type {
ParserType::Json => match parse_json_to_map(message) {
ParserType::Json => match parse_json_to_map(&message) {
Ok(map) => ParsedMessage::Map(map),
Err(e) => ParsedMessage::Error(e),
},

ParserType::KeyValue => match parse_keyvalue_to_map(message) {
ParserType::KeyValue => match parse_keyvalue_to_map(&message) {
Ok(map) => ParsedMessage::Map(map),
Err(e) => ParsedMessage::Error(e),
},
Expand All @@ -117,7 +112,7 @@ impl RecordParser {
// individual flow-log fields are emitted as attributes.
match self.flow_log_parsed_fields.as_ref() {
Some(parsed_fields) => {
match parse_vpclog_to_map(message.clone(), parsed_fields.clone()) {
match parse_vpclog_to_map(&message, parsed_fields.clone()) {
Ok(map) => ParsedMessage::PlainText(message, Some(map)),
Err(e) => ParsedMessage::Error(e),
}
Expand All @@ -131,7 +126,7 @@ impl RecordParser {
// Auto-detect: attempt JSON for messages that look like objects;
// otherwise treat as opaque plain text.
if message.len() > 2 && message.starts_with('{') {
match parse_json_to_map(message) {
match parse_json_to_map(&message) {
Ok(map) => ParsedMessage::Map(map),
Err(e) => ParsedMessage::Error(e),
}
Expand Down Expand Up @@ -189,16 +184,6 @@ mod tests {
}
}

#[test]
fn test_no_cloudwatch_id_when_empty() {
let parser = RecordParser::new(LogPlatform::Unknown, ParserType::Json, None);
let lr = parser.parse(123_456_789, make_entry("", 1000, r#"{"msg":"hi"}"#));
assert!(
!lr.attributes.iter().any(|kv| kv.key == "cloudwatch.id"),
"cloudwatch.id should be absent when id is empty"
);
}

#[test]
fn test_timestamp_set_from_entry() {
// 1000 ms → 1_000_000_000 ns
Expand Down
Loading
Loading