From 2c87ce70ba60db5bea37a167bf6cd0ee593d4fba Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Fri, 13 Mar 2026 15:51:00 -0700 Subject: [PATCH] Add --max-storage quota enforcement and statfs() support Signed-off-by: Cong Wang --- src/branch.rs | 87 +++++++++++++++++++++++++- src/daemon.rs | 37 ++++++++---- src/fs.rs | 128 ++++++++++++++++++++++++++++++++++++--- src/fs_helpers.rs | 6 ++ src/main.rs | 38 +++++++++++- tests/test_quota.sh | 144 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 413 insertions(+), 27 deletions(-) create mode 100644 tests/test_quota.sh diff --git a/src/branch.rs b/src/branch.rs index 5ccfb52..1b53c5a 100644 --- a/src/branch.rs +++ b/src/branch.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::fs::{self, File}; use std::io::{BufRead, BufReader, Write}; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; @@ -13,6 +13,79 @@ use crate::error::{BranchError, Result}; use crate::inode::ROOT_INO; use crate::storage; +/// Tracks storage usage across all branches and enforces an optional quota. +/// Only counts delta files (the actual disk cost of branching). +pub struct StorageQuota { + /// Current total bytes used in the storage directory (all branches' deltas). + used_bytes: AtomicI64, + /// Maximum allowed bytes. None = unlimited. + max_bytes: Option, +} + +impl StorageQuota { + pub fn new(max_bytes: Option) -> Self { + Self { + used_bytes: AtomicI64::new(0), + max_bytes, + } + } + + /// Walk the storage directory to compute initial usage. 
+ pub fn scan_usage(storage_path: &Path, max_bytes: Option) -> Self { + let quota = Self::new(max_bytes); + let branches_dir = storage_path.join("branches"); + if branches_dir.exists() { + let bytes = Self::dir_size(&branches_dir); + quota.used_bytes.store(bytes as i64, Ordering::Relaxed); + } + quota + } + + fn dir_size(path: &Path) -> u64 { + let mut total = 0u64; + if let Ok(entries) = fs::read_dir(path) { + for entry in entries.flatten() { + let p = entry.path(); + if p.is_dir() { + total += Self::dir_size(&p); + } else if let Ok(meta) = p.symlink_metadata() { + total += meta.len(); + } + } + } + total + } + + /// Check if `additional` bytes can be allocated. Returns Err(ENOSPC) if not. + pub fn check(&self, additional: u64) -> std::result::Result<(), i32> { + if let Some(max) = self.max_bytes { + let current = self.used_bytes.load(Ordering::Relaxed); + if current as u64 + additional > max { + return Err(libc::ENOSPC); + } + } + Ok(()) + } + + /// Record that `bytes` were added to storage. + pub fn add(&self, bytes: u64) { + self.used_bytes.fetch_add(bytes as i64, Ordering::Relaxed); + } + + /// Record that `bytes` were removed from storage. + pub fn sub(&self, bytes: u64) { + self.used_bytes.fetch_sub(bytes as i64, Ordering::Relaxed); + } + + pub fn used(&self) -> u64 { + self.used_bytes.load(Ordering::Relaxed).max(0) as u64 + } + + pub fn max(&self) -> Option { + self.max_bytes + } +} + /// Remove a file or directory at `path`, following symlinks for the type check. /// Returns `Ok(())` even if the path doesn't exist; propagates real I/O errors. 
fn remove_entry(path: &Path) -> std::io::Result<()> { @@ -162,12 +235,21 @@ pub struct BranchManager { opened_inodes: Mutex>>, /// Current branch per mount — single source of truth mount_branches: RwLock>, + /// Storage quota enforcement + pub quota: StorageQuota, } impl BranchManager { - pub fn new(storage_path: PathBuf, base_path: PathBuf, workspace_path: PathBuf) -> Result { + pub fn new( + storage_path: PathBuf, + base_path: PathBuf, + workspace_path: PathBuf, + max_storage: Option, + ) -> Result { fs::create_dir_all(&storage_path)?; + let quota = StorageQuota::scan_usage(&storage_path, max_storage); + // Always start fresh with just the "main" branch let mut branches = HashMap::new(); let main_branch = Branch::new("main", None, &storage_path, 0)?; @@ -182,6 +264,7 @@ impl BranchManager { notifiers: Mutex::new(HashMap::new()), opened_inodes: Mutex::new(HashMap::new()), mount_branches: RwLock::new(HashMap::new()), + quota, }) } diff --git a/src/daemon.rs b/src/daemon.rs index ccf9dd6..2177a5b 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -92,6 +92,7 @@ impl Daemon { base_path: PathBuf, storage_path: PathBuf, _workspace_path: PathBuf, + max_storage: Option, ) -> Result { let socket_path = storage_path.join("daemon.sock"); @@ -121,6 +122,7 @@ impl Daemon { storage_path.clone(), base_path.clone(), base_path.clone(), + max_storage, )?); Ok(Self { @@ -379,7 +381,11 @@ pub fn is_daemon_running(socket_path: &Path) -> bool { UnixStream::connect(socket_path).is_ok() } -pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io::Result<()> { +pub fn start_daemon_background( + base_path: &Path, + storage_path: &Path, + max_storage: Option, +) -> std::io::Result<()> { let socket_path = storage_path.join("daemon.sock"); // Spawn the daemon as a detached child process. The env var tells @@ -389,15 +395,18 @@ pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io // so callers that capture output won't block. 
let exe = std::env::current_exe()?; - Command::new(exe) - .args([ - "run-daemon", - "--base", - &base_path.to_string_lossy(), - "--storage", - &storage_path.to_string_lossy(), - ]) - .stdin(Stdio::null()) + let mut cmd = Command::new(exe); + cmd.args([ + "run-daemon", + "--base", + &base_path.to_string_lossy(), + "--storage", + &storage_path.to_string_lossy(), + ]); + if let Some(max) = max_storage { + cmd.args(["--max-storage", &max.to_string()]); + } + cmd.stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn()?; @@ -416,7 +425,11 @@ pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io )) } -pub fn ensure_daemon(base_path: Option<&Path>, storage_path: &Path) -> std::io::Result<()> { +pub fn ensure_daemon( + base_path: Option<&Path>, + storage_path: &Path, + max_storage: Option, +) -> std::io::Result<()> { let socket_path = storage_path.join("daemon.sock"); if is_daemon_running(&socket_path) { @@ -440,5 +453,5 @@ pub fn ensure_daemon(base_path: Option<&Path>, storage_path: &Path) -> std::io:: } }; - start_daemon_background(&base_path, storage_path) + start_daemon_background(&base_path, storage_path, max_storage) } diff --git a/src/fs.rs b/src/fs.rs index e50d483..8e0658e 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -9,7 +9,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use fuser::{ BackingId, FileType, Filesystem, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry, - ReplyIoctl, ReplyOpen, ReplyWrite, Request, TimeOrNow, + ReplyIoctl, ReplyOpen, ReplyStatfs, ReplyWrite, Request, TimeOrNow, }; use parking_lot::RwLock; @@ -714,12 +714,25 @@ impl Filesystem for BranchFs { // to the same inode (after COW is already done). 
if let Some(file) = self.write_cache.get(ino, epoch) { use std::io::{Seek, SeekFrom, Write}; + // Check quota for writes that extend the file + let old_size = file.metadata().map(|m| m.len()).unwrap_or(0); + let write_end = offset as u64 + data.len() as u64; + if write_end > old_size && self.manager.quota.check(write_end - old_size).is_err() { + reply.error(libc::ENOSPC); + return; + } if file.seek(SeekFrom::Start(offset as u64)).is_err() { reply.error(libc::EIO); return; } match file.write(data) { - Ok(n) => reply.written(n as u32), + Ok(n) => { + let new_end = offset as u64 + n as u64; + if new_end > old_size { + self.manager.quota.add(new_end - old_size); + } + reply.written(n as u32) + } Err(_) => reply.error(libc::EIO), } return; @@ -780,6 +793,12 @@ impl Filesystem for BranchFs { // Serve from the just-cached write fd if let Some(file) = self.write_cache.get(ino, epoch) { use std::io::{Seek, SeekFrom, Write}; + let old_size = file.metadata().map(|m| m.len()).unwrap_or(0); + let write_end = offset as u64 + data.len() as u64; + if write_end > old_size && self.manager.quota.check(write_end - old_size).is_err() { + reply.error(libc::ENOSPC); + return; + } if file.seek(SeekFrom::Start(offset as u64)).is_err() { reply.error(libc::EIO); return; @@ -790,6 +809,10 @@ impl Filesystem for BranchFs { reply.error(libc::ESTALE); return; } + let new_end = offset as u64 + n as u64; + if new_end > old_size { + self.manager.quota.add(new_end - old_size); + } reply.written(n as u32) } Err(_) => reply.error(libc::EIO), @@ -1076,7 +1099,9 @@ impl Filesystem for BranchFs { b.add_tombstone(&rel_path)?; let delta = b.delta_path(&rel_path); if delta.exists() { + let freed = delta.symlink_metadata().map(|m| m.len()).unwrap_or(0); std::fs::remove_file(&delta)?; + self.manager.quota.sub(freed); } Ok(()) }); @@ -1106,7 +1131,9 @@ impl Filesystem for BranchFs { b.add_tombstone(&path)?; let delta = b.delta_path(&path); if delta.exists() { + let freed = delta.symlink_metadata().map(|m| 
m.len()).unwrap_or(0); std::fs::remove_file(&delta)?; + self.manager.quota.sub(freed); } Ok(()) }); @@ -1450,11 +1477,30 @@ impl Filesystem for BranchFs { return; } if let Some(new_size) = size { - if let Ok(delta) = self.ensure_cow_for_branch(&branch, &rel_path) { - let file = std::fs::OpenOptions::new().write(true).open(&delta); - if let Ok(f) = file { - let _ = f.set_len(new_size); + match self.ensure_cow_for_branch(&branch, &rel_path) { + Ok(delta) => { + let file = std::fs::OpenOptions::new().write(true).open(&delta); + if let Ok(f) = file { + let old_size = f.metadata().map(|m| m.len()).unwrap_or(0); + if new_size > old_size + && self.manager.quota.check(new_size - old_size).is_err() + { + reply.error(libc::ENOSPC); + return; + } + let _ = f.set_len(new_size); + if new_size > old_size { + self.manager.quota.add(new_size - old_size); + } else if old_size > new_size { + self.manager.quota.sub(old_size - new_size); + } + } } + Err(e) if e.raw_os_error() == Some(libc::ENOSPC) => { + reply.error(libc::ENOSPC); + return; + } + Err(_) => {} } } if mode.is_some() @@ -1478,11 +1524,30 @@ impl Filesystem for BranchFs { _ => { // Root path (existing logic) if let Some(new_size) = size { - if let Ok(delta) = self.ensure_cow(&path) { - let file = std::fs::OpenOptions::new().write(true).open(&delta); - if let Ok(f) = file { - let _ = f.set_len(new_size); + match self.ensure_cow(&path) { + Ok(delta) => { + let file = std::fs::OpenOptions::new().write(true).open(&delta); + if let Ok(f) = file { + let old_size = f.metadata().map(|m| m.len()).unwrap_or(0); + if new_size > old_size + && self.manager.quota.check(new_size - old_size).is_err() + { + reply.error(libc::ENOSPC); + return; + } + let _ = f.set_len(new_size); + if new_size > old_size { + self.manager.quota.add(new_size - old_size); + } else if old_size > new_size { + self.manager.quota.sub(old_size - new_size); + } + } + } + Err(e) if e.raw_os_error() == Some(libc::ENOSPC) => { + reply.error(libc::ENOSPC); + return; } + 
Err(_) => {} } } if mode.is_some() @@ -1817,4 +1882,47 @@ impl Filesystem for BranchFs { } } } + + fn statfs(&mut self, _req: &Request, _ino: u64, reply: ReplyStatfs) { + let block_size = 4096u32; + let quota = &self.manager.quota; + + if let Some(max_bytes) = quota.max() { + let used = quota.used(); + let total_blocks = max_bytes / block_size as u64; + let used_blocks = used.min(max_bytes) / block_size as u64; + let avail_blocks = total_blocks.saturating_sub(used_blocks); + + reply.statfs( + total_blocks, // total blocks + avail_blocks, // free blocks + avail_blocks, // available blocks (to unprivileged users) + 0, // total inodes (0 = unspecified) + 0, // free inodes + block_size, // block size + 255, // max name length + block_size, // fragment size + ); + } else { + // No quota — report from the storage filesystem + let storage_path = &self.manager.storage_path; + match nix::sys::statvfs::statvfs(storage_path) { + Ok(stat) => { + reply.statfs( + stat.blocks(), + stat.blocks_free(), + stat.blocks_available(), + stat.files(), + stat.files_free(), + stat.block_size() as u32, + stat.name_max() as u32, + stat.fragment_size() as u32, + ); + } + Err(_) => { + reply.error(libc::EIO); + } + } + } + } } diff --git a/src/fs_helpers.rs b/src/fs_helpers.rs index 2159f37..f26aab1 100644 --- a/src/fs_helpers.rs +++ b/src/fs_helpers.rs @@ -54,8 +54,14 @@ impl BranchFs { if let Some(src) = self.resolve_for_branch(branch, rel_path) { if let Ok(meta) = src.symlink_metadata() { if meta.file_type().is_symlink() || meta.file_type().is_file() { + let src_size = meta.len(); + self.manager + .quota + .check(src_size) + .map_err(std::io::Error::from_raw_os_error)?; storage::copy_entry(&src, &delta) .map_err(|e| std::io::Error::other(e.to_string()))?; + self.manager.quota.add(src_size); } } } diff --git a/src/main.rs b/src/main.rs index 2ef1edc..cd36e94 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,6 +31,10 @@ enum Commands { #[arg(long)] passthrough: bool, + /// Maximum storage size 
/// Turn a size argument such as "500M", "2G", "1024" or "1T" into bytes.
///
/// Surrounding whitespace is ignored. A trailing `K`, `M`, `G` or `T`
/// (either case) scales the number by the matching power of 1024; without
/// a suffix the number is taken as bytes.
///
/// Returns an error message for empty input, a non-numeric value, or a
/// result that does not fit in `u64`.
fn parse_size(s: &str) -> std::result::Result<u64, String> {
    const KIB: u64 = 1024;

    let text = s.trim();
    if text.is_empty() {
        return Err("empty size string".to_string());
    }

    // Only the final byte can be a unit; anything else means "plain bytes".
    let split_at = text.len() - 1;
    let unit = match text.as_bytes()[split_at].to_ascii_uppercase() {
        b'K' => Some(KIB),
        b'M' => Some(KIB * KIB),
        b'G' => Some(KIB * KIB * KIB),
        b'T' => Some(KIB * KIB * KIB * KIB),
        _ => None,
    };
    let (digits, scale) = match unit {
        // The suffix is a single ASCII byte, so `split_at` is a char boundary.
        Some(scale) => (&text[..split_at], scale),
        None => (text, 1),
    };

    let count = digits
        .parse::<u64>()
        .map_err(|_| format!("invalid size: {}", text))?;
    match count.checked_mul(scale) {
        Some(bytes) => Ok(bytes),
        None => Err(format!("size overflow: {}", text)),
    }
}
#!/bin/bash
# Storage quota tests: exercises `--max-storage` enforcement and statfs output.

source "$(dirname "$0")/test_helper.sh"

# Count one passing check and print its message.
quota_pass() {
    TESTS_RUN=$((TESTS_RUN + 1))
    TESTS_PASSED=$((TESTS_PASSED + 1))
    echo -e " ${GREEN}✓${NC} $1"
}

# Count one failing check and print its message.
quota_fail() {
    TESTS_RUN=$((TESTS_RUN + 1))
    TESTS_FAILED=$((TESTS_FAILED + 1))
    echo -e " ${RED}✗${NC} $1"
}

# Mount the test filesystem with the given --max-storage value.
# Passthrough mode is added when running as root.
do_mount_with_quota() {
    local limit="$1"
    local mount_opts=()
    if [[ "$(id -u)" == "0" ]]; then
        mount_opts+=(--passthrough)
    fi
    "$BRANCHFS" mount --base "$TEST_BASE" --storage "$TEST_STORAGE" \
        --max-storage "$limit" "${mount_opts[@]}" "$TEST_MNT"
    sleep 0.5
}

test_write_within_quota() {
    setup
    do_mount_with_quota "1M"
    do_create "quota_ok" "main"

    # A few bytes fit comfortably inside a 1M quota.
    echo "hello" > "$TEST_MNT/small.txt"
    assert_file_contains "$TEST_MNT/small.txt" "hello" "Small write within quota succeeds"

    do_unmount
}

test_write_exceeds_quota() {
    setup
    do_mount_with_quota "10K"
    do_create "quota_exceed" "main"

    # 20K into a 10K quota must be refused.
    if ! dd if=/dev/zero of="$TEST_MNT/big.txt" bs=1K count=20 2>/dev/null; then
        quota_pass "Large write rejected (ENOSPC)"
    else
        quota_fail "Large write should have failed with ENOSPC"
    fi

    do_unmount
}

test_cow_exceeds_quota() {
    setup

    # The base file alone is bigger than the quota, so copying it up cannot fit.
    dd if=/dev/urandom of="$TEST_BASE/large_base.bin" bs=1K count=50 2>/dev/null

    do_mount_with_quota "10K"
    do_create "cow_exceed" "main"

    # Modifying the file triggers the copy-up, which should be rejected.
    if ! echo "modified" > "$TEST_MNT/large_base.bin" 2>/dev/null; then
        quota_pass "COW copy rejected when exceeds quota"
    else
        quota_fail "COW of large file should have failed with ENOSPC"
    fi

    # The base file must remain visible through the mount.
    assert "[[ -f '$TEST_MNT/large_base.bin' ]]" "Original file still readable"

    do_unmount
}

test_delete_frees_quota() {
    setup
    do_mount_with_quota "20K"
    do_create "quota_free" "main"

    # Consume 15K of the 20K quota.
    dd if=/dev/zero of="$TEST_MNT/fill.txt" bs=1K count=15 2>/dev/null
    assert_file_exists "$TEST_MNT/fill.txt" "File created using most of quota"

    # 10K more does not fit any longer.
    if ! dd if=/dev/zero of="$TEST_MNT/overflow.txt" bs=1K count=10 2>/dev/null; then
        quota_pass "Quota full, second write rejected"
    else
        quota_fail "Second write should have failed"
    fi

    # Removing the first file should give its space back ...
    rm "$TEST_MNT/fill.txt"

    # ... so the same 10K write is expected to go through now.
    dd if=/dev/zero of="$TEST_MNT/after_delete.txt" bs=1K count=10 2>/dev/null
    assert_file_exists "$TEST_MNT/after_delete.txt" "Write succeeds after deleting file"

    do_unmount
}

test_statfs_reports_quota() {
    setup
    do_mount_with_quota "1M"

    # df's "total" column should mirror the quota:
    # 1M = 1048576 bytes = 256 blocks of 4096 = 1024K
    local reported_k
    reported_k=$(df -k "$TEST_MNT" | tail -1 | awk '{print $2}')
    assert "[[ $reported_k -le 1100 && $reported_k -ge 900 ]]" \
        "statfs reports ~1M total (got ${reported_k}K)"

    do_unmount
}

test_no_quota_unlimited() {
    setup
    do_mount # plain mount, no --max-storage

    # Without a quota a 100K write is unrestricted.
    dd if=/dev/zero of="$TEST_MNT/free.txt" bs=1K count=100 2>/dev/null
    assert_file_exists "$TEST_MNT/free.txt" "Write without quota succeeds"

    # df should show the backing filesystem's size (far above 1M).
    local reported_k
    reported_k=$(df -k "$TEST_MNT" | tail -1 | awk '{print $2}')
    assert "[[ $reported_k -gt 10000 ]]" \
        "statfs reports large total without quota (got ${reported_k}K)"

    do_unmount
}

# Run tests
run_test "Write Within Quota" test_write_within_quota
run_test "Write Exceeds Quota" test_write_exceeds_quota
run_test "COW Exceeds Quota" test_cow_exceeds_quota
run_test "Delete Frees Quota" test_delete_frees_quota
run_test "Statfs Reports Quota" test_statfs_reports_quota
run_test "No Quota Unlimited" test_no_quota_unlimited

print_summary