Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/compact/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use tracing::{error, info, warn};
use crate::compact::planner::{CompactionPlan, CompactionPlanner};
use crate::ingest::flusher::build_segment_meta;
use crate::model::event::UsageEvent;
use crate::runtime::recovery::compute_event_hashes;
use crate::runtime::state::AppState;
use crate::storage::dedupe_index::{index_path, write_dedupe_index, DedupeEntry};
use crate::storage::manifest::ReplacementRecord;
use crate::storage::segment_reader::RawSegmentReader;
use crate::storage::segment_writer::RawSegmentWriter;
Expand Down Expand Up @@ -150,6 +152,21 @@ impl CompactionWorker {
};
let new_meta = build_segment_meta(&output_id, &deduped, plan.bucket, checksum);

// Write the dedupe sidecar for the compacted output too — same
// recovery speedup applies. Non-fatal on failure.
let idx_entries: Vec<DedupeEntry> = deduped
.iter()
.map(|e| {
let (id_h, p_h) = compute_event_hashes(e);
(id_h, p_h, e.ingested_at_ms)
})
.collect();
let idx = index_path(db_root, &output_id);
if let Err(e) = write_dedupe_index(&idx, &idx_entries) {
warn!("compaction: dedupe sidecar write for {} failed: {}", output_id, e);
let _ = std::fs::remove_file(&idx);
}

