Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions src/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -13,6 +13,79 @@ use crate::error::{BranchError, Result};
use crate::inode::ROOT_INO;
use crate::storage;

/// Tracks storage usage across all branches and enforces an optional quota.
/// Only counts delta files (the actual disk cost of branching).
pub struct StorageQuota {
/// Current total bytes used in the storage directory (all branches' deltas).
used_bytes: AtomicI64,
/// Maximum allowed bytes. None = unlimited.
max_bytes: Option<u64>,
}

impl StorageQuota {
pub fn new(max_bytes: Option<u64>) -> Self {
Self {
used_bytes: AtomicI64::new(0),
max_bytes,
}
}

/// Walk the storage directory to compute initial usage.
pub fn scan_usage(storage_path: &Path, max_bytes: Option<u64>) -> Self {
let quota = Self::new(max_bytes);
let branches_dir = storage_path.join("branches");
if branches_dir.exists() {
let bytes = Self::dir_size(&branches_dir);
quota.used_bytes.store(bytes as i64, Ordering::Relaxed);
}
quota
}

fn dir_size(path: &Path) -> u64 {
let mut total = 0u64;
if let Ok(entries) = fs::read_dir(path) {
for entry in entries.flatten() {
let p = entry.path();
if p.is_dir() {
total += Self::dir_size(&p);
} else if let Ok(meta) = p.symlink_metadata() {
total += meta.len();
}
}
}
total
}

/// Check if `additional` bytes can be allocated. Returns Err(ENOSPC) if not.
pub fn check(&self, additional: u64) -> std::result::Result<(), i32> {
if let Some(max) = self.max_bytes {
let current = self.used_bytes.load(Ordering::Relaxed);
if current as u64 + additional > max {
return Err(libc::ENOSPC);
}
}
Ok(())
}

/// Record that `bytes` were added to storage.
pub fn add(&self, bytes: u64) {
self.used_bytes.fetch_add(bytes as i64, Ordering::Relaxed);
}

/// Record that `bytes` were removed from storage.
pub fn sub(&self, bytes: u64) {
self.used_bytes.fetch_sub(bytes as i64, Ordering::Relaxed);
}

pub fn used(&self) -> u64 {
self.used_bytes.load(Ordering::Relaxed).max(0) as u64
}

pub fn max(&self) -> Option<u64> {
self.max_bytes
}
}

