Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/compaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//! Background compaction: periodic optimization of LanceDB on-disk storage.
//!
//! LanceDB is append-only — every store, delete, and update creates new data files.
//! This module spawns a background task that compacts fragments, prunes old versions,
//! and re-optimizes indices to reclaim disk space.

use std::path::PathBuf;
use std::sync::Arc;

use tracing::{debug, info, warn};

use crate::db::CompactConfig;
use crate::embedder::Embedder;

/// Spawn a background compaction task.
/// Returns immediately. Uses a semaphore to ensure only one runs at a time.
pub fn spawn_compaction(
    db_path: Arc<PathBuf>,
    project_id: Option<String>,
    embedder: Arc<Embedder>,
    semaphore: Arc<tokio::sync::Semaphore>,
) {
    tokio::spawn(async move {
        // A held permit marks a compaction already in flight; bail out
        // instead of queueing a second one behind it.
        let Ok(_permit) = semaphore.try_acquire_owned() else {
            debug!("Compaction already running, skipping");
            return;
        };

        info!("Starting background compaction");

        // Conservative background settings: single-threaded, retain versions
        // newer than one day, and never delete unverified files.
        let config = CompactConfig {
            prune_older_than: Some(chrono::Duration::days(1)),
            delete_unverified: false,
            num_threads: Some(1),
            skip_prune: false,
        };

        let db = match crate::Database::open_with_embedder(&*db_path, project_id, embedder).await {
            Ok(db) => db,
            Err(e) => {
                warn!("Failed to open database for compaction: {}", e);
                return;
            }
        };

        match db.optimize_tables(&config).await {
            Ok(_stats) => info!("Background compaction completed"),
            Err(e) => warn!("Background compaction failed: {}", e),
        }
    });
}
137 changes: 137 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,143 @@ impl Database {

Ok(stats)
}

/// Optimize on-disk storage: compact fragments, prune old versions, and rebuild indices.
///
/// LanceDB is append-only: every store, delete, and update creates new data files.
/// This method reclaims space by merging small fragments, materializing deletions,
/// and removing old dataset versions.
pub async fn optimize_tables(&self, config: &CompactConfig) -> Result<CompactStats> {
    let mut stats = CompactStats::default();

    // Pair each table with the stat fields it should report into, so the
    // phase code below never re-matches on the label string.
    let targets = [
        (
            "items",
            &self.items_table,
            &mut stats.items_compacted,
            &mut stats.items_pruned,
        ),
        (
            "chunks",
            &self.chunks_table,
            &mut stats.chunks_compacted,
            &mut stats.chunks_pruned,
        ),
    ];

    for (label, maybe_table, compacted, pruned) in targets {
        let table = match maybe_table {
            Some(t) => t,
            None => continue,
        };

        // Phase 1: Compact — merge small files, materialize deletions
        let compact_opts = lancedb::table::CompactionOptions {
            target_rows_per_fragment: 1024,
            materialize_deletions: true,
            materialize_deletions_threshold: 0.0,
            num_threads: config.num_threads,
            ..Default::default()
        };
        let compact_result = table
            .optimize(lancedb::table::OptimizeAction::Compact {
                options: compact_opts,
                remap_options: None,
            })
            .await;
        match compact_result {
            Ok(s) => {
                if let Some(ref c) = s.compaction {
                    info!(
                        "{} compaction: {} fragments removed, {} new, {} files removed",
                        label, c.fragments_removed, c.fragments_added, c.files_removed,
                    );
                }
                *compacted = s.compaction.is_some();
            }
            Err(e) => {
                tracing::warn!("{} compaction failed: {}", label, e);
            }
        }

        // Phase 2: Prune — remove old dataset versions
        if !config.skip_prune {
            let prune_result = table
                .optimize(lancedb::table::OptimizeAction::Prune {
                    older_than: config.prune_older_than,
                    delete_unverified: Some(config.delete_unverified),
                    error_if_tagged_old_versions: None,
                })
                .await;
            match prune_result {
                Ok(s) => {
                    if let Some(ref p) = s.prune {
                        info!(
                            "{} prune: {} bytes removed, {} old versions removed",
                            label, p.bytes_removed, p.old_versions,
                        );
                    }
                    *pruned = s.prune.is_some();
                }
                Err(e) => {
                    tracing::warn!("{} prune failed: {}", label, e);
                }
            }
        }

        // Phase 3: Index — optimize indices for new data
        let index_result = table
            .optimize(lancedb::table::OptimizeAction::Index(
                lancedb::table::OptimizeOptions::default(),
            ))
            .await;
        match index_result {
            Ok(_) => info!("{} index optimization complete", label),
            Err(e) => tracing::warn!("{} index optimization failed: {}", label, e),
        }
    }

    Ok(stats)
}
}

