diff --git a/src/compact/worker.rs b/src/compact/worker.rs index c1f7de8..1a84810 100644 --- a/src/compact/worker.rs +++ b/src/compact/worker.rs @@ -26,7 +26,9 @@ use tracing::{error, info, warn}; use crate::compact::planner::{CompactionPlan, CompactionPlanner}; use crate::ingest::flusher::build_segment_meta; use crate::model::event::UsageEvent; +use crate::runtime::recovery::compute_event_hashes; use crate::runtime::state::AppState; +use crate::storage::dedupe_index::{index_path, write_dedupe_index, DedupeEntry}; use crate::storage::manifest::ReplacementRecord; use crate::storage::segment_reader::RawSegmentReader; use crate::storage::segment_writer::RawSegmentWriter; @@ -150,6 +152,21 @@ impl CompactionWorker { }; let new_meta = build_segment_meta(&output_id, &deduped, plan.bucket, checksum); + // Write the dedupe sidecar for the compacted output too — same + // recovery speedup applies. Non-fatal on failure. + let idx_entries: Vec = deduped + .iter() + .map(|e| { + let (id_h, p_h) = compute_event_hashes(e); + (id_h, p_h, e.ingested_at_ms) + }) + .collect(); + let idx = index_path(db_root, &output_id); + if let Err(e) = write_dedupe_index(&idx, &idx_entries) { + warn!("compaction: dedupe sidecar write for {} failed: {}", output_id, e); + let _ = std::fs::remove_file(&idx); + } + // Atomically swap old → new in the manifest. 
let committed = { let mut manifest = self.state.manifest.write().await; diff --git a/src/ingest/flusher.rs b/src/ingest/flusher.rs index 1eb9a3c..07d25fa 100644 --- a/src/ingest/flusher.rs +++ b/src/ingest/flusher.rs @@ -3,8 +3,10 @@ use tokio::sync::mpsc; use tracing::{info, error}; use crate::model::event::UsageEvent; use crate::model::ids::{AccountId, bucket_for_account}; +use crate::storage::dedupe_index::{index_path, write_dedupe_index, DedupeEntry}; use crate::storage::segment_writer::RawSegmentWriter; use crate::storage::manifest::{SegmentMeta, SegmentKind}; +use crate::runtime::recovery::compute_event_hashes; use crate::ingest::wal::Wal; use std::collections::{BTreeMap, HashSet}; use std::path::PathBuf; @@ -174,11 +176,16 @@ impl FlusherWorker { Ok(w) => w, Err(e) => return Err(format!("create segment for bucket {}: {}", bucket, e)), }; + let mut index_entries: Vec = Vec::with_capacity(sorted.len()); for event in &sorted { if let Err(e) = writer.write_event(event) { let _ = std::fs::remove_file(&path); return Err(format!("write event to bucket {} segment {}: {}", bucket, segment_id, e)); } + // Build the per-row dedupe sidecar entry so recovery can + // skip the bincode + zstd decode of the event column. + let (id_hash, payload_hash) = compute_event_hashes(event); + index_entries.push((id_hash, payload_hash, event.ingested_at_ms)); } let (_row_count, checksum) = match writer.finish() { Ok(t) => t, @@ -188,6 +195,18 @@ impl FlusherWorker { } }; + // Write the dedupe sidecar. Failure here is non-fatal — + // recovery falls back to scanning the segment — but we log it + // and clean up so it doesn't leave a half-written file behind. 
+ let idx_path = index_path(&self.state.config.db_root, &segment_id); + if let Err(e) = write_dedupe_index(&idx_path, &index_entries) { + tracing::warn!( + "flusher: dedupe sidecar write for {} failed: {} — segment is fine, recovery will scan", + segment_id, e + ); + let _ = std::fs::remove_file(&idx_path); + } + Ok((build_segment_meta(&segment_id, &sorted, bucket, checksum), path)) } } diff --git a/src/runtime/recovery.rs b/src/runtime/recovery.rs index 4032347..dcb9d6e 100644 --- a/src/runtime/recovery.rs +++ b/src/runtime/recovery.rs @@ -6,6 +6,7 @@ use crate::ingest::wal::Wal; use crate::ingest::dedupe::{HotDedupe, DEFAULT_TTL_MS}; use crate::ingest::memtable::Memtable; use crate::model::event::UsageEvent; +use crate::storage::dedupe_index::{index_path, read_dedupe_index}; use crate::storage::segment_reader::RawSegmentReader; use std::io::Result as IoResult; use tracing::{info, warn}; @@ -101,6 +102,12 @@ impl Recovery { let cutoff = now_ms_recovery().saturating_sub(DEFAULT_TTL_MS); let mut segments_scanned = 0usize; let mut events_registered = 0usize; + // Track how many segments used the fast sidecar path vs the + // segment-scan fallback, so we can spot when sidecars are + // missing (e.g., after upgrading from a build that didn't write them). + let mut sidecar_hits = 0usize; + let mut sidecar_misses = 0usize; + for seg in &manifest.raw_segments { if seg.max_timestamp_ms < cutoff { continue; @@ -113,6 +120,32 @@ impl Recovery { ); continue; } + + // Fast path: try the sidecar `.idx` file. It carries the + // same (id_hash, payload_hash, ingested_at_ms) triples in + // segment row order, without bincode-decoding the event + // column. Falls through to the slow path on missing or + // corrupt sidecar. 
+ let idx = index_path(&self.db_root, &seg.segment_id); + let sidecar = match read_dedupe_index(&idx) { + Ok(s) => s, + Err(e) => { + warn!("recovery: dedupe sidecar for {} unusable ({}); falling back to scan", seg.segment_id, e); + None + } + }; + if let Some(entries) = sidecar { + for (id_hash, payload_hash, ingested_at_ms) in entries { + dedupe.insert_known(id_hash, payload_hash, ingested_at_ms); + events_registered += 1; + } + sidecar_hits += 1; + segments_scanned += 1; + continue; + } + + // Slow path: open the segment and rehash each event. + sidecar_misses += 1; match RawSegmentReader::new(path) { Ok(mut reader) => { loop { @@ -134,6 +167,12 @@ impl Recovery { Err(e) => warn!("recovery: failed to open segment {}: {}", seg.segment_id, e), } } + if segments_scanned > 0 { + info!( + "Dedupe rebuild: sidecar hits {}, fallback scans {}", + sidecar_hits, sidecar_misses + ); + } if segments_scanned > 0 { info!( "Dedupe rebuild: scanned {} recent segments, registered {} events", diff --git a/src/storage/dedupe_index.rs b/src/storage/dedupe_index.rs new file mode 100644 index 0000000..83e4b96 --- /dev/null +++ b/src/storage/dedupe_index.rs @@ -0,0 +1,126 @@ +//! Per-segment sidecar dedupe index. +//! +//! Each raw segment `raw_.seg` gets a companion `raw_.idx` +//! file holding `Vec<(event_id_hash, payload_hash, ingested_at_ms)>` in +//! the same row order. Recovery uses this to rebuild the hot dedupe +//! cache without decompressing + bincode-decoding the full event +//! column — a 10–100× speedup on segments with rich payloads. +//! +//! The sidecar is an *optimization*: if missing or corrupt, recovery +//! falls back to scanning the segment file with `RawSegmentReader` and +//! recomputing hashes. So the format change is non-breaking — old +//! segments without an `.idx` still recover correctly, just slower. +//! +//! File layout (the entire file): +//! - magic b"UDBIDX01" (8 bytes) +//! - count u32 LE (number of entries) +//! 
- entries `Vec<(u128, u128, i64)>` bincode-serialized
+//! - checksum u64 LE (low 8 bytes of blake3 over above)
+//!
+//! Corruption is detected on read; a checksum mismatch causes the
+//! caller to fall back to the segment scan.
+
+use std::fs::File;
+use std::io::{Read, Write};
+use std::path::{Path, PathBuf};
+
+use crate::ingest::dedupe::EventHash;
+use crate::storage::segment_format::checksum;
+
+/// Magic bytes written at the start of every sidecar file.
+pub const MAGIC: &[u8; 8] = b"UDBIDX01";
+
+/// `(event_id_hash, payload_hash, ingested_at_ms)` for one event in
+/// segment row order.
+pub type DedupeEntry = (EventHash, EventHash, i64);
+
+/// Sidecar `.idx` path for a segment: `<db_root>/<segment_id>.idx`.
+pub fn index_path(db_root: &Path, segment_id: &str) -> PathBuf {
+    db_root.join(format!("{}.idx", segment_id))
+}
+
+/// Write the index next to the segment file. Caller is responsible for
+/// ordering this relative to the manifest commit — typically: write
+/// segment → write index → commit manifest. If the manifest commit
+/// fails, both files are orphaned; recovery cleans them up by ignoring
+/// segments not in the manifest.
+///
+/// The file is laid out as magic, entry count (u32 LE), the
+/// bincode-serialized entries, then a checksum over everything before it.
+///
+/// # Errors
+///
+/// - `InvalidInput` if `entries` has more elements than the u32 count
+///   field can represent (nothing is written in that case).
+/// - `InvalidData` if bincode serialization fails.
+/// - Any I/O error from creating, writing or syncing the file. The file
+///   may exist in a partial state afterwards; callers remove it.
+pub fn write_dedupe_index(path: &Path, entries: &[DedupeEntry]) -> std::io::Result<()> {
+    // The header stores the entry count as a u32. Reject an oversized batch
+    // up front: a wrapped count would only ever yield a file that the reader
+    // rejects with "entry count mismatch".
+    if entries.len() > u32::MAX as usize {
+        return Err(std::io::Error::new(
+            std::io::ErrorKind::InvalidInput,
+            format!(
+                "dedupe index: {} entries do not fit the u32 count field",
+                entries.len()
+            ),
+        ));
+    }
+    let count = entries.len() as u32; // lossless: bounded by the check above
+
+    let body = bincode::serialize(entries)
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
+
+    let mut buf: Vec<u8> = Vec::with_capacity(MAGIC.len() + 4 + body.len() + 8);
+    buf.extend_from_slice(MAGIC);
+    buf.extend_from_slice(&count.to_le_bytes());
+    buf.extend_from_slice(&body);
+    // The checksum covers magic + count + entries, i.e. every byte before it.
+    let cs = checksum(&buf);
+    buf.extend_from_slice(&cs.to_le_bytes());
+
+    let mut file = File::create(path)?;
+    file.write_all(&buf)?;
+    // Flush to stable storage before the caller goes on to commit the manifest.
+    file.sync_all()?;
+    Ok(())
+}
+
+/// Read the sidecar. Returns `Ok(None)` if the file is missing (callers
+/// fall back to segment scanning). Returns `Err` for corruption /
+/// truncation / checksum mismatch — also a signal to fall back.
+///
+/// # Errors
+///
+/// Every corruption case is reported as `InvalidData` with a message that
+/// starts with `corrupt dedupe index:`. I/O failures other than
+/// "not found" (for example permission denied) are returned unchanged
+/// instead of being mistaken for a missing file.
+pub fn read_dedupe_index(path: &Path) -> std::io::Result<Option<Vec<DedupeEntry>>> {
+    const COUNT_LEN: usize = 4;
+    const CHECKSUM_LEN: usize = 8;
+
+    // Open directly rather than probing with `exists()` first: the probe is
+    // racy (the file can vanish between check and open) and reports `false`
+    // on metadata errors, which would hide real I/O problems as "missing".
+    let mut file = match File::open(path) {
+        Ok(f) => f,
+        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
+        Err(e) => return Err(e),
+    };
+    let mut bytes = Vec::new();
+    file.read_to_end(&mut bytes)?;
+
+    // magic + count + checksum is the smallest well-formed file.
+    if bytes.len() < MAGIC.len() + COUNT_LEN + CHECKSUM_LEN {
+        return Err(corrupt("file too small"));
+    }
+    if !bytes.starts_with(MAGIC) {
+        return Err(corrupt("missing magic"));
+    }
+
+    // The trailing 8 bytes are the stored checksum over everything before them.
+    let cs_off = bytes.len() - CHECKSUM_LEN;
+    let mut cs_raw = [0u8; CHECKSUM_LEN];
+    cs_raw.copy_from_slice(&bytes[cs_off..]);
+    let stored_cs = u64::from_le_bytes(cs_raw);
+    let computed_cs = checksum(&bytes[..cs_off]);
+    if computed_cs != stored_cs {
+        return Err(corrupt(&format!(
+            "checksum mismatch (stored {:#x}, computed {:#x})",
+            stored_cs, computed_cs
+        )));
+    }
+
+    let count_off = MAGIC.len();
+    let entries_off = count_off + COUNT_LEN;
+    let mut count_raw = [0u8; COUNT_LEN];
+    count_raw.copy_from_slice(&bytes[count_off..entries_off]);
+    let count = u32::from_le_bytes(count_raw) as usize;
+
+    // Decode failures are corruption too; give them the same error prefix as
+    // every other corruption path so callers and logs see one shape.
+    let entries: Vec<DedupeEntry> = bincode::deserialize(&bytes[entries_off..cs_off])
+        .map_err(|e| corrupt(&format!("undecodable entries: {}", e)))?;
+    if entries.len() != count {
+        return Err(corrupt(&format!(
+            "entry count mismatch (header={}, decoded={})",
+            count,
+            entries.len()
+        )));
+    }
+    Ok(Some(entries))
+}
+
+/// Build the `InvalidData` error used for every sidecar corruption case.
+fn corrupt(msg: &str) -> std::io::Error {
+    std::io::Error::new(
+        std::io::ErrorKind::InvalidData,
+        format!("corrupt dedupe index: {}", msg),
+    )
+}
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index ec51dec..d0b1f41 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,6 +1,7 @@
 // Storage module
 pub mod columns;
 pub mod compression;
+pub mod dedupe_index;
 pub mod encoding;
 pub mod manifest;
 pub mod segment_format;
diff --git a/tests/dedupe_index.rs b/tests/dedupe_index.rs
new file mode 100644
index 0000000..0fd69a8
--- /dev/null
+++ b/tests/dedupe_index.rs
@@ -0,0
+1,220 @@
+//! Tests for the persistent dedupe sidecar (`raw_<uuid>.idx`):
+//! - Round-trip: write entries, read them back identically
+//! - Missing file → Ok(None) (signal to fall back to segment scan)
+//! - Corruption (bad magic / checksum / count) → Err (also a fallback signal)
+//! - Recovery prefers the sidecar when present
+//! - Recovery falls back to segment scan when the sidecar is absent
+//! - The flusher writes a sidecar alongside every segment
+
+use std::path::PathBuf;
+
+use usagedb::ingest::flusher::build_segment_meta;
+use usagedb::ingest::dedupe::{DedupeResult, EventHash};
+use usagedb::model::dimensions::SmallDimensions;
+use usagedb::model::event::{EventKind, UsageEvent};
+use usagedb::model::ids::{
+    AccountId, EventId, MeterId, ModelId, ProductId, SourceId, SubscriptionId, Unit,
+};
+use usagedb::runtime::recovery::{compute_event_hashes, Recovery};
+use usagedb::storage::dedupe_index::{
+    index_path, read_dedupe_index, write_dedupe_index, DedupeEntry,
+};
+use usagedb::storage::manifest::Manifest;
+use usagedb::storage::segment_writer::RawSegmentWriter;
+
+/// Create a fresh temporary directory and return its path.
+///
+/// The `TempDir` guard is intentionally forgotten so its destructor never
+/// runs and the directory stays on disk for the rest of the test.
+fn tmp_root() -> PathBuf {
+    let guard = tempfile::tempdir().expect("tempdir");
+    let root = guard.path().to_path_buf();
+    std::mem::forget(guard);
+    root
+}
+
+/// Build a usage event with the given id, account, timestamp and quantity.
+///
+/// `ts` is used for both `timestamp_ms` and `ingested_at_ms`; every other
+/// field carries a fixed test value.
+fn make_event(id: &str, account: &str, ts: i64, qty: i128) -> UsageEvent {
+    let event_id = EventId(id.to_string());
+    let account_id = AccountId(account.to_string());
+    let subscription_id = Some(SubscriptionId("sub_1".into()));
+    let model_id = Some(ModelId("m1".into()));
+
+    UsageEvent {
+        event_id,
+        kind: EventKind::Usage,
+        correction_ref: None,
+        account_id,
+        subscription_id,
+        product_id: ProductId("ai_gateway".into()),
+        meter_id: MeterId("tokens.input".into()),
+        timestamp_ms: ts,
+        quantity: qty,
+        unit: Unit("token".into()),
+        source: SourceId("test".into()),
+        model_id,
+        dimensions: SmallDimensions::default(),
+        ingested_at_ms: ts,
+    }
+}
+
+// =========================================================================
+// Direct sidecar API
+//
=========================================================================
+
+/// Entries written to a sidecar come back unchanged, including extreme values.
+#[test]
+fn write_then_read_round_trips_entries() {
+    let dir = tmp_root();
+    let path = dir.join("test.idx");
+    let entries: Vec<DedupeEntry> = vec![
+        (1u128, 100u128, 1000),
+        (2u128, 200u128, 1001),
+        (u128::MAX, 0u128, i64::MAX),
+    ];
+    write_dedupe_index(&path, &entries).unwrap();
+    let read = read_dedupe_index(&path).unwrap().expect("sidecar present");
+    assert_eq!(read, entries);
+}
+
+/// A missing sidecar is not an error: the reader reports `Ok(None)`.
+#[test]
+fn missing_sidecar_returns_ok_none() {
+    let dir = tmp_root();
+    let path = dir.join("absent.idx");
+    let result = read_dedupe_index(&path).unwrap();
+    assert!(result.is_none());
+}
+
+/// Flipping a payload byte must be caught by the checksum specifically.
+#[test]
+fn corrupt_checksum_is_rejected() {
+    let dir = tmp_root();
+    let path = dir.join("tamper.idx");
+    write_dedupe_index(&path, &[(1u128, 2u128, 3i64)]).unwrap();
+
+    // Flip a byte in the entries section (between magic+count and checksum).
+    let mut bytes = std::fs::read(&path).unwrap();
+    let pos = 8 + 4 + 4; // past magic + count + a few bytes
+    bytes[pos] ^= 0xFF;
+    std::fs::write(&path, bytes).unwrap();
+
+    let err = read_dedupe_index(&path).unwrap_err();
+    // Assert the checksum branch itself. Every corruption error carries the
+    // generic "corrupt dedupe index" prefix, so matching on that alone would
+    // also pass if a different check had fired.
+    assert!(err.to_string().contains("checksum mismatch"), "{}", err);
+}
+
+/// A sidecar that lost its last bytes is rejected as corrupt.
+#[test]
+fn truncated_sidecar_is_rejected() {
+    let dir = tmp_root();
+    let path = dir.join("trunc.idx");
+    write_dedupe_index(&path, &[(1u128, 2u128, 3i64)]).unwrap();
+    let bytes = std::fs::read(&path).unwrap();
+    std::fs::write(&path, &bytes[..bytes.len() - 4]).unwrap();
+
+    let err = read_dedupe_index(&path).unwrap_err();
+    assert!(err.to_string().contains("corrupt dedupe index"), "{}", err);
+}
+
+/// A file that does not start with the sidecar magic is rejected.
+#[test]
+fn missing_magic_is_rejected() {
+    let dir = tmp_root();
+    let path = dir.join("nomagic.idx");
+    std::fs::write(&path, b"not the right header at all").unwrap();
+    let err = read_dedupe_index(&path).unwrap_err();
+    assert!(err.to_string().contains("magic"), "{}", err);
+}
+
+//
=========================================================================
+// Recovery integration
+// =========================================================================
+
+/// Write a segment plus a sidecar directly, run recovery, and assert that
+/// dedupe was rebuilt via the fast path (sidecar) rather than by scanning
+/// the segment.
+///
+/// The sidecar deliberately carries one extra entry that is absent from the
+/// segment file. A segment scan cannot produce that entry, so finding it in
+/// the rebuilt cache shows the sidecar was what recovery read.
+#[test]
+fn recovery_uses_sidecar_to_rebuild_dedupe() {
+    use usagedb::runtime::config::Config;
+
+    let root = tmp_root();
+    let config = Config { db_root: root.clone(), default_bucket_count: 2, ..Config::default() };
+    std::fs::create_dir_all(&config.db_root).unwrap();
+
+    // Write a recent segment manually.
+    let now = chrono::Utc::now().timestamp_millis();
+    let events = vec![
+        make_event("e1", "acc_r", now - 1000, 10),
+        make_event("e2", "acc_r", now - 500, 20),
+    ];
+    let segment_id = format!("raw_{}", uuid::Uuid::new_v4().simple());
+    let seg_path = root.join(format!("{}.seg", segment_id));
+    let mut w = RawSegmentWriter::new(seg_path).unwrap();
+    for e in &events { w.write_event(e).unwrap(); }
+    let (_rows, checksum) = w.finish().unwrap();
+
+    // Sidecar: the segment's rows in order, followed by a marker event that
+    // exists only in the sidecar.
+    let sidecar_only = make_event("e_sidecar_only", "acc_r", now - 250, 30);
+    let expected: Vec<&UsageEvent> = events
+        .iter()
+        .chain(std::iter::once(&sidecar_only))
+        .collect();
+    let entries: Vec<DedupeEntry> = expected
+        .iter()
+        .map(|&e| {
+            let (id_h, p_h) = compute_event_hashes(e);
+            (id_h, p_h, e.ingested_at_ms)
+        })
+        .collect();
+    write_dedupe_index(&index_path(&root, &segment_id), &entries).unwrap();
+
+    // Add the segment to the manifest.
+    let meta = build_segment_meta(&segment_id, &events, 0, checksum);
+    let mut manifest = Manifest { bucket_count: 2, ..Manifest::default() };
+    manifest.raw_segments.push(meta);
+    manifest.save(&root).unwrap();
+
+    // Run recovery and verify dedupe holds every sidecar entry, marker included.
+    let recovery = Recovery::new(root.clone());
+    let result = recovery.run_startup_recovery(1000).unwrap();
+    let mut dedupe = result.dedupe;
+    for &e in &expected {
+        let (id_h, p_h) = compute_event_hashes(e);
+        // Re-inserting the exact same event should be flagged as duplicate.
+        assert_eq!(
+            dedupe.check_and_insert(id_h, p_h),
+            DedupeResult::ExactDuplicate,
+            "event {} should already be in dedupe via sidecar",
+            e.event_id.0
+        );
+    }
+}
+
+/// Same setup as above but with NO sidecar — recovery must fall back
+/// to scanning the segment file and still rebuild dedupe correctly.
+#[test]
+fn recovery_falls_back_to_segment_scan_without_sidecar() {
+    use usagedb::runtime::config::Config;
+
+    let root = tmp_root();
+    let config = Config { db_root: root.clone(), default_bucket_count: 2, ..Config::default() };
+    std::fs::create_dir_all(&config.db_root).unwrap();
+
+    let now = chrono::Utc::now().timestamp_millis();
+    let events = vec![make_event("e1", "acc_f", now - 1000, 10)];
+    let segment_id = format!("raw_{}", uuid::Uuid::new_v4().simple());
+    let seg_path = root.join(format!("{}.seg", segment_id));
+    let mut w = RawSegmentWriter::new(seg_path).unwrap();
+    for e in &events { w.write_event(e).unwrap(); }
+    let (_rows, checksum) = w.finish().unwrap();
+    // Deliberately do NOT write a sidecar.
+
+    let meta = build_segment_meta(&segment_id, &events, 0, checksum);
+    let mut manifest = Manifest { bucket_count: 2, ..Manifest::default() };
+    manifest.raw_segments.push(meta);
+    manifest.save(&root).unwrap();
+
+    let recovery = Recovery::new(root.clone());
+    let result = recovery.run_startup_recovery(1000).unwrap();
+    let mut dedupe = result.dedupe;
+    let (id_h, p_h) = compute_event_hashes(&events[0]);
+    assert_eq!(
+        dedupe.check_and_insert(id_h, p_h),
+        DedupeResult::ExactDuplicate,
+        "fallback scan should have populated dedupe"
+    );
+}
+
+// =========================================================================
+// EventHash typed correctly (compile-time sanity)
+// =========================================================================
+
+/// Compiles only while `DedupeEntry` is `(EventHash, EventHash, i64)`.
+#[test]
+fn dedupe_entry_uses_event_hash_type() {
+    let _entry: DedupeEntry = (0u128 as EventHash, 0u128 as EventHash, 0i64);
+}