Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ rust-version = "1.91"
[dependencies]
hyper-util = { version = "0.1", features = ["full"] }
hyper = { version = "1", features = ["full"] }
lambda-extension = { git = "https://github.com/streamfold/aws-lambda-rust-runtime", branch = "json-record-types" }
lambda-extension = { version = "1.0.3" }
http-body-util = "0.1.2"
bytes = "1.11.1"
serde = "1"
Expand Down
6 changes: 4 additions & 2 deletions src/lambda/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,12 @@ mod tests {
"record": "[INFO] Hello world, I am an extension!"
}"#;

let as_json: LambdaTelemetryRecord = serde_json::from_str(json_rec).unwrap();
let as_json: LambdaTelemetryRecord<serde_json::Value> =
serde_json::from_str(json_rec).unwrap();
println!("as json: {:?}", as_json);

let as_str: LambdaTelemetryRecord = serde_json::from_str(str_rec).unwrap();
let as_str: LambdaTelemetryRecord<serde_json::Value> =
serde_json::from_str(str_rec).unwrap();
println!("as str: {:?}", as_str);
}

Expand Down
12 changes: 7 additions & 5 deletions src/lambda/telemetry_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use tokio_util::sync::CancellationToken;
use tower::{BoxError, Service, ServiceBuilder};
use tracing::{debug, error, warn};

type JsonLambdaTelemetry = LambdaTelemetry<serde_json::Value>;

// We don't want to create a logging loop, so limit how often we log
// failures in certain code paths that may loop.
const LOG_LIMIT_INTERVAL_SECS: u64 = 60;
Expand All @@ -53,7 +55,7 @@ impl TelemetryAPI {
// todo: abstract this with the server code in the otlp http receiver
pub async fn run(
self,
bus_tx: BoundedSender<LambdaTelemetry>,
bus_tx: BoundedSender<JsonLambdaTelemetry>,
cancellation: CancellationToken,
) -> Result<(), BoxError> {
let resource = resource_from_env();
Expand Down Expand Up @@ -119,14 +121,14 @@ impl TelemetryAPI {
#[derive(Clone)]
pub struct TelemetryService {
resource: Resource,
bus_tx: BoundedSender<LambdaTelemetry>,
bus_tx: BoundedSender<JsonLambdaTelemetry>,
logs_tx: BoundedSender<Message<ResourceLogs>>,
}

impl TelemetryService {
fn new(
resource: Resource,
bus_tx: BoundedSender<LambdaTelemetry>,
bus_tx: BoundedSender<JsonLambdaTelemetry>,
logs_tx: BoundedSender<Message<ResourceLogs>>,
) -> Self {
Self {
Expand Down Expand Up @@ -182,7 +184,7 @@ where
}

async fn handle_request<H>(
bus_tx: BoundedSender<LambdaTelemetry>,
bus_tx: BoundedSender<JsonLambdaTelemetry>,
logs_tx: BoundedSender<Message<ResourceLogs>>,
resource: Resource,
body: H,
Expand All @@ -193,7 +195,7 @@ where
{
let buf = body.collect().await.unwrap().to_bytes();

let events: Vec<LambdaTelemetry> = serde_json::from_slice(&buf.to_vec())
let events: Vec<JsonLambdaTelemetry> = serde_json::from_slice(&buf.to_vec())
.map_err(|e| format!("unable to parse telemetry events from json: {}", e))?;

let mut log_events = vec![];
Expand Down
Loading