/// Configuration for table optimization.
///
/// Consumed by `Database::optimize_tables`, which runs three phases per table:
/// compact, prune, and index rebuild.
#[derive(Debug, Clone, Default)]
pub struct CompactConfig {
    /// Minimum age of versions to prune. None uses Duration::ZERO (prune all).
    /// NOTE(review): verify against the LanceDB Prune API — `None` may instead
    /// apply the library's default retention window rather than zero.
    pub prune_older_than: Option<chrono::Duration>,
    /// Whether to delete unverified files (files created less than 7 days ago).
    pub delete_unverified: bool,
    /// Number of threads for compaction. None uses all available CPUs.
    pub num_threads: Option<usize>,
    /// Skip the prune phase entirely.
    pub skip_prune: bool,
}

/// Results from table optimization.
///
/// Each flag is true when the corresponding phase reported statistics for the
/// table (work was actually performed), and false when the phase was a no-op,
/// was skipped, or failed.
#[derive(Debug, Default)]
pub struct CompactStats {
    /// Items table had fragments compacted.
    pub items_compacted: bool,
    /// Chunks table had fragments compacted.
    pub chunks_compacted: bool,
    /// Items table had old versions pruned.
    pub items_pruned: bool,
    /// Chunks table had old versions pruned.
    pub chunks_pruned: bool,
}

/// Measure the total size (in bytes) of all files in a directory tree.
///
/// Thin public wrapper over the private `walkdir` helper. Best-effort:
/// unreadable directories or entries simply contribute zero.
pub fn dir_size(path: &std::path::Path) -> u64 {
    walkdir(path)
}

/// Recursively sum the sizes of all regular files under `path`.
///
/// Best-effort: directories or entries that fail to read contribute 0 rather
/// than erroring. Symlinks are neither followed nor counted —
/// `DirEntry::file_type` does not traverse links, which also prevents cycles.
///
/// Uses an explicit worklist instead of recursion so that very deeply nested
/// directory trees cannot overflow the call stack.
fn walkdir(path: &std::path::Path) -> u64 {
    let mut total = 0u64;
    let mut pending = vec![path.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in entries.flatten() {
            let Ok(ft) = entry.file_type() else { continue };
            if ft.is_file() {
                // Size unavailable (e.g. racing deletion) counts as 0.
                total += entry.metadata().map(|m| m.len()).unwrap_or(0);
            } else if ft.is_dir() {
                pending.push(entry.path());
            }
        }
    }
    total
}

// ==================== Project ID Migration ====================
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use uuid::Uuid;

pub mod access;
pub mod chunker;
pub mod compaction;
pub mod consolidation;
pub mod db;
pub mod document;
Expand Down
104 changes: 103 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ enum Commands {
/// Item ID to delete
id: String,
},

/// Compact and optimize LanceDB storage (reclaim disk space)
Compact {
/// Also delete unverified files (less than 7 days old)
#[arg(long)]
force: bool,

/// Skip the prune phase (only compact and re-index)
#[arg(long)]
no_prune: bool,
},
}

fn main() -> Result<()> {
Expand Down Expand Up @@ -133,6 +144,7 @@ fn main() -> Result<()> {
}) => run_store(cli.db, &content, &scope, replace, cli.json),
Some(Commands::Recall { query, limit }) => run_recall(cli.db, &query, limit, cli.json),
Some(Commands::Forget { id }) => run_forget(cli.db, &id, cli.json),
Some(Commands::Compact { force, no_prune }) => run_compact(cli.db, force, no_prune),
}
}

Expand Down Expand Up @@ -524,7 +536,8 @@ fn run_store(

let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let mut db = sediment::Database::open_with_project(&ctx.db_path, ctx.project_id.clone()).await?;
let mut db =
sediment::Database::open_with_project(&ctx.db_path, ctx.project_id.clone()).await?;

let mut item = sediment::Item::new(&content);

Expand Down Expand Up @@ -868,6 +881,95 @@ fn run_forget(db_override: Option<PathBuf>, id: &str, output_json: bool) -> Resu
})
}

