diff --git a/Cargo.lock b/Cargo.lock index 3590693..d4e1692 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1222,11 +1222,11 @@ dependencies = [ [[package]] name = "lambda-extension" -version = "0.11.0" -source = "git+https://github.com/streamfold/aws-lambda-rust-runtime?branch=json-record-types#c0dedcf149593ffb17d25097d9e01f0e404d88d8" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f2c5885912a339367442b38e5f9cd99938f5d5fc39fbe81fefc0079a08158a2" dependencies = [ "async-stream", - "bytes", "chrono", "http", "http-body-util", @@ -1243,8 +1243,9 @@ dependencies = [ [[package]] name = "lambda_runtime_api_client" -version = "0.11.1" -source = "git+https://github.com/streamfold/aws-lambda-rust-runtime?branch=json-record-types#c0dedcf149593ffb17d25097d9e01f0e404d88d8" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4873061514cb57ffb6a599b77c46c65d6d783efe9bad8fd56b7cba7f0459ef" dependencies = [ "bytes", "futures-channel", @@ -1254,9 +1255,7 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", - "tokio", "tower", - "tower-service", "tracing", "tracing-subscriber", ] diff --git a/Cargo.toml b/Cargo.toml index fdab14d..cd7f5e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ rust-version = "1.91" [dependencies] hyper-util = { version = "0.1", features = ["full"] } hyper = { version = "1", features = ["full"] } -lambda-extension = { git = "https://github.com/streamfold/aws-lambda-rust-runtime", branch = "json-record-types" } +lambda-extension = { version = "1.0.3" } http-body-util = "0.1.2" bytes = "1.11.1" serde = "1" diff --git a/src/lambda/logs.rs b/src/lambda/logs.rs index 71812f5..4531379 100644 --- a/src/lambda/logs.rs +++ b/src/lambda/logs.rs @@ -188,10 +188,12 @@ mod tests { "record": "[INFO] Hello world, I am an extension!" }"#; - let as_json: LambdaTelemetryRecord = serde_json::from_str(json_rec).unwrap(); + let as_json: LambdaTelemetryRecord = + serde_json::from_str(json_rec).unwrap(); println!("as json: {:?}", as_json); - let as_str: LambdaTelemetryRecord = serde_json::from_str(str_rec).unwrap(); + let as_str: LambdaTelemetryRecord = + serde_json::from_str(str_rec).unwrap(); println!("as str: {:?}", as_str); } diff --git a/src/lambda/telemetry_api.rs b/src/lambda/telemetry_api.rs index 2772c69..68d194a 100644 --- a/src/lambda/telemetry_api.rs +++ b/src/lambda/telemetry_api.rs @@ -31,6 +31,8 @@ use tokio_util::sync::CancellationToken; use tower::{BoxError, Service, ServiceBuilder}; use tracing::{debug, error, warn}; +type JsonLambdaTelemetry = LambdaTelemetry; + // We don't want to create a logging loop, so limit how often we log // failures in certain code paths that may loop. const LOG_LIMIT_INTERVAL_SECS: u64 = 60; @@ -53,7 +55,7 @@ impl TelemetryAPI { // todo: abstract this with the server code in the otlp http receiver pub async fn run( self, - bus_tx: BoundedSender, + bus_tx: BoundedSender, cancellation: CancellationToken, ) -> Result<(), BoxError> { let resource = resource_from_env(); @@ -119,14 +121,14 @@ impl TelemetryAPI { #[derive(Clone)] pub struct TelemetryService { resource: Resource, - bus_tx: BoundedSender, + bus_tx: BoundedSender, logs_tx: BoundedSender>, } impl TelemetryService { fn new( resource: Resource, - bus_tx: BoundedSender, + bus_tx: BoundedSender, logs_tx: BoundedSender>, ) -> Self { Self { @@ -182,7 +184,7 @@ where } async fn handle_request( - bus_tx: BoundedSender, + bus_tx: BoundedSender, logs_tx: BoundedSender>, resource: Resource, body: H, @@ -193,7 +195,7 @@ where { let buf = body.collect().await.unwrap().to_bytes(); - let events: Vec = serde_json::from_slice(&buf.to_vec()) + let events: Vec = serde_json::from_slice(&buf.to_vec()) .map_err(|e| format!("unable to parse telemetry events from json: {}", e))?; let mut log_events = vec![];