// Atomically swap old → new in the manifest.
let committed = {
let mut manifest = self.state.manifest.write().await;
Expand Down
19 changes: 19 additions & 0 deletions src/ingest/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use tokio::sync::mpsc;
use tracing::{info, error};
use crate::model::event::UsageEvent;
use crate::model::ids::{AccountId, bucket_for_account};
use crate::storage::dedupe_index::{index_path, write_dedupe_index, DedupeEntry};
use crate::storage::segment_writer::RawSegmentWriter;
use crate::storage::manifest::{SegmentMeta, SegmentKind};
use crate::runtime::recovery::compute_event_hashes;
use crate::ingest::wal::Wal;
use std::collections::{BTreeMap, HashSet};
use std::path::PathBuf;
Expand Down Expand Up @@ -174,11 +176,16 @@ impl FlusherWorker {
Ok(w) => w,
Err(e) => return Err(format!("create segment for bucket {}: {}", bucket, e)),
};
let mut index_entries: Vec<DedupeEntry> = Vec::with_capacity(sorted.len());
for event in &sorted {
if let Err(e) = writer.write_event(event) {
let _ = std::fs::remove_file(&path);
return Err(format!("write event to bucket {} segment {}: {}", bucket, segment_id, e));
}
// Build the per-row dedupe sidecar entry so recovery can
// skip the bincode + zstd decode of the event column.
let (id_hash, payload_hash) = compute_event_hashes(event);
index_entries.push((id_hash, payload_hash, event.ingested_at_ms));
}
let (_row_count, checksum) = match writer.finish() {
Ok(t) => t,
Expand All @@ -188,6 +195,18 @@ impl FlusherWorker {
}
};

// Write the dedupe sidecar. Failure here is non-fatal —
// recovery falls back to scanning the segment — but we log it
// and clean up so it doesn't leave a half-written file behind.
let idx_path = index_path(&self.state.config.db_root, &segment_id);
if let Err(e) = write_dedupe_index(&idx_path, &index_entries) {
tracing::warn!(
"flusher: dedupe sidecar write for {} failed: {} — segment is fine, recovery will scan",
segment_id, e
);
let _ = std::fs::remove_file(&idx_path);
}

Ok((build_segment_meta(&segment_id, &sorted, bucket, checksum), path))
}
}
Expand Down
39 changes: 39 additions & 0 deletions src/runtime/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::ingest::wal::Wal;
use crate::ingest::dedupe::{HotDedupe, DEFAULT_TTL_MS};
use crate::ingest::memtable::Memtable;
use crate::model::event::UsageEvent;
use crate::storage::dedupe_index::{index_path, read_dedupe_index};
use crate::storage::segment_reader::RawSegmentReader;
use std::io::Result as IoResult;
use tracing::{info, warn};
Expand Down Expand Up @@ -101,6 +102,12 @@ impl Recovery {
let cutoff = now_ms_recovery().saturating_sub(DEFAULT_TTL_MS);
let mut segments_scanned = 0usize;
let mut events_registered = 0usize;
// Track how many segments used the fast sidecar path vs the
// segment-scan fallback, so we can spot when sidecars are
// missing (e.g., after upgrading from a build that didn't write them).
let mut sidecar_hits = 0usize;
let mut sidecar_misses = 0usize;

for seg in &manifest.raw_segments {
if seg.max_timestamp_ms < cutoff {
continue;
Expand All @@ -113,6 +120,32 @@ impl Recovery {
);
continue;
}

// Fast path: try the sidecar `.idx` file. It carries the
// same (id_hash, payload_hash, ingested_at_ms) triples in
// segment row order, without bincode-decoding the event
// column. Falls through to the slow path on missing or
// corrupt sidecar.
let idx = index_path(&self.db_root, &seg.segment_id);
let sidecar = match read_dedupe_index(&idx) {
Ok(s) => s,
Err(e) => {
warn!("recovery: dedupe sidecar for {} unusable ({}); falling back to scan", seg.segment_id, e);
None
}
};
if let Some(entries) = sidecar {
for (id_hash, payload_hash, ingested_at_ms) in entries {
dedupe.insert_known(id_hash, payload_hash, ingested_at_ms);
events_registered += 1;
}
sidecar_hits += 1;
segments_scanned += 1;
continue;
}

// Slow path: open the segment and rehash each event.
sidecar_misses += 1;
match RawSegmentReader::new(path) {
Ok(mut reader) => {
loop {
Expand All @@ -134,6 +167,12 @@ impl Recovery {
Err(e) => warn!("recovery: failed to open segment {}: {}", seg.segment_id, e),
}
}
if segments_scanned > 0 {
info!(
"Dedupe rebuild: sidecar hits {}, fallback scans {}",
sidecar_hits, sidecar_misses
);
}
if segments_scanned > 0 {
info!(
"Dedupe rebuild: scanned {} recent segments, registered {} events",
Expand Down
126 changes: 126 additions & 0 deletions src/storage/dedupe_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//! Per-segment sidecar dedupe index.
//!
//! Each raw segment `raw_<uuid>.seg` gets a companion `raw_<uuid>.idx`
//! file holding `Vec<(event_id_hash, payload_hash, ingested_at_ms)>` in
//! the same row order. Recovery uses this to rebuild the hot dedupe
//! cache without decompressing + bincode-decoding the full event
//! column — a 10–100× speedup on segments with rich payloads.
//!
//! The sidecar is an *optimization*: if missing or corrupt, recovery
//! falls back to scanning the segment file with `RawSegmentReader` and
//! recomputing hashes. So the format change is non-breaking — old
//! segments without an `.idx` still recover correctly, just slower.
//!
//! File layout (the entire file):
//! - magic b"UDBIDX01" (8 bytes)
//! - count u32 LE (number of entries)
//! - entries `Vec<(u128, u128, i64)>` bincode-serialized
//! - checksum u64 LE (low 8 bytes of blake3 over above)
//!
//! Corruption is detected on read; a checksum mismatch causes the
//! caller to fall back to the segment scan.

use std::fs::File;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};

use crate::ingest::dedupe::EventHash;
use crate::storage::segment_format::checksum;

/// Eight-byte magic prefix at the start of every sidecar file.
/// `write_dedupe_index` emits it first and `read_dedupe_index` rejects any
/// file whose leading bytes differ.
pub const MAGIC: &[u8; 8] = b"UDBIDX01";

/// One sidecar row: `(event_id_hash, payload_hash, ingested_at_ms)` for a
/// single event, stored in segment row order. This is the element type that
/// `write_dedupe_index` serializes and `read_dedupe_index` returns.
pub type DedupeEntry = (EventHash, EventHash, i64);

/// Location of the sidecar index for `segment_id`: the file
/// `<segment_id>.idx` placed directly under `db_root`.
pub fn index_path(db_root: &Path, segment_id: &str) -> PathBuf {
    // Assemble the file name first, then append it to a copy of the root.
    let mut file_name = String::with_capacity(segment_id.len() + ".idx".len());
    file_name.push_str(segment_id);
    file_name.push_str(".idx");

    let mut full = db_root.to_path_buf();
    full.push(file_name);
    full
}

/// Serialize `entries` and write them to the sidecar file at `path`.
///
/// The whole file is assembled in memory — magic, `u32` LE entry count,
/// bincode-encoded entries, then a trailing `u64` LE checksum computed over
/// everything before it — and written with one `write_all` followed by
/// `sync_all`.
///
/// The caller is responsible for ordering this relative to the manifest
/// commit — typically: write segment → write index → commit manifest. If
/// the manifest commit fails, both files are orphaned; that is harmless
/// for recovery, which only visits segments listed in the manifest.
///
/// # Errors
///
/// - `InvalidInput` if `entries.len()` does not fit in the `u32` count
///   header. (A plain `as u32` cast would silently truncate and produce a
///   file the reader always rejects with a count mismatch.)
/// - `InvalidData` if bincode serialization fails.
/// - Any I/O error from creating, writing, or syncing the file. The file
///   may exist in a partial state in that case; callers should remove it.
pub fn write_dedupe_index(path: &Path, entries: &[DedupeEntry]) -> std::io::Result<()> {
    // Validate the header field before doing any work or touching the disk.
    if entries.len() > u32::MAX as usize {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!(
                "dedupe index: {} entries exceed the u32 count header",
                entries.len()
            ),
        ));
    }
    // Lossless: bounded by the check above.
    let count = entries.len() as u32;

    let body = bincode::serialize(entries)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

    let mut buf: Vec<u8> = Vec::with_capacity(MAGIC.len() + 4 + body.len() + 8);
    buf.extend_from_slice(MAGIC);
    buf.extend_from_slice(&count.to_le_bytes());
    buf.extend_from_slice(&body);
    // The checksum covers magic + count + entries, i.e. everything so far.
    let cs = checksum(&buf);
    buf.extend_from_slice(&cs.to_le_bytes());

    let mut file = File::create(path)?;
    file.write_all(&buf)?;
    // Flush data and metadata so a later manifest commit never references
    // an index that is still only in the page cache.
    file.sync_all()?;
    Ok(())
}

/// Read and validate the sidecar at `path`.
///
/// Returns `Ok(None)` when the file does not exist, so callers fall back to
/// scanning the segment. Returns `Ok(Some(entries))` when the file passes
/// every check, with entries in the order they were written.
///
/// # Errors
///
/// Every error is also a signal to fall back to the segment scan:
/// - `InvalidData` ("corrupt dedupe index: …") when the file is shorter
///   than magic + count + checksum, the magic is wrong, the trailing
///   checksum does not match, or the decoded entry count disagrees with
///   the header.
/// - `InvalidData` when bincode cannot decode the entries.
/// - Any other I/O error from opening or reading the file. Unlike a
///   `path.exists()` probe, failures such as permission errors are
///   reported rather than being treated as "missing".
pub fn read_dedupe_index(path: &Path) -> std::io::Result<Option<Vec<DedupeEntry>>> {
    // Open directly instead of checking `path.exists()` first: the probe is
    // racy (the file can vanish between check and open) and it maps every
    // metadata failure to "missing". Only a genuine NotFound means "no sidecar".
    let mut file = match File::open(path) {
        Ok(f) => f,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(e),
    };
    let mut bytes = Vec::new();
    file.read_to_end(&mut bytes)?;

    const COUNT_LEN: usize = 4;
    const CHECKSUM_LEN: usize = 8;
    let header_len = MAGIC.len() + COUNT_LEN;

    if bytes.len() < header_len + CHECKSUM_LEN {
        return Err(corrupt("file too small"));
    }
    if !bytes.starts_with(&MAGIC[..]) {
        return Err(corrupt("missing magic"));
    }

    // Layout: [ magic | count | entries ][ checksum ]; the checksum covers
    // everything that precedes it.
    let (body, trailer) = bytes.split_at(bytes.len() - CHECKSUM_LEN);
    let mut cs_bytes = [0u8; CHECKSUM_LEN];
    cs_bytes.copy_from_slice(trailer);
    let stored_cs = u64::from_le_bytes(cs_bytes);
    let computed_cs = checksum(body);
    if computed_cs != stored_cs {
        return Err(corrupt(&format!(
            "checksum mismatch (stored {:#x}, computed {:#x})",
            stored_cs, computed_cs
        )));
    }

    let mut count_bytes = [0u8; COUNT_LEN];
    count_bytes.copy_from_slice(&body[MAGIC.len()..header_len]);
    let count = u32::from_le_bytes(count_bytes) as usize;

    let entries: Vec<DedupeEntry> = bincode::deserialize(&body[header_len..])
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    // Cross-check the header against what was actually decoded.
    if entries.len() != count {
        return Err(corrupt(&format!(
            "entry count mismatch (header={}, decoded={})",
            count,
            entries.len()
        )));
    }
    Ok(Some(entries))
}

/// Build the `InvalidData` I/O error used for every sidecar validation
/// failure; the message is `msg` prefixed with `corrupt dedupe index: `.
fn corrupt(msg: &str) -> std::io::Error {
    let mut text = String::from("corrupt dedupe index: ");
    text.push_str(msg);
    std::io::Error::new(std::io::ErrorKind::InvalidData, text)
}
1 change: 1 addition & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Storage module
pub mod columns;
pub mod compression;
pub mod dedupe_index;
pub mod encoding;
pub mod manifest;
pub mod segment_format;
Expand Down
Loading
Loading