/// Remove a file or directory at `path`, following symlinks for the type check.
/// Returns `Ok(())` even if the path doesn't exist; propagates real I/O errors.
fn remove_entry(path: &Path) -> std::io::Result<()> {
Expand Down Expand Up @@ -162,12 +235,21 @@ pub struct BranchManager {
opened_inodes: Mutex<HashMap<String, HashSet<u64>>>,
/// Current branch per mount — single source of truth
mount_branches: RwLock<HashMap<PathBuf, String>>,
/// Storage quota enforcement
pub quota: StorageQuota,
}

impl BranchManager {
pub fn new(storage_path: PathBuf, base_path: PathBuf, workspace_path: PathBuf) -> Result<Self> {
pub fn new(
storage_path: PathBuf,
base_path: PathBuf,
workspace_path: PathBuf,
max_storage: Option<u64>,
) -> Result<Self> {
fs::create_dir_all(&storage_path)?;

let quota = StorageQuota::scan_usage(&storage_path, max_storage);

// Always start fresh with just the "main" branch
let mut branches = HashMap::new();
let main_branch = Branch::new("main", None, &storage_path, 0)?;
Expand All @@ -182,6 +264,7 @@ impl BranchManager {
notifiers: Mutex::new(HashMap::new()),
opened_inodes: Mutex::new(HashMap::new()),
mount_branches: RwLock::new(HashMap::new()),
quota,
})
}

Expand Down
37 changes: 25 additions & 12 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl Daemon {
base_path: PathBuf,
storage_path: PathBuf,
_workspace_path: PathBuf,
max_storage: Option<u64>,
) -> Result<Self> {
let socket_path = storage_path.join("daemon.sock");

Expand Down Expand Up @@ -121,6 +122,7 @@ impl Daemon {
storage_path.clone(),
base_path.clone(),
base_path.clone(),
max_storage,
)?);

Ok(Self {
Expand Down Expand Up @@ -379,7 +381,11 @@ pub fn is_daemon_running(socket_path: &Path) -> bool {
UnixStream::connect(socket_path).is_ok()
}

pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io::Result<()> {
pub fn start_daemon_background(
base_path: &Path,
storage_path: &Path,
max_storage: Option<u64>,
) -> std::io::Result<()> {
let socket_path = storage_path.join("daemon.sock");

// Spawn the daemon as a detached child process. The env var tells
Expand All @@ -389,15 +395,18 @@ pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io
// so callers that capture output won't block.
let exe = std::env::current_exe()?;

Command::new(exe)
.args([
"run-daemon",
"--base",
&base_path.to_string_lossy(),
"--storage",
&storage_path.to_string_lossy(),
])
.stdin(Stdio::null())
let mut cmd = Command::new(exe);
cmd.args([
"run-daemon",
"--base",
&base_path.to_string_lossy(),
"--storage",
&storage_path.to_string_lossy(),
]);
if let Some(max) = max_storage {
cmd.args(["--max-storage", &max.to_string()]);
}
cmd.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()?;
Expand All @@ -416,7 +425,11 @@ pub fn start_daemon_background(base_path: &Path, storage_path: &Path) -> std::io
))
}

pub fn ensure_daemon(base_path: Option<&Path>, storage_path: &Path) -> std::io::Result<()> {
pub fn ensure_daemon(
base_path: Option<&Path>,
storage_path: &Path,
max_storage: Option<u64>,
) -> std::io::Result<()> {
let socket_path = storage_path.join("daemon.sock");

if is_daemon_running(&socket_path) {
Expand All @@ -440,5 +453,5 @@ pub fn ensure_daemon(base_path: Option<&Path>, storage_path: &Path) -> std::io::
}
};

start_daemon_background(&base_path, storage_path)
start_daemon_background(&base_path, storage_path, max_storage)
}
128 changes: 118 additions & 10 deletions src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};

use fuser::{
BackingId, FileType, Filesystem, ReplyAttr, ReplyData, ReplyDirectory, ReplyEmpty, ReplyEntry,
ReplyIoctl, ReplyOpen, ReplyWrite, Request, TimeOrNow,
ReplyIoctl, ReplyOpen, ReplyStatfs, ReplyWrite, Request, TimeOrNow,
};
use parking_lot::RwLock;

Expand Down Expand Up @@ -714,12 +714,25 @@ impl Filesystem for BranchFs {
// to the same inode (after COW is already done).
if let Some(file) = self.write_cache.get(ino, epoch) {
use std::io::{Seek, SeekFrom, Write};
// Check quota for writes that extend the file
let old_size = file.metadata().map(|m| m.len()).unwrap_or(0);
let write_end = offset as u64 + data.len() as u64;
if write_end > old_size && self.manager.quota.check(write_end - old_size).is_err() {
reply.error(libc::ENOSPC);
return;
}
if file.seek(SeekFrom::Start(offset as u64)).is_err() {
reply.error(libc::EIO);
return;
}
match file.write(data) {
Ok(n) => reply.written(n as u32),
Ok(n) => {
let new_end = offset as u64 + n as u64;
if new_end > old_size {
self.manager.quota.add(new_end - old_size);
}
reply.written(n as u32)
}
Err(_) => reply.error(libc::EIO),
}
return;
Expand Down Expand Up @@ -780,6 +793,12 @@ impl Filesystem for BranchFs {
// Serve from the just-cached write fd
if let Some(file) = self.write_cache.get(ino, epoch) {
use std::io::{Seek, SeekFrom, Write};
let old_size = file.metadata().map(|m| m.len()).unwrap_or(0);
let write_end = offset as u64 + data.len() as u64;
if write_end > old_size && self.manager.quota.check(write_end - old_size).is_err() {
reply.error(libc::ENOSPC);
return;
}
if file.seek(SeekFrom::Start(offset as u64)).is_err() {
reply.error(libc::EIO);
return;
Expand All @@ -790,6 +809,10 @@ impl Filesystem for BranchFs {
reply.error(libc::ESTALE);
return;
}
let new_end = offset as u64 + n as u64;
if new_end > old_size {
self.manager.quota.add(new_end - old_size);
}
reply.written(n as u32)
}
Err(_) => reply.error(libc::EIO),
Expand Down Expand Up @@ -1076,7 +1099,9 @@ impl Filesystem for BranchFs {
b.add_tombstone(&rel_path)?;
let delta = b.delta_path(&rel_path);
if delta.exists() {
let freed = delta.symlink_metadata().map(|m| m.len()).unwrap_or(0);
std::fs::remove_file(&delta)?;
self.manager.quota.sub(freed);
}
Ok(())
});
Expand Down Expand Up @@ -1106,7 +1131,9 @@ impl Filesystem for BranchFs {
b.add_tombstone(&path)?;
let delta = b.delta_path(&path);
if delta.exists() {
let freed = delta.symlink_metadata().map(|m| m.len()).unwrap_or(0);
std::fs::remove_file(&delta)?;
self.manager.quota.sub(freed);
}
Ok(())
});
Expand Down Expand Up @@ -1450,11 +1477,30 @@ impl Filesystem for BranchFs {
return;
}
if let Some(new_size) = size {
if let Ok(delta) = self.ensure_cow_for_branch(&branch, &rel_path) {
let file = std::fs::OpenOptions::new().write(true).open(&delta);
if let Ok(f) = file {
let _ = f.set_len(new_size);
match self.ensure_cow_for_branch(&branch, &rel_path) {
Ok(delta) => {
let file = std::fs::OpenOptions::new().write(true).open(&delta);
if let Ok(f) = file {
let old_size = f.metadata().map(|m| m.len()).unwrap_or(0);
if new_size > old_size
&& self.manager.quota.check(new_size - old_size).is_err()
{
reply.error(libc::ENOSPC);
return;
}
let _ = f.set_len(new_size);
if new_size > old_size {
self.manager.quota.add(new_size - old_size);
} else if old_size > new_size {
self.manager.quota.sub(old_size - new_size);
}
}
}
Err(e) if e.raw_os_error() == Some(libc::ENOSPC) => {
reply.error(libc::ENOSPC);
return;
}
Err(_) => {}
}
}
if mode.is_some()
Expand All @@ -1478,11 +1524,30 @@ impl Filesystem for BranchFs {
_ => {
// Root path (existing logic)
if let Some(new_size) = size {
if let Ok(delta) = self.ensure_cow(&path) {
let file = std::fs::OpenOptions::new().write(true).open(&delta);
if let Ok(f) = file {
let _ = f.set_len(new_size);
match self.ensure_cow(&path) {
Ok(delta) => {
let file = std::fs::OpenOptions::new().write(true).open(&delta);
if let Ok(f) = file {
let old_size = f.metadata().map(|m| m.len()).unwrap_or(0);
if new_size > old_size
&& self.manager.quota.check(new_size - old_size).is_err()
{
reply.error(libc::ENOSPC);
return;
}
let _ = f.set_len(new_size);
if new_size > old_size {
self.manager.quota.add(new_size - old_size);
} else if old_size > new_size {
self.manager.quota.sub(old_size - new_size);
}
}
}
Err(e) if e.raw_os_error() == Some(libc::ENOSPC) => {
reply.error(libc::ENOSPC);
return;
}
Err(_) => {}
}
}
if mode.is_some()
Expand Down Expand Up @@ -1817,4 +1882,47 @@ impl Filesystem for BranchFs {
}
}
}

fn statfs(&mut self, _req: &Request, _ino: u64, reply: ReplyStatfs) {
let block_size = 4096u32;
let quota = &self.manager.quota;

if let Some(max_bytes) = quota.max() {
let used = quota.used();
let total_blocks = max_bytes / block_size as u64;
let used_blocks = used.min(max_bytes) / block_size as u64;
let avail_blocks = total_blocks.saturating_sub(used_blocks);

reply.statfs(
total_blocks, // total blocks
avail_blocks, // free blocks
avail_blocks, // available blocks (to unprivileged users)
0, // total inodes (0 = unspecified)
0, // free inodes
block_size, // block size
255, // max name length
block_size, // fragment size
);
} else {
// No quota — report from the storage filesystem
let storage_path = &self.manager.storage_path;
match nix::sys::statvfs::statvfs(storage_path) {
Ok(stat) => {
reply.statfs(
stat.blocks(),
stat.blocks_free(),
stat.blocks_available(),
stat.files(),
stat.files_free(),
stat.block_size() as u32,
stat.name_max() as u32,
stat.fragment_size() as u32,
);
}
Err(_) => {
reply.error(libc::EIO);
}
}
}
}
}
Loading
Loading