/// Compact and optimize LanceDB storage
fn run_compact(db_override: Option<PathBuf>, force: bool, no_prune: bool) -> Result<()> {
    let db_path = db_override.unwrap_or_else(sediment::central_db_path);

    if !db_path.exists() {
        println!("Database does not exist yet. Nothing to compact.");
        return Ok(());
    }

    println!("Database: {}", db_path.display());

    let size_before = sediment::db::dir_size(&db_path);
    println!("Size before: {}", format_bytes(size_before));

    // Prune everything prunable unless --no-prune; --force additionally
    // removes unverified (recently created) files.
    let config = sediment::db::CompactConfig {
        prune_older_than: (!no_prune).then(chrono::Duration::zero),
        delete_unverified: force,
        num_threads: None, // use all CPUs for CLI
        skip_prune: no_prune,
    };

    let rt = tokio::runtime::Runtime::new()?;
    rt.block_on(async {
        let db = sediment::Database::open(&db_path).await?;
        let stats = db.optimize_tables(&config).await?;

        let size_after = sediment::db::dir_size(&db_path);

        let describe = |changed: bool| if changed { "compacted" } else { "no changes needed" };
        let yes_no = |b: bool| if b { "yes" } else { "no" };

        println!("\nOptimization results:");
        println!(" Items table: {}", describe(stats.items_compacted));
        println!(" Chunks table: {}", describe(stats.chunks_compacted));

        if !no_prune {
            println!(" Items pruned: {}", yes_no(stats.items_pruned));
            println!(" Chunks pruned: {}", yes_no(stats.chunks_pruned));
        }

        println!("\nSize after: {}", format_bytes(size_after));

        match size_before.checked_sub(size_after) {
            Some(saved) if saved > 0 => println!("Reclaimed: {}", format_bytes(saved)),
            _ => println!("No space reclaimed (storage is already optimized)"),
        }

        Ok(())
    })
}

/// Render a byte count as a human-readable string using binary (1024) units.
///
/// Two decimal places for KB/MB/GB; values below 1 KB are printed as a plain
/// integer byte count.
fn format_bytes(bytes: u64) -> String {
    const KB: u64 = 1024;
    const MB: u64 = 1024 * KB;
    const GB: u64 = 1024 * MB;

    // Largest unit whose threshold the value meets wins.
    match bytes {
        b if b >= GB => format!("{:.2} GB", b as f64 / GB as f64),
        b if b >= MB => format!("{:.2} MB", b as f64 / MB as f64),
        b if b >= KB => format!("{:.2} KB", b as f64 / KB as f64),
        b => format!("{} bytes", b),
    }
}

/// Generate CLAUDE.md instructions for Sediment
fn generate_claude_md_instructions() -> String {
r#"# Sediment Memory System
Expand Down
17 changes: 14 additions & 3 deletions src/mcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ pub struct ServerContext {
pub cwd: PathBuf,
/// Semaphore to ensure only one consolidation task runs at a time
pub consolidation_semaphore: Arc<Semaphore>,
/// Semaphore to ensure only one compaction task runs at a time
pub compaction_semaphore: Arc<Semaphore>,
/// Counter for recall invocations (triggers periodic clustering and expired cleanup)
pub recall_count: std::sync::atomic::AtomicU64,
/// Counter for write operations (store/forget) that trigger periodic compaction
pub write_count: std::sync::atomic::AtomicU64,
/// Rate limiter state (mutex protects window+count as a unit)
pub rate_limit: Mutex<RateLimitState>,
}
Expand Down Expand Up @@ -70,7 +74,9 @@ pub fn run(db_path: &Path, project_id: Option<String>) -> Result<()> {
embedder,
cwd,
consolidation_semaphore: Arc::new(Semaphore::new(1)),
compaction_semaphore: Arc::new(Semaphore::new(1)),
recall_count: std::sync::atomic::AtomicU64::new(0),
write_count: std::sync::atomic::AtomicU64::new(0),
rate_limit: Mutex::new(RateLimitState {
window_start_ms: 0,
count: 0,
Expand Down Expand Up @@ -163,11 +169,16 @@ pub fn run(db_path: &Path, project_id: Option<String>) -> Result<()> {
}
}

// Graceful shutdown: wait for in-flight consolidation before dropping
// Graceful shutdown: wait for in-flight background tasks before dropping
tracing::info!("Client disconnected, shutting down...");
let sem = ctx.consolidation_semaphore.clone();
let consol_sem = ctx.consolidation_semaphore.clone();
let compact_sem = ctx.compaction_semaphore.clone();
let _ = rt.block_on(async {
tokio::time::timeout(std::time::Duration::from_secs(10), sem.acquire()).await
tokio::time::timeout(std::time::Duration::from_secs(10), async {
let _ = consol_sem.acquire().await;
let _ = compact_sem.acquire().await;
})
.await
});
drop(ctx);
rt.shutdown_timeout(std::time::Duration::from_secs(5));
Expand Down
Loading
Loading