diff --git a/aw-datastore/src/datastore.rs b/aw-datastore/src/datastore.rs
index 253ae3d6..a8553112 100644
--- a/aw-datastore/src/datastore.rs
+++ b/aw-datastore/src/datastore.rs
@@ -713,13 +713,14 @@ impl DatastoreInstance {
         Ok(row)
     }
 
-    pub fn get_events(
+    fn get_events_inner(
         &mut self,
         conn: &Connection,
         bucket_id: &str,
         starttime_opt: Option<DateTime<Utc>>,
         endtime_opt: Option<DateTime<Utc>>,
         limit_opt: Option<u64>,
+        clip_to_query_range: bool,
     ) -> Result<Vec<Event>, DatastoreError> {
         let bucket = self.get_bucket(bucket_id)?;
 
@@ -774,11 +775,13 @@ impl DatastoreInstance {
             let mut endtime_ns: i64 = row.get(2)?;
             let data_str: String = row.get(3)?;
 
-            if starttime_ns < starttime_filter_ns {
-                starttime_ns = starttime_filter_ns
-            }
-            if endtime_ns > endtime_filter_ns {
-                endtime_ns = endtime_filter_ns
+            if clip_to_query_range {
+                if starttime_ns < starttime_filter_ns {
+                    starttime_ns = starttime_filter_ns
+                }
+                if endtime_ns > endtime_filter_ns {
+                    endtime_ns = endtime_filter_ns
+                }
             }
 
             let duration_ns = endtime_ns - starttime_ns;
@@ -812,6 +815,35 @@ impl DatastoreInstance {
         Ok(list)
     }
 
+    pub fn get_events(
+        &mut self,
+        conn: &Connection,
+        bucket_id: &str,
+        starttime_opt: Option<DateTime<Utc>>,
+        endtime_opt: Option<DateTime<Utc>>,
+        limit_opt: Option<u64>,
+    ) -> Result<Vec<Event>, DatastoreError> {
+        self.get_events_inner(conn, bucket_id, starttime_opt, endtime_opt, limit_opt, true)
+    }
+
+    pub fn get_events_unclipped(
+        &mut self,
+        conn: &Connection,
+        bucket_id: &str,
+        starttime_opt: Option<DateTime<Utc>>,
+        endtime_opt: Option<DateTime<Utc>>,
+        limit_opt: Option<u64>,
+    ) -> Result<Vec<Event>, DatastoreError> {
+        self.get_events_inner(
+            conn,
+            bucket_id,
+            starttime_opt,
+            endtime_opt,
+            limit_opt,
+            false,
+        )
+    }
+
     pub fn get_event_count(
         &self,
         conn: &Connection,
diff --git a/aw-datastore/src/worker.rs b/aw-datastore/src/worker.rs
index b116a1f3..aada5a4a 100644
--- a/aw-datastore/src/worker.rs
+++ b/aw-datastore/src/worker.rs
@@ -69,6 +69,7 @@ pub enum Command {
         Option<DateTime<Utc>>,
         Option<DateTime<Utc>>,
         Option<u64>,
+        bool,
     ),
     GetEventCount(String, Option<DateTime<Utc>>, Option<DateTime<Utc>>),
     DeleteEventsById(String, Vec<i64>),
@@ -267,8 +268,13 @@ impl DatastoreWorker {
                         Err(e) => Err(e),
                     }
                 }
-                Command::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt) => {
-                    match ds.get_events(tx, &bucketname, starttime_opt, endtime_opt, limit_opt) {
+                Command::GetEvents(bucketname, starttime_opt, endtime_opt, limit_opt, unclipped) => {
+                    let result = if unclipped {
+                        ds.get_events_unclipped(tx, &bucketname, starttime_opt, endtime_opt, limit_opt)
+                    } else {
+                        ds.get_events(tx, &bucketname, starttime_opt, endtime_opt, limit_opt)
+                    };
+                    match result {
                         Ok(el) => Ok(Response::EventList(el)),
                         Err(e) => Err(e),
                     }
                 }
@@ -433,7 +439,37 @@ impl Datastore {
         endtime_opt: Option<DateTime<Utc>>,
         limit_opt: Option<u64>,
     ) -> Result<Vec<Event>, DatastoreError> {
-        let cmd = Command::GetEvents(bucket_id.to_string(), starttime_opt, endtime_opt, limit_opt);
+        let cmd = Command::GetEvents(
+            bucket_id.to_string(),
+            starttime_opt,
+            endtime_opt,
+            limit_opt,
+            false,
+        );
+        let receiver = self.requester.request(cmd).unwrap();
+        match receiver.collect().unwrap() {
+            Ok(r) => match r {
+                Response::EventList(el) => Ok(el),
+                _ => panic!("Invalid response"),
+            },
+            Err(e) => Err(e),
+        }
+    }
+
+    pub fn get_events_unclipped(
+        &self,
+        bucket_id: &str,
+        starttime_opt: Option<DateTime<Utc>>,
+        endtime_opt: Option<DateTime<Utc>>,
+        limit_opt: Option<u64>,
+    ) -> Result<Vec<Event>, DatastoreError> {
+        let cmd = Command::GetEvents(
+            bucket_id.to_string(),
+            starttime_opt,
+            endtime_opt,
+            limit_opt,
+            true,
+        );
         let receiver = self.requester.request(cmd).unwrap();
         match receiver.collect().unwrap() {
             Ok(r) => match r {
diff --git a/aw-server/src/endpoints/import.rs b/aw-server/src/endpoints/import.rs
index 6d4a00af..af2c7b83 100644
--- a/aw-server/src/endpoints/import.rs
+++ b/aw-server/src/endpoints/import.rs
@@ -3,19 +3,99 @@ use rocket::http::Status;
 use rocket::serde::json::Json;
 use rocket::State;
 
+use std::collections::HashSet;
 use std::sync::Mutex;
 
-use aw_models::BucketsExport;
+use aw_models::{BucketsExport, Event};
 
-use aw_datastore::Datastore;
+use aw_datastore::{Datastore, DatastoreError};
 
 use crate::endpoints::{HttpErrorJson, ServerState};
 
+fn event_identity(
+    event: &Event,
+) -> Result<(chrono::DateTime<chrono::Utc>, i64, String), HttpErrorJson> {
+    let duration_ns = event.duration.num_nanoseconds().ok_or_else(|| {
+        HttpErrorJson::new(
+            Status::InternalServerError,
+            "Failed to encode event duration for dedup".to_string(),
+        )
+    })?;
+    let data_json = serde_json::to_string(&event.data).map_err(|e| {
+        HttpErrorJson::new(
+            Status::InternalServerError,
+            format!("Failed to encode event data for dedup: {e}"),
+        )
+    })?;
+    Ok((event.timestamp, duration_ns, data_json))
+}
+
 fn import(datastore_mutex: &Mutex<Datastore>, import: BucketsExport) -> Result<(), HttpErrorJson> {
     let datastore = endpoints_get_lock!(datastore_mutex);
-    for (_bucketname, bucket) in import.buckets {
+    for (_bucketname, mut bucket) in import.buckets {
         match datastore.create_bucket(&bucket) {
             Ok(_) => (),
+            Err(DatastoreError::BucketAlreadyExists(_)) => {
+                // Bucket already exists — merge events, skipping duplicates
+                info!("Bucket '{}' already exists, merging events", bucket.id);
+                if let Some(events) = bucket.events.take() {
+                    let events_vec = events.take_inner();
+                    if !events_vec.is_empty() {
+                        // Determine time range of events to import
+                        let start = events_vec.iter().map(|e| e.timestamp).min().unwrap();
+                        let end = events_vec
+                            .iter()
+                            .map(|e| e.calculate_endtime())
+                            .max()
+                            .unwrap();
+
+                        // Fetch existing events in that range to detect duplicates.
+                        // Events without an explicit ID would otherwise be inserted as new rows
+                        // via AUTOINCREMENT, silently creating duplicates on re-import.
+                        let existing = datastore
+                            .get_events_unclipped(&bucket.id, Some(start), Some(end), None)
+                            .map_err(|e| {
+                                HttpErrorJson::new(
+                                    Status::InternalServerError,
+                                    format!(
+                                        "Failed to fetch existing events for dedup in '{}': {e:?}",
+                                        bucket.id
+                                    ),
+                                )
+                            })?;
+
+                        let existing_identities: HashSet<_> = existing
+                            .iter()
+                            .map(event_identity)
+                            .collect::<Result<_, _>>()?;
+
+                        // Filter out events already present (matched by timestamp, duration, data)
+                        let new_events: Vec<_> = events_vec
+                            .into_iter()
+                            .map(|event| Ok((event_identity(&event)?, event)))
+                            .collect::<Result<Vec<_>, HttpErrorJson>>()?
+                            .into_iter()
+                            .filter_map(|(identity, event)| {
+                                (!existing_identities.contains(&identity)).then_some(event)
+                            })
+                            .collect();
+
+                        if !new_events.is_empty() {
+                            if let Err(e) = datastore.insert_events(&bucket.id, &new_events) {
+                                let err_msg = format!(
+                                    "Failed to merge events into existing bucket '{}': {e:?}",
+                                    bucket.id
+                                );
+                                warn!("{}", err_msg);
+                                return Err(HttpErrorJson::new(
+                                    Status::InternalServerError,
+                                    err_msg,
+                                ));
+                            }
+                        }
+                    }
+                }
+            }
             Err(e) => {
                 let err_msg = format!("Failed to import bucket: {e:?}");
                 warn!("{}", err_msg);
diff --git a/aw-server/tests/api.rs b/aw-server/tests/api.rs
index c5da04b0..d45b4872 100644
--- a/aw-server/tests/api.rs
+++ b/aw-server/tests/api.rs
@@ -296,8 +296,39 @@ mod api_tests {
             .dispatch();
         assert_eq!(res.status(), rocket::http::Status::Ok);
 
-        // TODO: test more error cases
-        // Import already existing bucket
+        // Import already existing bucket with a new event — should merge instead of fail
+        let res = client
+            .post("/api/0/import")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .body(
+                r#"{"buckets":
+            {"id1": {
+                "id": "id1",
+                "type": "type",
+                "client": "client",
+                "hostname": "hostname",
+                "events": [{
+                    "timestamp":"2000-01-02T00:00:00Z",
+                    "duration":1.0,
+                    "data": {}
+                }]
+            }}}"#,
+            )
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+
+        // Verify events were merged — bucket should now have 2 events
+        let res = client
+            .get("/api/0/buckets/id1/events")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+        let events: serde_json::Value = serde_json::from_str(&res.into_string().unwrap()).unwrap();
+        assert_eq!(events.as_array().unwrap().len(), 2);
+
+        // Re-import the first event again — should be idempotent (no duplicate created)
         let res = client
             .post("/api/0/import")
             .header(ContentType::JSON)
@@ -317,10 +348,56 @@ mod api_tests {
             }}}"#,
             )
             .dispatch();
-        assert_eq!(res.status(), rocket::http::Status::InternalServerError);
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+
+        // Count should still be 2, not 3 — re-import is idempotent
+        let res = client
+            .get("/api/0/buckets/id1/events")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+        let events: serde_json::Value = serde_json::from_str(&res.into_string().unwrap()).unwrap();
         assert_eq!(
-            res.into_string().unwrap(),
-            r#"{"message":"Failed to import bucket: BucketAlreadyExists(\"id1\")"}"#
+            events.as_array().unwrap().len(),
+            2,
+            "Re-importing the same event should be idempotent"
+        );
+
+        // Import a narrower event fully contained within an existing longer event.
+        // This should be preserved as a distinct event, not dropped by clipped dedup.
+        let res = client
+            .post("/api/0/import")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .body(
+                r#"{"buckets":
+            {"id1": {
+                "id": "id1",
+                "type": "type",
+                "client": "client",
+                "hostname": "hostname",
+                "events": [{
+                    "timestamp":"2000-01-01T00:00:30Z",
+                    "duration":30.0,
+                    "data": {}
+                }]
+            }}}"#,
+            )
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+
+        let res = client
+            .get("/api/0/buckets/id1/events")
+            .header(ContentType::JSON)
+            .header(Header::new("Host", "127.0.0.1:5600"))
+            .dispatch();
+        assert_eq!(res.status(), rocket::http::Status::Ok);
+        let events: serde_json::Value = serde_json::from_str(&res.into_string().unwrap()).unwrap();
+        assert_eq!(
+            events.as_array().unwrap().len(),
+            3,
+            "Contained event should not be dropped by clipped dedup"
         );
 
         // Export single created bucket
@@ -388,7 +465,11 @@ mod api_tests {
         let mut buckets = export.buckets;
         assert_eq!(buckets.len(), 1);
         let b = buckets.remove("id1").unwrap();
-        assert_eq!(b.events.unwrap().take_inner().len(), 1);
+        assert_eq!(
+            b.events.unwrap().take_inner().len(),
+            3,
+            "Export should preserve the contained event added during merge testing"
+        );
         assert_eq!(buckets.len(), 0);
     }