From 1848a161f0c272f797c5a24e5c8ca1f09bb05da0 Mon Sep 17 00:00:00 2001 From: Robert Fleischmann Date: Mon, 30 Mar 2026 16:39:24 -0400 Subject: [PATCH] Add LanceDB compaction to reclaim disk space LanceDB is append-only: every store, delete, and consolidation merge creates new data files without reclaiming space, causing unbounded storage growth. This adds three-phase optimization (compact fragments, prune old versions, rebuild indices) via LanceDB's Table::optimize API. - `sediment compact` CLI command for manual optimization - Automatic background compaction every 50 write operations - Configurable prune duration, thread count, and force mode Co-Authored-By: Claude Opus 4.6 (1M context) --- src/compaction.rs | 55 +++++++++++++++++++ src/db.rs | 137 ++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 104 ++++++++++++++++++++++++++++++++++- src/mcp/server.rs | 17 +++++- src/mcp/tools.rs | 24 ++++++++ 6 files changed, 334 insertions(+), 4 deletions(-) create mode 100644 src/compaction.rs diff --git a/src/compaction.rs b/src/compaction.rs new file mode 100644 index 0000000..81d8566 --- /dev/null +++ b/src/compaction.rs @@ -0,0 +1,55 @@ +//! Background compaction: periodic optimization of LanceDB on-disk storage. +//! +//! LanceDB is append-only — every store, delete, and update creates new data files. +//! This module spawns a background task that compacts fragments, prunes old versions, +//! and re-optimizes indices to reclaim disk space. + +use std::path::PathBuf; +use std::sync::Arc; + +use tracing::{debug, info, warn}; + +use crate::db::CompactConfig; +use crate::embedder::Embedder; + +/// Spawn a background compaction task. +/// Returns immediately. Uses a semaphore to ensure only one runs at a time. +pub fn spawn_compaction( + db_path: Arc, + project_id: Option, + embedder: Arc, + semaphore: Arc, +) { + tokio::spawn(async move { + let _permit = match semaphore.try_acquire_owned() { + Ok(p) => p, + Err(_) => { + debug!("Compaction already running, skipping"); + return; + } + }; + + info!("Starting background compaction"); + + let config = CompactConfig { + prune_older_than: Some(chrono::Duration::days(1)), + delete_unverified: false, + num_threads: Some(1), + skip_prune: false, + }; + + match crate::Database::open_with_embedder(&*db_path, project_id, embedder).await { + Ok(db) => match db.optimize_tables(&config).await { + Ok(_stats) => { + info!("Background compaction completed"); + } + Err(e) => { + warn!("Background compaction failed: {}", e); + } + }, + Err(e) => { + warn!("Failed to open database for compaction: {}", e); + } + } + }); +} diff --git a/src/db.rs b/src/db.rs index 1b4047d..df42b0c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1310,6 +1310,143 @@ impl Database { Ok(stats) } + + /// Optimize on-disk storage: compact fragments, prune old versions, and rebuild indices. + /// + /// LanceDB is append-only: every store, delete, and update creates new data files. + /// This method reclaims space by merging small fragments, materializing deletions, + /// and removing old dataset versions. + pub async fn optimize_tables(&self, config: &CompactConfig) -> Result { + let mut stats = CompactStats::default(); + + for (label, table) in [("items", &self.items_table), ("chunks", &self.chunks_table)] { + let Some(table) = table else { continue }; + + // Phase 1: Compact — merge small files, materialize deletions + let compact_opts = lancedb::table::CompactionOptions { + target_rows_per_fragment: 1024, + materialize_deletions: true, + materialize_deletions_threshold: 0.0, + num_threads: config.num_threads, + ..Default::default() + }; + match table + .optimize(lancedb::table::OptimizeAction::Compact { + options: compact_opts, + remap_options: None, + }) + .await + { + Ok(s) => { + if let Some(ref c) = s.compaction { + info!( + "{} compaction: {} fragments removed, {} new, {} files removed", + label, c.fragments_removed, c.fragments_added, c.files_removed, + ); + } + match label { + "items" => stats.items_compacted = s.compaction.is_some(), + "chunks" => stats.chunks_compacted = s.compaction.is_some(), + _ => {} + } + } + Err(e) => { + tracing::warn!("{} compaction failed: {}", label, e); + } + } + + // Phase 2: Prune — remove old dataset versions + if !config.skip_prune { + let older_than = config.prune_older_than; + match table + .optimize(lancedb::table::OptimizeAction::Prune { + older_than, + delete_unverified: Some(config.delete_unverified), + error_if_tagged_old_versions: None, + }) + .await + { + Ok(s) => { + if let Some(ref p) = s.prune { + info!( + "{} prune: {} bytes removed, {} old versions removed", + label, p.bytes_removed, p.old_versions, + ); + } + match label { + "items" => stats.items_pruned = s.prune.is_some(), + "chunks" => stats.chunks_pruned = s.prune.is_some(), + _ => {} + } + } + Err(e) => { + tracing::warn!("{} prune failed: {}", label, e); + } + } + } + + // Phase 3: Index — optimize indices for new data + match table + .optimize(lancedb::table::OptimizeAction::Index( + lancedb::table::OptimizeOptions::default(), + )) + .await + { + Ok(_) => { + info!("{} index optimization complete", label); + } + Err(e) => { + tracing::warn!("{} index optimization failed: {}", label, e); + } + } + } + + Ok(stats) + } +} + +/// Configuration for table optimization. +#[derive(Debug, Clone, Default)] +pub struct CompactConfig { + /// Minimum age of versions to prune. None uses Duration::ZERO (prune all). + pub prune_older_than: Option, + /// Whether to delete unverified files (files created less than 7 days ago). + pub delete_unverified: bool, + /// Number of threads for compaction. None uses all available CPUs. + pub num_threads: Option, + /// Skip the prune phase entirely. + pub skip_prune: bool, +} + +/// Results from table optimization. +#[derive(Debug, Default)] +pub struct CompactStats { + pub items_compacted: bool, + pub chunks_compacted: bool, + pub items_pruned: bool, + pub chunks_pruned: bool, +} + +/// Measure the total size (in bytes) of all files in a directory tree. +pub fn dir_size(path: &std::path::Path) -> u64 { + walkdir(path) +} + +fn walkdir(path: &std::path::Path) -> u64 { + let mut total = 0u64; + if let Ok(entries) = std::fs::read_dir(path) { + for entry in entries.flatten() { + let ft = entry.file_type(); + if let Ok(ft) = ft { + if ft.is_file() { + total += entry.metadata().map(|m| m.len()).unwrap_or(0); + } else if ft.is_dir() { + total += walkdir(&entry.path()); + } + } + } + } + total } // ==================== Project ID Migration ==================== diff --git a/src/lib.rs b/src/lib.rs index 72700b3..0f4f2bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ use uuid::Uuid; pub mod access; pub mod chunker; +pub mod compaction; pub mod consolidation; pub mod db; pub mod document; diff --git a/src/main.rs b/src/main.rs index 4259e85..1e76c9c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -96,6 +96,17 @@ enum Commands { /// Item ID to delete id: String, }, + + /// Compact and optimize LanceDB storage (reclaim disk space) + Compact { + /// Also delete unverified files (less than 7 days old) + #[arg(long)] + force: bool, + + /// Skip the prune phase (only compact and re-index) + #[arg(long)] + no_prune: bool, + }, } fn main() -> Result<()> { @@ -133,6 +144,7 @@ fn main() -> Result<()> { }) => run_store(cli.db, &content, &scope, replace, cli.json), Some(Commands::Recall { query, limit }) => run_recall(cli.db, &query, limit, cli.json), Some(Commands::Forget { id }) => run_forget(cli.db, &id, cli.json), + Some(Commands::Compact { force, no_prune }) => run_compact(cli.db, force, no_prune), } } @@ -524,7 +536,8 @@ fn run_store( let rt = tokio::runtime::Runtime::new()?; rt.block_on(async { - let mut db = sediment::Database::open_with_project(&ctx.db_path, ctx.project_id.clone()).await?; + let mut db = + sediment::Database::open_with_project(&ctx.db_path, ctx.project_id.clone()).await?; let mut item = sediment::Item::new(&content); @@ -868,6 +881,95 @@ fn run_forget(db_override: Option, id: &str, output_json: bool) -> Resu }) } +/// Compact and optimize LanceDB storage +fn run_compact(db_override: Option, force: bool, no_prune: bool) -> Result<()> { + let db_path = db_override.unwrap_or_else(sediment::central_db_path); + + if !db_path.exists() { + println!("Database does not exist yet. Nothing to compact."); + return Ok(()); + } + + println!("Database: {}", db_path.display()); + + let size_before = sediment::db::dir_size(&db_path); + println!("Size before: {}", format_bytes(size_before)); + + let config = sediment::db::CompactConfig { + prune_older_than: if no_prune { + None + } else { + Some(chrono::Duration::zero()) + }, + delete_unverified: force, + num_threads: None, // use all CPUs for CLI + skip_prune: no_prune, + }; + + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async { + let db = sediment::Database::open(&db_path).await?; + let stats = db.optimize_tables(&config).await?; + + let size_after = sediment::db::dir_size(&db_path); + + println!("\nOptimization results:"); + println!( + " Items table: {}", + if stats.items_compacted { + "compacted" + } else { + "no changes needed" + } + ); + println!( + " Chunks table: {}", + if stats.chunks_compacted { + "compacted" + } else { + "no changes needed" + } + ); + + if !no_prune { + println!( + " Items pruned: {}", + if stats.items_pruned { "yes" } else { "no" } + ); + println!( + " Chunks pruned: {}", + if stats.chunks_pruned { "yes" } else { "no" } + ); + } + + println!("\nSize after: {}", format_bytes(size_after)); + + if size_before > size_after { + println!("Reclaimed: {}", format_bytes(size_before - size_after)); + } else { + println!("No space reclaimed (storage is already optimized)"); + } + + Ok(()) + }) +} + +fn format_bytes(bytes: u64) -> String { + const KB: u64 = 1024; + const MB: u64 = 1024 * KB; + const GB: u64 = 1024 * MB; + + if bytes >= GB { + format!("{:.2} GB", bytes as f64 / GB as f64) + } else if bytes >= MB { + format!("{:.2} MB", bytes as f64 / MB as f64) + } else if bytes >= KB { + format!("{:.2} KB", bytes as f64 / KB as f64) + } else { + format!("{} bytes", bytes) + } +} + /// Generate CLAUDE.md instructions for Sediment fn generate_claude_md_instructions() -> String { r#"# Sediment Memory System diff --git a/src/mcp/server.rs b/src/mcp/server.rs index d1e368b..3c95ff6 100644 --- a/src/mcp/server.rs +++ b/src/mcp/server.rs @@ -39,8 +39,12 @@ pub struct ServerContext { pub cwd: PathBuf, /// Semaphore to ensure only one consolidation task runs at a time pub consolidation_semaphore: Arc, + /// Semaphore to ensure only one compaction task runs at a time + pub compaction_semaphore: Arc, /// Counter for recall invocations (triggers periodic clustering and expired cleanup) pub recall_count: std::sync::atomic::AtomicU64, + /// Counter for write operations (store/forget) that trigger periodic compaction + pub write_count: std::sync::atomic::AtomicU64, /// Rate limiter state (mutex protects window+count as a unit) pub rate_limit: Mutex, } @@ -70,7 +74,9 @@ pub fn run(db_path: &Path, project_id: Option) -> Result<()> { embedder, cwd, consolidation_semaphore: Arc::new(Semaphore::new(1)), + compaction_semaphore: Arc::new(Semaphore::new(1)), recall_count: std::sync::atomic::AtomicU64::new(0), + write_count: std::sync::atomic::AtomicU64::new(0), rate_limit: Mutex::new(RateLimitState { window_start_ms: 0, count: 0, @@ -163,11 +169,16 @@ pub fn run(db_path: &Path, project_id: Option) -> Result<()> { } } - // Graceful shutdown: wait for in-flight consolidation before dropping + // Graceful shutdown: wait for in-flight background tasks before dropping tracing::info!("Client disconnected, shutting down..."); - let sem = ctx.consolidation_semaphore.clone(); + let consol_sem = ctx.consolidation_semaphore.clone(); + let compact_sem = ctx.compaction_semaphore.clone(); let _ = rt.block_on(async { - tokio::time::timeout(std::time::Duration::from_secs(10), sem.acquire()).await + tokio::time::timeout(std::time::Duration::from_secs(10), async { + let _ = consol_sem.acquire().await; + let _ = compact_sem.acquire().await; + }) + .await }); drop(ctx); rt.shutdown_timeout(std::time::Duration::from_secs(5)); diff --git a/src/mcp/tools.rs b/src/mcp/tools.rs index da27dec..ce9c203 100644 --- a/src/mcp/tools.rs +++ b/src/mcp/tools.rs @@ -8,6 +8,7 @@ use serde::Deserialize; use serde_json::{Value, json}; use crate::access::AccessTracker; +use crate::compaction::spawn_compaction; use crate::consolidation::{ConsolidationQueue, spawn_consolidation}; use crate::db::{is_valid_id, score_with_decay}; use crate::graph::GraphStore; @@ -29,6 +30,21 @@ fn spawn_logged(name: &'static str, fut: impl std::future::Future + }); } +/// Increment the write counter and spawn background compaction every 50 writes. +fn maybe_spawn_compaction(ctx: &ServerContext) { + let count = ctx + .write_count + .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + if count % 50 == 49 { + spawn_compaction( + Arc::new(ctx.db_path.clone()), + ctx.project_id.clone(), + ctx.embedder.clone(), + ctx.compaction_semaphore.clone(), + ); + } +} + /// Get all available tools (4 total) pub fn get_tools() -> Vec { let store_schema = { @@ -392,6 +408,9 @@ async fn execute_store( } } + // Periodic compaction: every 50 write operations + maybe_spawn_compaction(ctx); + let message = if replaced { format!( "Stored in {} scope (replaced {})", @@ -909,6 +928,9 @@ async fn execute_forget( tracing::warn!("remove_node failed: {}", e); } + // Periodic compaction: every 50 write operations + maybe_spawn_compaction(ctx); + let result = json!({ "success": true, "message": format!("Deleted item: {}", params.id) @@ -998,7 +1020,9 @@ mod tests { embedder, cwd: PathBuf::from("."), consolidation_semaphore: Arc::new(Semaphore::new(1)), + compaction_semaphore: Arc::new(Semaphore::new(1)), recall_count: std::sync::atomic::AtomicU64::new(0), + write_count: std::sync::atomic::AtomicU64::new(0), rate_limit: Mutex::new(super::super::server::RateLimitState { window_start_ms: 0, count: 0,