From e11844030f674ad067705e2975ad57d84b3d2f64 Mon Sep 17 00:00:00 2001 From: dylanwongtencent Date: Tue, 5 May 2026 11:42:37 -0700 Subject: [PATCH 1/2] added distributed NATs.io support --- Cargo.lock | 333 +++++++++- Cargo.toml | 2 + crates/openfiles-core/src/config.rs | 58 ++ crates/openfiles-core/src/engine.rs | 47 ++ crates/openfiles-core/src/lib.rs | 2 +- crates/openfiles-server/Cargo.toml | 3 + crates/openfiles-server/src/distributed.rs | 702 +++++++++++++++++++++ crates/openfiles-server/src/main.rs | 81 +-- crates/openfiles-server/src/main.rs.orig | 206 ++++++ openfiles.example.toml | 10 + 10 files changed, 1397 insertions(+), 47 deletions(-) create mode 100644 crates/openfiles-server/src/distributed.rs create mode 100644 crates/openfiles-server/src/main.rs.orig diff --git a/Cargo.lock b/Cargo.lock index 9d84f9b..bd1576d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -87,6 +87,42 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "async-nats" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07d6f157065c3461096d51aacde0c326fa49f3f6e0199e204c566842cdaa5299" +dependencies = [ + "base64", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "pin-project", + "portable-atomic", + "rand 0.8.6", + "regex", + "ring", + "rustls-native-certs", + "rustls-pki-types", + "rustls-webpki", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -363,6 +399,16 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -409,6 +455,32 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -423,6 +495,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "data-encoding" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" + [[package]] name = "der" version = "0.7.10" @@ -441,6 +519,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", + "serde_core", ] [[package]] @@ -475,6 +554,28 @@ dependencies = [ "const-random", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -497,6 +598,12 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -814,7 +921,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", + "webpki-roots 1.0.7", ] [[package]] @@ -1211,6 +1318,21 @@ dependencies = [ "libc", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.17", + "log", + "rand 0.8.6", + "signatory", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1220,6 +1342,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.6", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -1354,7 +1485,7 @@ dependencies = [ "serde_json", "sha2", "tempfile", - "thiserror", + "thiserror 2.0.18", "tokio", "toml", "tracing", @@ -1381,9 +1512,11 @@ name = "openfiles-server" version = "0.1.0" dependencies = [ "anyhow", + "async-nats", "axum", "bytes", "clap", + "futures", "openfiles-core", "serde", "serde_json", @@ -1391,6 +1524,7 @@ dependencies = [ "tower-http", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -1404,6 +1538,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -1482,6 +1622,26 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "pin-project" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf0d9e68100b3a7989b4901972f265cd542e560a3a8a724e1e20322f4d06ce9" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a990e22f43e84855daf260dded30524ef4a9021cc7541c26540500a50b624389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.17" @@ -1624,7 +1784,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", "web-time", @@ -1645,7 +1805,7 @@ 
dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror", + "thiserror 2.0.18", "tinyvec", "tracing", "web-time", @@ -1754,6 +1914,18 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + [[package]] name = "regex-automata" version = "0.4.14" @@ -1840,7 +2012,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots", + "webpki-roots 1.0.7", ] [[package]] @@ -1930,6 +2102,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pki-types" version = "1.14.1" @@ -1972,6 +2156,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "schannel" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1989,6 +2182,29 @@ dependencies = [ "sha2", ] +[[package]] +name = "security-framework" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.28" @@ -2038,6 +2254,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.20" @@ -2049,6 +2274,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -2117,6 +2353,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -2135,7 +2383,7 @@ checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" dependencies = [ "num-bigint", "num-traits", - "thiserror", + "thiserror 2.0.18", "time", ] @@ -2239,13 +2487,33 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2370,6 +2638,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -2383,6 +2662,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-sink", + "http", + "httparse", + "rand 0.8.6", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tokio-util", + "webpki-roots 0.26.11", +] + [[package]] name = "toml" version = "0.8.23" @@ -2539,6 +2839,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "typenum" version = "1.20.0" @@ -2772,6 +3082,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.7", +] + [[package]] name = "webpki-roots" version = "1.0.7" diff --git a/Cargo.toml b/Cargo.toml index 0519956..c0723a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ repository = "https://github.com/openfiles/openfiles" [workspace.dependencies] anyhow = "1" +async-nats = "0.47" async-trait = "0.1" axum = { version = "0.8", features = ["macros"] } bytes = { version = "1", features = ["serde"] } @@ -42,5 +43,6 @@ tower-http = { version = "0.6", features = ["trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } urlencoding = "2" +uuid = { version = "1", features = ["v4"] } fuser = "0.15" wit-bindgen = "0.36" diff --git a/crates/openfiles-core/src/config.rs b/crates/openfiles-core/src/config.rs index 162a7f5..ecf1b36 100644 --- a/crates/openfiles-core/src/config.rs +++ b/crates/openfiles-core/src/config.rs @@ -104,6 +104,61 @@ impl Default for SyncConfig { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NatsConfig { + #[serde(default)] + pub enabled: bool, + #[serde(default = "default_nats_url")] + pub url: String, + #[serde(default = "default_nats_subject_prefix")] + 
pub subject_prefix: String,
+    #[serde(default)]
+    pub queue_group: Option<String>,
+    #[serde(default)]
+    pub instance_id: Option<String>,
+    #[serde(default = "default_nats_request_timeout_ms")]
+    pub request_timeout_ms: u64,
+    #[serde(default = "default_nats_max_payload_bytes")]
+    pub max_payload_bytes: usize,
+    #[serde(default = "default_nats_publish_events")]
+    pub publish_events: bool,
+}
+
+fn default_nats_url() -> String {
+    "nats://127.0.0.1:4222".to_string()
+}
+
+fn default_nats_subject_prefix() -> String {
+    "openfiles".to_string()
+}
+
+fn default_nats_request_timeout_ms() -> u64 {
+    30_000
+}
+
+fn default_nats_max_payload_bytes() -> usize {
+    10 * 1024 * 1024
+}
+
+fn default_nats_publish_events() -> bool {
+    true
+}
+
+impl Default for NatsConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            url: default_nats_url(),
+            subject_prefix: default_nats_subject_prefix(),
+            queue_group: None,
+            instance_id: None,
+            request_timeout_ms: default_nats_request_timeout_ms(),
+            max_payload_bytes: default_nats_max_payload_bytes(),
+            publish_events: true,
+        }
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct OpenFilesConfig {
     #[serde(default = "default_fs_id")]
@@ -118,6 +173,8 @@ pub struct OpenFilesConfig {
     pub cache: CacheConfig,
     #[serde(default)]
     pub sync: SyncConfig,
+    #[serde(default)]
+    pub nats: NatsConfig,
 }
 
 fn default_fs_id() -> String {
@@ -133,6 +190,7 @@ impl Default for OpenFilesConfig {
             backend: BackendConfig::default(),
             cache: CacheConfig::default(),
             sync: SyncConfig::default(),
+            nats: NatsConfig::default(),
         }
     }
 }
diff --git a/crates/openfiles-core/src/engine.rs b/crates/openfiles-core/src/engine.rs
index 6450ee7..47a4d54 100644
--- a/crates/openfiles-core/src/engine.rs
+++ b/crates/openfiles-core/src/engine.rs
@@ -475,6 +475,53 @@ impl OpenFilesEngine {
         Ok(())
     }
 
+    /// Remove a clean cached path so another process can re-import the object-store copy.
+    /// Dirty entries are intentionally preserved to avoid losing unsynced local edits.
+    pub async fn invalidate_path(&self, path: &str) -> Result<bool> {
+        let normalized = normalize_path(path)?;
+        if normalized.is_empty() {
+            return Ok(false);
+        }
+
+        match self.cache.get(&normalized) {
+            Some(entry) if entry.dirty => Ok(false),
+            Some(_) => {
+                self.cache.remove_entry(&normalized).await?;
+                Ok(true)
+            }
+            None => Ok(false),
+        }
+    }
+
+    /// Remove clean cache entries at or below a prefix.
+    ///
+    /// This is used by distributed cache-invalidation events after a remote
+    /// instance mutates a file tree. It never deletes dirty local entries.
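+    ///
+    /// Example with hypothetical paths: invalidating the prefix "/logs"
+    /// removes clean cached entries such as "/logs/app.log" and
+    /// "/logs/2026/05/run.log", while a dirty "/logs/pending.log" stays
+    /// cached until it is flushed.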
+    pub async fn invalidate_prefix(&self, path: &str) -> Result<usize> {
+        let normalized = normalize_path(path)?;
+        let prefix = dir_prefix(&normalized);
+        let mut removed = 0usize;
+
+        for entry in self.cache.iter_entries() {
+            if entry.dirty {
+                continue;
+            }
+
+            let matches = if normalized.is_empty() {
+                !entry.path.is_empty()
+            } else {
+                entry.path == normalized || entry.path.starts_with(&prefix)
+            };
+
+            if matches {
+                self.cache.remove_entry(&entry.path).await?;
+                removed += 1;
+            }
+        }
+
+        Ok(removed)
+    }
+
     pub async fn flush(&self) -> Result<usize> {
         let dirty = self.cache.dirty_entries();
         let mut ok = 0usize;
diff --git a/crates/openfiles-core/src/lib.rs b/crates/openfiles-core/src/lib.rs
index b5e7576..cc45908 100644
--- a/crates/openfiles-core/src/lib.rs
+++ b/crates/openfiles-core/src/lib.rs
@@ -16,7 +16,7 @@ pub mod vendor;
 
 pub use backend::{LocalFsBackend, ObjectBackend, ObjectMeta, ObjectVersion};
 pub use cache::{Cache, CacheEntry};
-pub use config::{BackendConfig, OpenFilesConfig, ProviderKind};
+pub use config::{BackendConfig, NatsConfig, OpenFilesConfig, ProviderKind};
 pub use engine::OpenFilesEngine;
 pub use error::{OpenFilesError, Result};
 pub use types::{DirEntry, FileKind, FileStat, ImportDataRule, ImportTrigger};
diff --git a/crates/openfiles-server/Cargo.toml b/crates/openfiles-server/Cargo.toml
index 6ee2417..40dfbf0 100644
--- a/crates/openfiles-server/Cargo.toml
+++ b/crates/openfiles-server/Cargo.toml
@@ -10,9 +10,11 @@ path = "src/main.rs"
 
 [dependencies]
 anyhow.workspace = true
+async-nats.workspace = true
 axum.workspace = true
 bytes.workspace = true
 clap.workspace = true
+futures.workspace = true
 openfiles-core = { path = "../openfiles-core", features = ["opendal-backend"] }
 serde = { workspace = true, features = ["derive"] }
 serde_json.workspace = true
@@ -20,3 +22,4 @@ tokio.workspace = true
 tower-http.workspace = true
 tracing.workspace = true
 tracing-subscriber.workspace = true
+uuid.workspace = true
diff --git a/crates/openfiles-server/src/distributed.rs b/crates/openfiles-server/src/distributed.rs
new file mode 100644
index 0000000..34eef14
--- /dev/null
+++ b/crates/openfiles-server/src/distributed.rs
@@ -0,0 +1,702 @@
+use anyhow::{anyhow, Context};
+use bytes::Bytes;
+use futures::StreamExt;
+use openfiles_core::{DirEntry, FileStat, OpenFilesConfig, OpenFilesEngine, OpenFilesError};
+use serde::{Deserialize, Serialize};
+use std::{fmt, sync::Arc, time::Duration};
+
+#[derive(Debug)]
+pub enum DistributedError {
+    OpenFiles(OpenFilesError),
+    Transport(anyhow::Error),
+}
+
+impl fmt::Display for DistributedError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::OpenFiles(err) => write!(f, "{err}"),
+            Self::Transport(err) => write!(f, "distributed transport error: {err}"),
+        }
+    }
+}
+
+impl std::error::Error for DistributedError {}
+
+impl From<OpenFilesError> for DistributedError {
+    fn from(value: OpenFilesError) -> Self {
+        Self::OpenFiles(value)
+    }
+}
+
+impl From<anyhow::Error> for DistributedError {
+    fn from(value: anyhow::Error) -> Self {
+        Self::Transport(value)
+    }
+}
+
+pub type Result<T> = std::result::Result<T, DistributedError>;
+
+#[derive(Clone)]
+pub enum DistributedOpenFiles {
+    Local(Arc<OpenFilesEngine>),
+    Nats(Arc<NatsOpenFiles>),
+}
+
+impl DistributedOpenFiles {
+    pub async fn new(engine: Arc<OpenFilesEngine>, config: &OpenFilesConfig) -> Result<Self> {
+        if !config.nats.enabled {
+            return Ok(Self::Local(engine));
+        }
+
+        let nats = NatsOpenFiles::connect(engine, config).await?;
+        Ok(Self::Nats(Arc::new(nats)))
+    }
+
+    pub async fn stat(&self, path: &str) -> Result<FileStat> {
+        match self {
+            Self::Local(engine) => Ok(engine.stat(path).await?),
+            Self::Nats(nats) => nats.stat(path).await,
+        }
+    }
+
+    pub async fn list_dir(&self, path: &str) -> Result<Vec<DirEntry>> {
+        match self {
+            Self::Local(engine) => Ok(engine.list_dir(path).await?),
+            Self::Nats(nats) => nats.list_dir(path).await,
+        }
+    }
+
+    pub async fn read_range(&self, path: &str, offset: u64, len: u64) -> Result<Bytes> {
+        match self {
+            Self::Local(engine) => Ok(engine.read_range(path, offset, len).await?),
+
+            // Important:
+            // Reads are intentionally local even when NATS is enabled.
+            // The shared object backend is the source of truth, while cache files are per process.
+            // Routing reads through a NATS queue group can hit another worker whose cache metadata
+            // references a cache object that does not exist on that specific process.
+            Self::Nats(nats) => Ok(nats.engine.read_range(path, offset, len).await?),
+        }
+    }
+
+    pub async fn read_all(&self, path: &str) -> Result<Bytes> {
+        match self {
+            Self::Local(engine) => Ok(engine.read_all(path).await?),
+
+            // Same as read_range: keep reads local to avoid cross-process cache-file mismatch.
+            Self::Nats(nats) => Ok(nats.engine.read_all(path).await?),
+        }
+    }
+
+    pub async fn write_file(&self, path: &str, data: Bytes) -> Result<()> {
+        match self {
+            Self::Local(engine) => Ok(engine.write_file(path, data).await?),
+            Self::Nats(nats) => nats.write_file(path, data).await,
+        }
+    }
+
+    pub async fn delete_path(&self, path: &str) -> Result<()> {
+        match self {
+            Self::Local(engine) => Ok(engine.delete_path(path).await?),
+            Self::Nats(nats) => nats.delete_path(path).await,
+        }
+    }
+
+    pub async fn rename_path(&self, from: &str, to: &str) -> Result<()> {
+        match self {
+            Self::Local(engine) => Ok(engine.rename_path(from, to).await?),
+            Self::Nats(nats) => nats.rename_path(from, to).await,
+        }
+    }
+
+    pub async fn flush(&self) -> Result<usize> {
+        match self {
+            Self::Local(engine) => Ok(engine.flush().await?),
+            Self::Nats(nats) => nats.flush().await,
+        }
+    }
+
+    pub async fn expire_cache(&self) -> Result<u64> {
+        match self {
+            Self::Local(engine) => Ok(engine.expire_cache().await?),
+            Self::Nats(nats) => nats.expire_cache().await,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct NatsOpenFiles {
+    engine: Arc<OpenFilesEngine>,
+    client: async_nats::Client,
+    subjects: Subjects,
+    request_timeout: Duration,
+    max_payload_bytes: usize,
+}
+
+impl NatsOpenFiles {
+    pub async fn connect(engine: Arc<OpenFilesEngine>, config: &OpenFilesConfig) -> Result<Self> {
+        let nats = &config.nats;
+        let client = async_nats::connect(nats.url.as_str())
+            .await
+            .map_err(|err| anyhow!(err))
+            .with_context(|| format!("failed to connect to NATS at {}", nats.url))?;
+
+        let subjects = Subjects::new(config);
+        let queue_group = nats
+            .queue_group
+            .clone()
+            .unwrap_or_else(|| format!("{}.{}.workers", subjects.prefix, subjects.fs_token));
+
+        let instance_id = nats
+            .instance_id
+            .clone()
+            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
+
+        let request_timeout = Duration::from_millis(nats.request_timeout_ms.max(1));
+        let max_payload_bytes = nats.max_payload_bytes.max(1024);
+
+        spawn_worker(
+            client.clone(),
+            engine.clone(),
+            subjects.clone(),
+            queue_group,
+            instance_id.clone(),
+            max_payload_bytes,
+            nats.publish_events,
+        )
+        .await?;
+
+        spawn_event_listener(
+            client.clone(),
+            engine.clone(),
+            subjects.clone(),
+            instance_id,
+        )
+        .await?;
+
+        tracing::info!(
+            work_subject = %subjects.work,
+            event_subject = %subjects.events,
+            "NATS distribution enabled"
+        );
+
+        Ok(Self {
+            engine,
+            client,
+            subjects,
+            request_timeout,
+            max_payload_bytes,
+        })
+    }
+
+    pub async fn stat(&self, path: &str) -> Result<FileStat> {
+        match self
+            .request(WorkRequest::Stat {
+                path: path.to_string(),
+            })
+            .await?
+        {
+            WorkResponse::Stat { stat } => Ok(stat),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn list_dir(&self, path: &str) -> Result<Vec<DirEntry>> {
+        match self
+            .request(WorkRequest::ListDir {
+                path: path.to_string(),
+            })
+            .await?
+        {
+            WorkResponse::List { entries } => Ok(entries),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn read_range(&self, path: &str, offset: u64, len: u64) -> Result<Bytes> {
+        match self
+            .request(WorkRequest::ReadRange {
+                path: path.to_string(),
+                offset,
+                len,
+            })
+            .await?
+        {
+            WorkResponse::Bytes { data } => Ok(data),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn read_all(&self, path: &str) -> Result<Bytes> {
+        match self
+            .request(WorkRequest::ReadAll {
+                path: path.to_string(),
+            })
+            .await?
+        {
+            WorkResponse::Bytes { data } => Ok(data),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn write_file(&self, path: &str, data: Bytes) -> Result<()> {
+        match self
+            .request(WorkRequest::WriteFile {
+                path: path.to_string(),
+                data,
+            })
+            .await?
+        {
+            WorkResponse::Unit => Ok(()),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn delete_path(&self, path: &str) -> Result<()> {
+        match self
+            .request(WorkRequest::DeletePath {
+                path: path.to_string(),
+            })
+            .await?
+        {
+            WorkResponse::Unit => Ok(()),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn rename_path(&self, from: &str, to: &str) -> Result<()> {
+        match self
+            .request(WorkRequest::RenamePath {
+                from: from.to_string(),
+                to: to.to_string(),
+            })
+            .await?
+        {
+            WorkResponse::Unit => Ok(()),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn flush(&self) -> Result<usize> {
+        match self.request(WorkRequest::Flush).await? {
+            WorkResponse::Usize { value } => Ok(value),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    pub async fn expire_cache(&self) -> Result<u64> {
+        match self.request(WorkRequest::ExpireCache).await? {
+            WorkResponse::U64 { value } => Ok(value),
+            other => Err(unexpected_response(other)),
+        }
+    }
+
+    async fn request(&self, request: WorkRequest) -> Result<WorkResponse> {
+        let payload = serde_json::to_vec(&request).map_err(OpenFilesError::from)?;
+
+        if payload.len() > self.max_payload_bytes {
+            return Err(OpenFilesError::Unsupported(format!(
+                "NATS payload is {} bytes, above configured max {} bytes",
+                payload.len(),
+                self.max_payload_bytes
+            ))
+            .into());
+        }
+
+        let response = tokio::time::timeout(
+            self.request_timeout,
+            self.client
+                .request(self.subjects.work.clone(), Bytes::from(payload)),
+        )
+        .await
+        .map_err(|_| anyhow!("NATS request timed out after {:?}", self.request_timeout))?
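+        // The map_err above handled the elapsed timeout; the one below
+        // converts the NATS client's request error from the inner Result.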
+        .map_err(|err| anyhow!(err))?;
+
+        let response: WorkResponse =
+            serde_json::from_slice(&response.payload).map_err(OpenFilesError::from)?;
+
+        match response {
+            WorkResponse::Error { code, detail } => Err(error_from_wire(&code, detail).into()),
+            other => Ok(other),
+        }
+    }
+}
+
+#[derive(Clone)]
+struct Subjects {
+    prefix: String,
+    fs_token: String,
+    work: String,
+    events: String,
+}
+
+impl Subjects {
+    fn new(config: &OpenFilesConfig) -> Self {
+        let prefix = clean_subject_prefix(&config.nats.subject_prefix);
+        let fs_token = subject_token(&config.fs_id);
+
+        Self {
+            work: format!("{prefix}.{fs_token}.work"),
+            events: format!("{prefix}.{fs_token}.events"),
+            prefix,
+            fs_token,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(tag = "op", rename_all = "kebab-case")]
+enum WorkRequest {
+    Stat { path: String },
+    ListDir { path: String },
+    ReadAll { path: String },
+    ReadRange { path: String, offset: u64, len: u64 },
+    WriteFile { path: String, data: Bytes },
+    DeletePath { path: String },
+    RenamePath { from: String, to: String },
+    Flush,
+    ExpireCache,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "kind", rename_all = "kebab-case")]
+enum WorkResponse {
+    Unit,
+    Stat { stat: FileStat },
+    List { entries: Vec<DirEntry> },
+    Bytes { data: Bytes },
+    Usize { value: usize },
+    U64 { value: u64 },
+    Error { code: String, detail: String },
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct MutationEvent {
+    origin: String,
+    kind: String,
+    paths: Vec<String>,
+    prefixes: Vec<String>,
+}
+
+async fn spawn_worker(
+    client: async_nats::Client,
+    engine: Arc<OpenFilesEngine>,
+    subjects: Subjects,
+    queue_group: String,
+    instance_id: String,
+    max_payload_bytes: usize,
+    publish_events: bool,
+) -> Result<()> {
+    let mut subscription = client
+        .queue_subscribe(subjects.work.clone(), queue_group.clone())
+        .await
+        .map_err(|err| anyhow!(err))
+        .with_context(|| format!("failed to subscribe to NATS queue group {queue_group}"))?;
+
+    tokio::spawn(async move {
+        while let Some(message) = subscription.next().await {
+            let Some(reply) = message.reply.clone() else {
+                continue;
+            };
+
+            let response = match serde_json::from_slice::<WorkRequest>(&message.payload) {
+                Ok(request) => {
+                    let event = mutation_event_for_request(&request, &instance_id);
+                    let response = execute_work(engine.clone(), request).await;
+
+                    if publish_events && !matches!(response, WorkResponse::Error { .. }) {
+                        if let Some(event) = event {
+                            if let Err(err) = publish_event(&client, &subjects.events, &event).await
+                            {
+                                tracing::warn!(
+                                    error = %err,
+                                    "failed to publish NATS mutation event"
+                                );
+                            }
+                        }
+                    }
+
+                    response
+                }
+                Err(err) => WorkResponse::Error {
+                    code: "json".to_string(),
+                    detail: err.to_string(),
+                },
+            };
+
+            let payload = encode_response(response, max_payload_bytes);
+
+            if let Err(err) = client.publish(reply, Bytes::from(payload)).await {
+                tracing::warn!(error = %err, "failed to send NATS work response");
+            }
+        }
+    });
+
+    Ok(())
+}
+
+async fn spawn_event_listener(
+    client: async_nats::Client,
+    engine: Arc<OpenFilesEngine>,
+    subjects: Subjects,
+    instance_id: String,
+) -> Result<()> {
+    let mut subscription = client
+        .subscribe(subjects.events.clone())
+        .await
+        .map_err(|err| anyhow!(err))
+        .with_context(|| {
+            format!(
+                "failed to subscribe to NATS events subject {}",
+                subjects.events
+            )
+        })?;
+
+    tokio::spawn(async move {
+        while let Some(message) = subscription.next().await {
+            match serde_json::from_slice::<MutationEvent>(&message.payload) {
+                Ok(event) if event.origin == instance_id => {}
+
+                Ok(event) => {
+                    for path in &event.paths {
+                        match engine.invalidate_path(path).await {
+                            Ok(true) => tracing::debug!(
+                                path = %path,
+                                kind = %event.kind,
+                                "invalidated cached path"
+                            ),
+                            Ok(false) => {}
+                            Err(err) => tracing::warn!(
+                                path = %path,
+                                error = %err,
+                                "failed to invalidate cached path"
+                            ),
+                        }
+                    }
+
+                    for prefix in &event.prefixes {
+                        match engine.invalidate_prefix(prefix).await {
+                            Ok(count) if count > 0 => tracing::debug!(
+                                prefix = %prefix,
+                                count,
+                                kind = %event.kind,
+                                "invalidated cached prefix"
+                            ),
+                            Ok(_) => {}
+                            Err(err) => tracing::warn!(
+                                prefix = %prefix,
+                                error = %err,
+                                "failed to invalidate cached prefix"
+                            ),
+                        }
+                    }
+                }
+
+                Err(err) => {
+                    tracing::warn!(error = %err, "failed to decode NATS mutation event");
+                }
+            }
+        }
+    });
+
+    Ok(())
+}
+
+async fn execute_work(engine: Arc<OpenFilesEngine>, request: WorkRequest) -> WorkResponse {
+    let result = match request {
+        WorkRequest::Stat { path } => engine
+            .stat(&path)
+            .await
+            .map(|stat| WorkResponse::Stat { stat }),
+
+        WorkRequest::ListDir { path } => engine
+            .list_dir(&path)
+            .await
+            .map(|entries| WorkResponse::List { entries }),
+
+        WorkRequest::ReadAll { path } => engine
+            .read_all(&path)
+            .await
+            .map(|data| WorkResponse::Bytes { data }),
+
+        WorkRequest::ReadRange { path, offset, len } => engine
+            .read_range(&path, offset, len)
+            .await
+            .map(|data| WorkResponse::Bytes { data }),
+
+        WorkRequest::WriteFile { path, data } => match engine.write_file(&path, data).await {
+            Ok(()) => engine.flush().await.map(|_| WorkResponse::Unit),
+            Err(err) => Err(err),
+        },
+
+        WorkRequest::DeletePath { path } => match engine.delete_path(&path).await {
+            Ok(()) => engine.flush().await.map(|_| WorkResponse::Unit),
+            Err(err) => Err(err),
+        },
+
+        WorkRequest::RenamePath { from, to } => match engine.rename_path(&from, &to).await {
+            Ok(()) => engine.flush().await.map(|_| WorkResponse::Unit),
+            Err(err) => Err(err),
+        },
+
+        WorkRequest::Flush => engine
+            .flush()
+            .await
+            .map(|value| WorkResponse::Usize { value }),
+
+        WorkRequest::ExpireCache => engine
+            .expire_cache()
+            .await
+            .map(|value| WorkResponse::U64 { value }),
+    };
+
+    match result {
+        Ok(response) => response,
+        Err(err) => WorkResponse::Error {
+            code: error_code(&err).to_string(),
+            detail: error_detail(&err),
+        },
+    }
+}
+
+fn mutation_event_for_request(request: &WorkRequest, origin: &str) -> Option<MutationEvent> {
+    match request {
+        WorkRequest::WriteFile { path, .. } => Some(MutationEvent {
+            origin: origin.to_string(),
+            kind: "write".to_string(),
+            paths: vec![path.clone()],
+            prefixes: Vec::new(),
+        }),
+
+        WorkRequest::DeletePath { path } => Some(MutationEvent {
+            origin: origin.to_string(),
+            kind: "delete".to_string(),
+            paths: vec![path.clone()],
+            prefixes: Vec::new(),
+        }),
+
+        WorkRequest::RenamePath { from, to } => Some(MutationEvent {
+            origin: origin.to_string(),
+            kind: "rename".to_string(),
+            paths: Vec::new(),
+            prefixes: vec![from.clone(), to.clone()],
+        }),
+
+        _ => None,
+    }
+}
+
+async fn publish_event(
+    client: &async_nats::Client,
+    subject: &str,
+    event: &MutationEvent,
+) -> anyhow::Result<()> {
+    let payload = serde_json::to_vec(event)?;
+
+    client
+        .publish(subject.to_string(), Bytes::from(payload))
+        .await
+        .map_err(|err| anyhow!(err))?;
+
+    Ok(())
+}
+
+fn encode_response(response: WorkResponse, max_payload_bytes: usize) -> Vec<u8> {
+    match serde_json::to_vec(&response) {
+        Ok(payload) if payload.len() <= max_payload_bytes => payload,
+
+        Ok(payload) => serde_json::to_vec(&WorkResponse::Error {
+            code: "unsupported".to_string(),
+            detail: format!(
+                "NATS response is {} bytes, above configured max {} bytes",
+                payload.len(),
+                max_payload_bytes
+            ),
+        })
+        .expect("serializing fixed error response should not fail"),
+
+        Err(err) => serde_json::to_vec(&WorkResponse::Error {
+            code: "json".to_string(),
+            detail: err.to_string(),
+        })
+        .expect("serializing fixed error response should not fail"),
+    }
+}
+
+fn unexpected_response(response: WorkResponse) -> DistributedError {
+    DistributedError::Transport(anyhow!("unexpected NATS work response: {response:?}"))
+}
+
+fn clean_subject_prefix(prefix: &str) -> String {
+    let prefix = prefix.trim_matches('.');
+
+    if prefix.is_empty() {
+        "openfiles".to_string()
+    } else {
+        prefix
+            .split('.')
+            .map(subject_token)
+            .collect::<Vec<_>>()
+            .join(".")
+    }
+}
+
+fn subject_token(token: &str) -> String {
+    let cleaned: String = token
+        .chars()
+        .map(|ch| {
+            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
+                ch
+            } else {
+                '_'
+            }
+        })
+        .collect();
+
+    if cleaned.is_empty() {
+        "default".to_string()
+    } else {
+        cleaned
+    }
+}
+
+fn error_code(err: &OpenFilesError) -> &'static str {
+    match err {
+        OpenFilesError::InvalidPath(_) => "invalid-path",
+        OpenFilesError::NotFound(_) => "not-found",
+        OpenFilesError::Conflict(_) => "conflict",
+        OpenFilesError::Unsupported(_) => "unsupported",
+        OpenFilesError::Storage(_) => "storage",
+        OpenFilesError::Io(_) => "io",
+        OpenFilesError::Json(_) => "json",
+        OpenFilesError::Toml(_) => "toml",
+        OpenFilesError::Internal(_) => "internal",
+    }
+}
+
+fn error_detail(err: &OpenFilesError) -> String {
+    match err {
+        OpenFilesError::InvalidPath(detail)
+        | OpenFilesError::NotFound(detail)
+        | OpenFilesError::Conflict(detail)
+        | OpenFilesError::Unsupported(detail)
+        | OpenFilesError::Storage(detail)
+        | OpenFilesError::Internal(detail) => detail.clone(),
+
+        OpenFilesError::Io(err) => err.to_string(),
+        OpenFilesError::Json(err) => err.to_string(),
+        OpenFilesError::Toml(err) => err.to_string(),
+    }
+}
+
+fn error_from_wire(code: &str, detail: String) -> OpenFilesError {
+    match code {
+        "invalid-path" => OpenFilesError::InvalidPath(detail),
+        "not-found" => OpenFilesError::NotFound(detail),
+        "conflict" => OpenFilesError::Conflict(detail),
+        "unsupported" => OpenFilesError::Unsupported(detail),
+        "storage" => OpenFilesError::Storage(detail),
+        "internal" => OpenFilesError::Internal(detail),
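+        // Any code this process does not recognize degrades to Internal while
+        // preserving the remote error detail.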
+        other => OpenFilesError::Internal(format!("remote {other} error: {detail}")),
+    }
+}
diff --git a/crates/openfiles-server/src/main.rs b/crates/openfiles-server/src/main.rs
index cf94028..1045271 100644
--- a/crates/openfiles-server/src/main.rs
+++ b/crates/openfiles-server/src/main.rs
@@ -8,15 +8,18 @@ use axum::{
     Json, Router,
 };
 use clap::Parser;
+use distributed::{DistributedError, DistributedOpenFiles};
 use openfiles_core::{
     sync::{spawn_background_sync, BackgroundSyncConfig},
     vendor::build_backend,
-    OpenFilesConfig, OpenFilesEngine,
+    OpenFilesConfig, OpenFilesEngine, OpenFilesError,
 };
 use serde::Deserialize;
 use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
 use tower_http::trace::TraceLayer;
 
+mod distributed;
+
 #[derive(Debug, Parser)]
 struct Args {
     #[arg(short, long, default_value = "openfiles.toml")]
@@ -27,20 +30,23 @@ struct Args {
 
 #[derive(Clone)]
 struct AppState {
-    engine: Arc<OpenFilesEngine>,
+    engine: DistributedOpenFiles,
 }
 
 #[derive(Debug)]
-struct ApiError(openfiles_core::OpenFilesError);
+struct ApiError(DistributedError);
 
 impl IntoResponse for ApiError {
     fn into_response(self) -> Response {
         let status = match &self.0 {
-            openfiles_core::OpenFilesError::NotFound(_) => StatusCode::NOT_FOUND,
-            openfiles_core::OpenFilesError::Conflict(_) => StatusCode::CONFLICT,
-            openfiles_core::OpenFilesError::InvalidPath(_) => StatusCode::BAD_REQUEST,
-            openfiles_core::OpenFilesError::Unsupported(_) => StatusCode::NOT_IMPLEMENTED,
-            _ => StatusCode::INTERNAL_SERVER_ERROR,
+            DistributedError::OpenFiles(OpenFilesError::NotFound(_)) => StatusCode::NOT_FOUND,
+            DistributedError::OpenFiles(OpenFilesError::Conflict(_)) => StatusCode::CONFLICT,
+            DistributedError::OpenFiles(OpenFilesError::InvalidPath(_)) => StatusCode::BAD_REQUEST,
+            DistributedError::OpenFiles(OpenFilesError::Unsupported(_)) => {
+                StatusCode::NOT_IMPLEMENTED
+            }
+            DistributedError::OpenFiles(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            DistributedError::Transport(_) => StatusCode::BAD_GATEWAY,
         };
         (
             status,
@@ -50,8 +56,8 @@ impl IntoResponse for ApiError {
     }
 }
 
-impl From<openfiles_core::OpenFilesError> for ApiError {
-    fn from(value: openfiles_core::OpenFilesError) -> Self {
+impl From<DistributedError> for ApiError {
+    fn from(value: DistributedError) -> Self {
         Self(value)
     }
 }
@@ -76,28 +82,20 @@ fn slash(path: String) -> String {
     }
 }
 
-async fn stat(
-    State(state): State<AppState>,
-    Path(path): Path<String>,
-) -> ApiResult<Json<openfiles_core::FileStat>> {
-    Ok(Json(state.engine.stat(&slash(path)).await?))
+async fn stat(State(state): State<AppState>, Path(path): Path<String>) -> ApiResult<Response> {
+    Ok(Json(state.engine.stat(&slash(path)).await?).into_response())
 }
 
-async fn stat_root(State(state): State<AppState>) -> ApiResult<Json<openfiles_core::FileStat>> {
-    Ok(Json(state.engine.stat("/").await?))
+async fn stat_root(State(state): State<AppState>) -> ApiResult<Response> {
+    Ok(Json(state.engine.stat("/").await?).into_response())
 }
 
-async fn list(
-    State(state): State<AppState>,
-    Path(path): Path<String>,
-) -> ApiResult<Json<Vec<openfiles_core::DirEntry>>> {
-    Ok(Json(state.engine.list_dir(&slash(path)).await?))
+async fn list(State(state): State<AppState>, Path(path): Path<String>) -> ApiResult<Response> {
+    Ok(Json(state.engine.list_dir(&slash(path)).await?).into_response())
 }
 
-async fn list_root(
-    State(state): State<AppState>,
-) -> ApiResult<Json<Vec<openfiles_core::DirEntry>>> {
-    Ok(Json(state.engine.list_dir("/").await?))
+async fn list_root(State(state): State<AppState>) -> ApiResult<Response> {
+    Ok(Json(state.engine.list_dir("/").await?).into_response())
 }
 
 async fn read_file(
@@ -117,19 +115,19 @@ async fn write_file(
     State(state): State<AppState>,
     Path(path): Path<String>,
     body: Bytes,
-) -> ApiResult<Json<serde_json::Value>> {
+) -> ApiResult<Response> {
     let p = slash(path);
     state.engine.write_file(&p, body).await?;
-    Ok(Json(serde_json::json!({ "ok": true, "path": p })))
+    Ok(Json(serde_json::json!({ "ok": true, "path": p })).into_response())
 }
 
 async fn delete_file(
     State(state): State<AppState>,
     Path(path): Path<String>,
-) -> ApiResult<Json<serde_json::Value>> {
+) -> ApiResult<Response> {
     let p = slash(path);
     state.engine.delete_path(&p).await?;
-    Ok(Json(serde_json::json!({ "ok": true, "path": p })))
+    Ok(Json(serde_json::json!({ "ok": true, "path": p })).into_response())
 }
 
 #[derive(Debug, Deserialize)]
@@ -141,21 +139,24 @@ struct RenameBody {
 async fn rename(
     State(state): State<AppState>,
     Json(body): Json<RenameBody>,
-) -> ApiResult<Json<serde_json::Value>> {
+) -> ApiResult<Response> {
     state.engine.rename_path(&body.from, &body.to).await?;
-    Ok(Json(
-        serde_json::json!({ "ok": true, "from": body.from, "to": body.to }),
-    ))
+    Ok(Json(serde_json::json!({
+        "ok": true,
+        "from": body.from,
+        "to": body.to
+    }))
+    .into_response())
 }
 
-async fn flush(State(state): State<AppState>) -> ApiResult<Json<serde_json::Value>> {
+async fn flush(State(state): State<AppState>) -> ApiResult<Response> {
     let n = state.engine.flush().await?;
-    Ok(Json(serde_json::json!({ "flushed": n })))
+    Ok(Json(serde_json::json!({ "flushed": n })).into_response())
 }
 
-async fn expire(State(state): State<AppState>) -> ApiResult<Json<serde_json::Value>> {
+async fn expire(State(state): State<AppState>) -> ApiResult<Response> {
     let n = state.engine.expire_cache().await?;
-    Ok(Json(serde_json::json!({ "expired": n })))
+    Ok(Json(serde_json::json!({ "expired": n })).into_response())
 }
 
 #[tokio::main]
@@ -181,8 +182,10 @@ async fn main() -> Result<()> {
         },
     );
 
+    let engine = Arc::new(engine);
+    let distributed = DistributedOpenFiles::new(engine, &config).await?;
     let state = AppState {
-        engine: Arc::new(engine),
+        engine: distributed,
     };
     let app = Router::new()
         .route("/healthz", get(health))
diff --git a/crates/openfiles-server/src/main.rs.orig b/crates/openfiles-server/src/main.rs.orig
new file mode 100644
index 0000000..cf94028
--- /dev/null
+++ b/crates/openfiles-server/src/main.rs.orig
@@ -0,0 +1,206 @@
+use anyhow::{Context, Result};
+use axum::{
+    body::Bytes,
+    extract::{Path, Query, State},
+    http::StatusCode,
+    response::{IntoResponse, Response},
+    routing::{delete, get, post, put},
+    Json, Router,
+};
+use clap::Parser;
+use openfiles_core::{
+    sync::{spawn_background_sync, BackgroundSyncConfig},
+    vendor::build_backend,
+    OpenFilesConfig, OpenFilesEngine,
+};
+use serde::Deserialize;
+use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
+use tower_http::trace::TraceLayer;
+
+#[derive(Debug, Parser)]
+struct Args {
+    #[arg(short, long, default_value = "openfiles.toml")]
+    config: PathBuf,
+    #[arg(long, default_value = "127.0.0.1:8787")]
+    listen: SocketAddr,
+}
+
+#[derive(Clone)]
+struct AppState {
+    engine: Arc<OpenFilesEngine>,
+}
+
+#[derive(Debug)]
+struct ApiError(openfiles_core::OpenFilesError);
+
+impl IntoResponse for ApiError {
+    fn into_response(self) -> Response {
+        let status = match &self.0 {
+            openfiles_core::OpenFilesError::NotFound(_) => StatusCode::NOT_FOUND,
+            openfiles_core::OpenFilesError::Conflict(_) => StatusCode::CONFLICT,
+            openfiles_core::OpenFilesError::InvalidPath(_) => StatusCode::BAD_REQUEST,
+            openfiles_core::OpenFilesError::Unsupported(_) => StatusCode::NOT_IMPLEMENTED,
+            _ => StatusCode::INTERNAL_SERVER_ERROR,
+        };
+        (
+            status,
+            Json(serde_json::json!({ "error": self.0.to_string() })),
+        )
+        .into_response()
+    }
+}
+
+impl From<openfiles_core::OpenFilesError> for ApiError {
+    fn from(value: openfiles_core::OpenFilesError) -> Self {
+        Self(value)
+    }
+}
+
+type ApiResult<T> = std::result::Result<T, ApiError>;
+
+#[derive(Debug, Deserialize)]
+struct ReadQuery {
+    offset: Option<u64>,
+    len: Option<u64>,
+}
+
+async fn health() -> &'static str {
+    "ok"
+}
+
+fn slash(path: String) -> String {
+    if path.is_empty() {
+        "/".to_string()
+    } else {
+        format!("/{path}")
+    }
+}
+
+async fn stat(
+    State(state): State<AppState>,
+    Path(path): Path<String>,
+) -> ApiResult<Json<openfiles_core::FileStat>> {
+    Ok(Json(state.engine.stat(&slash(path)).await?))
+}
+
+async fn stat_root(State(state): State<AppState>) -> ApiResult<Json<openfiles_core::FileStat>> {
+    Ok(Json(state.engine.stat("/").await?))
+}
+
+async fn list(
+    State(state): State<AppState>,
+    Path(path): Path<String>,
+) -> ApiResult<Json<Vec<openfiles_core::DirEntry>>> {
+    Ok(Json(state.engine.list_dir(&slash(path)).await?))
+}
+
+async fn list_root(
+    State(state): State<AppState>,
+) -> ApiResult<Json<Vec<openfiles_core::DirEntry>>> {
+    Ok(Json(state.engine.list_dir("/").await?))
+}
+
+async fn read_file(
+    State(state): State<AppState>,
+    Path(path): Path<String>,
+    Query(query): Query<ReadQuery>,
+) -> ApiResult<Response> {
+    let p = slash(path);
+    let bytes = match (query.offset, query.len) {
+        (Some(offset), Some(len)) => state.engine.read_range(&p, offset, len).await?,
+        _ => state.engine.read_all(&p).await?,
+    };
+    Ok((StatusCode::OK, bytes).into_response())
+}
+
+async fn write_file(
+    State(state): State<AppState>,
+    Path(path): Path<String>,
+    body: Bytes,
+) -> ApiResult<Json<serde_json::Value>> {
+    let p = slash(path);
+    state.engine.write_file(&p, body).await?;
+    Ok(Json(serde_json::json!({ "ok": true, "path": p })))
+}
+
+async fn delete_file(
+    State(state): State<AppState>,
+    Path(path): Path<String>,
+) -> ApiResult<Json<serde_json::Value>> {
+    let p = slash(path);
+    state.engine.delete_path(&p).await?;
+    Ok(Json(serde_json::json!({ "ok": true, "path": p })))
+}
+
+#[derive(Debug, Deserialize)]
+struct RenameBody {
+    from: String,
+    to: String,
+}
+
+async fn rename(
+    State(state): State<AppState>,
+    Json(body): Json<RenameBody>,
+) -> ApiResult<Json<serde_json::Value>> {
+    state.engine.rename_path(&body.from, &body.to).await?;
+    Ok(Json(
+        serde_json::json!({ "ok": true, "from": body.from, "to": body.to }),
+    ))
+}
+
+async fn flush(State(state): State<AppState>) -> ApiResult<Json<serde_json::Value>> {
+    let n = state.engine.flush().await?;
+    Ok(Json(serde_json::json!({ "flushed": n })))
+}
+
+async fn expire(State(state): State<AppState>) -> ApiResult<Json<serde_json::Value>> {
+    let n = state.engine.expire_cache().await?;
+    Ok(Json(serde_json::json!({ "expired": n })))
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    tracing_subscriber::fmt()
+        .with_env_filter(
+            std::env::var("RUST_LOG")
+                .unwrap_or_else(|_| "openfiles_server=info,tower_http=info".to_string()),
+        )
+        .init();
+
+    let args = Args::parse();
+    let config = OpenFilesConfig::from_toml_file(&args.config)
+        .with_context(|| format!("failed to load {}", args.config.display()))?;
+    let backend = build_backend(&config.backend)?;
+    let engine = OpenFilesEngine::new(config.clone(), backend).await?;
+    let flush_interval = Duration::from_secs(config.sync.export_batch_window_secs.max(1));
+    let _sync = spawn_background_sync(
+        engine.clone(),
+        BackgroundSyncConfig {
+            flush_interval,
+            ..Default::default()
+        },
+    );
+
+    let state = AppState {
+        engine: Arc::new(engine),
+    };
+    let app = Router::new()
+        .route("/healthz", get(health))
+        .route("/v1/stat", get(stat_root))
+        .route("/v1/stat/{*path}", get(stat))
+        .route("/v1/list", get(list_root))
+        .route("/v1/list/{*path}", get(list))
+        .route("/v1/read/{*path}", get(read_file))
+        .route("/v1/write/{*path}", put(write_file))
+        .route("/v1/delete/{*path}", delete(delete_file))
+        .route("/v1/rename", post(rename))
+        .route("/v1/flush", post(flush))
+        .route("/v1/expire", post(expire))
+        .layer(TraceLayer::new_for_http())
+        .with_state(state);
+
+    tracing::info!(listen=%args.listen, "openfiles HTTP server listening");
+    let listener = tokio::net::TcpListener::bind(args.listen).await?;
+    axum::serve(listener, app).await?;
+    Ok(())
+}
diff --git a/openfiles.example.toml b/openfiles.example.toml
index ce95107..15c088a 100644
--- a/openfiles.example.toml
+++ b/openfiles.example.toml
@@ -21,3 +21,13 @@ size_less_than = 131072
 
 [sync.expiration]
 days_after_last_access = 30
+
+[nats]
+enabled = false
+url = "nats://127.0.0.1:4222"
+subject_prefix = "openfiles"
+# queue_group defaults to "<prefix>.<fs_token>.workers" when omitted.
+# instance_id defaults to a random UUID when omitted.
+request_timeout_ms = 30000
+max_payload_bytes = 10485760
+publish_events = true

From 6bf568f3636a4280092d31ffef4ad845a91959d2 Mon Sep 17 00:00:00 2001
From: dylanwongtencent
Date: Tue, 5 May 2026 11:48:56 -0700
Subject: [PATCH 2/2] updated readme for distributed NATS.io

---
 README.md | 554 ++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 501 insertions(+), 53 deletions(-)

diff --git a/README.md b/README.md
index fedb5b6..3769d0b 100644
--- a/README.md
+++ b/README.md
@@ -2,20 +2,72 @@
 OpenS3Files is an open source, vendor-neutral object-backed shared file system standard and Rust implementation inspired by the public behavior of Amazon S3 Files.
 
-It gives file-oriented applications, agents, scripts, and WebAssembly components a POSIX-like filesystem view over object stores while keeping the object store as the durable source of truth.
+It gives file-oriented applications, agents, scripts, HTTP services, FUSE mounts, and WebAssembly components a POSIX-like filesystem view over object stores while keeping the object store as the durable source of truth.
 
-> **Not affiliated with AWS.** OpenFiles does not use AWS proprietary code or private APIs. It implements public, observable semantics: object store as source-of-truth, high-performance local cache, lazy import, batched export, conflict lost+found, POSIX metadata sidecars, and non-atomic object rename behavior.
+> **Not affiliated with AWS.** OpenS3Files / OpenFiles does not use AWS proprietary code or private APIs. It implements public, observable semantics: object store as source-of-truth, high-performance local cache, lazy import, batched export, conflict lost+found, POSIX metadata sidecars, and non-atomic object rename behavior.
+
+---
 
 ## What is in this repository
 
 - **OpenFiles Standard (OFSS)**: a written standard for object-backed file semantics, metadata, sync rules, conflict handling, and vendor requirements.
 - **Rust core engine**: shared cache, lazy import, range reads, sync/export worker, conflict detection, POSIX metadata encoding, directory inference, and vendor abstraction.
-- **Vendor adapters via Apache OpenDAL**: AWS S3, GCP Cloud Storage, Azure Blob, Vercel Blob, Storj Gateway, MinIO, and NetApp StorageGRID/S3-compatible buckets.
+- **Vendor adapters via Apache OpenDAL**: AWS S3, GCP Cloud Storage, Azure Blob, Vercel Blob, Storj Gateway, MinIO, Tencent COS, and NetApp StorageGRID/S3-compatible buckets.
+- **Optional NATS distribution layer**: distribute mutating file operations across multiple OpenFiles server instances using NATS queue groups, plus mutation-event cache invalidation.
 - **wasmCloud Rust integration**: a v2-friendly pattern where OpenFiles exposes a host volume that wasmCloud components consume through `wasi:filesystem` preopens, plus a custom WIT interface for direct file operations.
 - **CLI and HTTP server**: local development, smoke testing, and non-FUSE access.
 - **Optional FUSE mount**: Linux/macOS developer experience for normal file APIs.
 - **Examples/bindings**: Python, Node, Go, Rust, and wasmCloud component examples.
+---
+
+## Architecture
+
+```text
+┌───────────────────────────────┐
+│ File-oriented apps / agents   │
+│ CLI / HTTP / FUSE / WASI      │
+└───────────────┬───────────────┘
+                │
+                ▼
+┌───────────────────────────────┐
+│ OpenFiles engine              │
+│ POSIX-like path semantics     │
+│ cache + sync + metadata       │
+└───────────────┬───────────────┘
+                │
+                ▼
+┌───────────────────────────────┐
+│ Object store source of truth  │
+│ S3 / MinIO / COS / GCS / Blob │
+└───────────────────────────────┘
+```
+
+With optional NATS distribution:
+
+```text
+┌─────────────────────────┐
+│ OpenFiles :8787         │
+│ HTTP gateway + worker   │
+└───────────┬─────────────┘
+            │
+            │ NATS queue group
+            │ + mutation events
+            ▼
+┌─────────────────────────┐
+│ OpenFiles :8788         │
+│ HTTP gateway + worker   │
+└───────────┬─────────────┘
+            │
+            ▼
+┌─────────────────────────┐
+│ Shared MinIO / S3       │
+│ durable source of truth │
+└─────────────────────────┘
+```
+
+---
+
 ## Quick start: MinIO + OpenFiles + normal file APIs
 
 ```bash
@@ -27,28 +79,287 @@ docker compose -f examples/docker-compose.yml up -d minio createbucket
 cargo run -p openfiles-server -- --config ./openfiles.toml
 
 # In another shell
-cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /hello.txt "hello from OpenFiles"
-cargo run -q -p openfiles-cli -- --config ./openfiles.toml ls /
-cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /hello.txt
-cargo run -q -p openfiles-cli -- --config ./openfiles.toml flush
+cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /hello.txt "hello from OpenFiles"
+cargo run -q -p openfiles-cli -- --config ./openfiles.toml ls /
+cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /hello.txt
+cargo run -q -p openfiles-cli -- --config ./openfiles.toml flush
+```
+
+The MinIO console is available at:
+
+```text
+http://localhost:9001
+```
+
+Use the credentials from `examples/docker-compose.yml`.
+
+---
+
+## Quick start without Docker Desktop
+
+On macOS, if Docker Hub or Podman image pulls are unavailable, run MinIO directly with Homebrew.
+
+```bash
+brew install minio/stable/minio
+mkdir -p /tmp/openfiles-minio
+
+MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin \
+  minio server /tmp/openfiles-minio --address 127.0.0.1:9000 --console-address 127.0.0.1:9001
+```
+
+In another terminal, create the bucket:
+
+```bash
+brew install minio/stable/mc
+
+mc alias set local http://127.0.0.1:9000 minioadmin minioadmin
+mc mb --ignore-existing local/openfiles
+```
+
+Then run OpenFiles:
+
+```bash
+cargo run -p openfiles-server -- --config ./openfiles.toml --listen 127.0.0.1:8787
+```
+
+---
+
+## Optional NATS distributed mode
+
+OpenS3Files can use NATS to distribute mutating work across multiple `openfiles-server` instances.
+
+This mode is useful when you want multiple HTTP gateways or workers sharing one object-backed filesystem.
+
+### What NATS distribution does
+
+- Uses a NATS work subject and queue group.
+- Lets multiple `openfiles-server` processes subscribe as workers.
+- Distributes mutating operations such as write, delete, rename, flush, and expire.
+- Publishes mutation events so peer processes can invalidate clean local cache entries.
+- Keeps the object store as the durable source of truth.
+
+### What stays local
+
+Reads are intentionally served by the local process engine, even when NATS is enabled.
+
+Reason: OpenFiles cache files are per process, while the object store is shared.
+Routing reads through a NATS queue group can send the read to a worker whose cache metadata references a local cache blob that does not exist on that specific process. Local reads avoid that cross-process cache-file mismatch while still reading from the shared MinIO/S3 source of truth.
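+
+The messages on the work and events subjects are plain JSON. As a sketch of a
+single write, derived from the `WorkRequest`, `WorkResponse`, and
+`MutationEvent` types in this patch (the path is a made-up example, and `data`
+is shown the way serde_json encodes `Bytes`, as a byte array), the gateway
+publishes a work request:
+
+```json
+{ "op": "write-file", "path": "/reports/q1.txt", "data": [104, 105] }
+```
+
+The worker that picks it up answers on the reply subject, here with
+`{ "kind": "unit" }`, and then publishes the mutation event that peers use
+for cache invalidation:
+
+```json
+{ "origin": "<worker instance id>", "kind": "write", "paths": ["/reports/q1.txt"], "prefixes": [] }
+```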
+
+### Install and start NATS
+
+Using Homebrew:
+
+```bash
+brew install nats-server
+nats-server -p 4222
+```
+
+Verify NATS is listening:
+
+```bash
+nc -vz 127.0.0.1 4222
+```
+
+Expected:
+
+```text
+Connection to 127.0.0.1 port 4222 succeeded
+```
+
+### Enable NATS in `openfiles.toml`
+
+Add or update:
+
+```toml
+[nats]
+enabled = true
+url = "nats://127.0.0.1:4222"
+subject_prefix = "openfiles"
+request_timeout_ms = 30000
+max_payload_bytes = 10485760
+publish_events = true
+
+# Optional. If omitted, the server derives a queue group from subject_prefix and fs_id.
+# queue_group = "openfiles.default.workers"
+
+# Optional. If omitted, each server generates a UUID.
+# instance_id = "node-1"
+```
+
+### Start two OpenFiles server instances
+
+Terminal 1:
+
+```bash
+nats-server -p 4222
+```
+
+Terminal 2:
+
+```bash
+cargo run -p openfiles-server -- --config ./openfiles.toml --listen 127.0.0.1:8787
+```
+
+Terminal 3:
+
+```bash
+cargo run -p openfiles-server -- --config ./openfiles.toml --listen 127.0.0.1:8788
+```
+
+### Distributed validation test
+
+Write through node `8787`, then read through node `8788`:
+
+```bash
+name="dist-$(date +%s).txt"
+
+curl -i -X PUT --data-binary "hello $name" \
+  "http://127.0.0.1:8787/v1/write/$name"
+
+for i in $(seq 1 20); do
+  curl -s "http://127.0.0.1:8788/v1/read/$name"
+  echo
+done
+```
+
+Expected output:
+
+```text
+hello dist-<timestamp>.txt
+hello dist-<timestamp>.txt
+hello dist-<timestamp>.txt
+...
+```
-# 🚀 OpenFiles Script Execution Examples
-## 🧠 Mental Model
-```
-OpenFiles (MinIO / S3 / Your Provider)
+
+Reverse direction:
+
+```bash
+name="dist-reverse-$(date +%s).txt"
+
+curl -i -X PUT --data-binary "hello $name" \
+  "http://127.0.0.1:8788/v1/write/$name"
+for i in $(seq 1 20); do
+  curl -s "http://127.0.0.1:8787/v1/read/$name"
+  echo
+done
+```
+
+Expected output:
+
+```text
+hello dist-reverse-<timestamp>.txt
+hello dist-reverse-<timestamp>.txt
+hello dist-reverse-<timestamp>.txt
+...
+```
+
+### Useful distributed-mode checks
+
+List files through each instance:
+
+```bash
+curl -i "http://127.0.0.1:8787/v1/list"
+curl -i "http://127.0.0.1:8788/v1/list"
+```
+
+Read a file through each instance:
+
+```bash
+curl -i "http://127.0.0.1:8787/v1/read/hello.txt"
+curl -i "http://127.0.0.1:8788/v1/read/hello.txt"
+```
+
+Write through one instance and flush through another:
+
+```bash
+curl -i -X PUT --data-binary "distributed flush test" \
+  "http://127.0.0.1:8787/v1/write/flush-test.txt"
+
+curl -i -X POST "http://127.0.0.1:8788/v1/flush"
+
+curl -i "http://127.0.0.1:8787/v1/read/flush-test.txt"
+```
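+
+To watch the invalidation traffic itself, subscribe to the events subject with
+the `nats` CLI from natscli. The subject below assumes the default `openfiles`
+prefix; replace `<fs_token>` with the sanitized token derived from your
+`fs_id`:
+
+```bash
+nats sub "openfiles.<fs_token>.events"
+```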
+
+### Troubleshooting distributed mode
+
+#### `failed to connect to NATS at nats://127.0.0.1:4222`
+
+NATS is not running or not reachable.
+
+```bash
+nats-server -p 4222
+nc -vz 127.0.0.1 4222
+```
+
+#### `storage error ... http://127.0.0.1:9000/openfiles/...`
+
+MinIO/S3 is not running, the bucket does not exist, or `openfiles.toml` points at the wrong endpoint.
+
+```bash
+mc alias set local http://127.0.0.1:9000 minioadmin minioadmin
+mc mb --ignore-existing local/openfiles
+mc ls local/openfiles
+```
+
+#### `404 Not Found`
+
+Use the actual HTTP routes:
+
+```text
+/v1/write/{path}
+/v1/read/{path}
+/v1/list
+/v1/list/{path}
+/v1/stat
+/v1/stat/{path}
+/v1/delete/{path}
+/v1/rename
+/v1/flush
+/v1/expire
+/healthz
+```
+
+Example:
+
+```bash
+curl -i -X PUT --data-binary "hello" \
+  "http://127.0.0.1:8787/v1/write/demo.txt"
+
+curl -i "http://127.0.0.1:8788/v1/read/demo.txt"
+```
+
+#### `remote io error: No such file or directory`
+
+This can happen if distributed reads are routed through NATS and a remote worker has stale cache metadata but not the local cache blob.
+
+The current recommended behavior is:
+
+- NATS distributes writes, delete, rename, flush, expire, and cache invalidation.
+- Reads stay local and use the shared object store as source of truth.
+
+---
+
+# OpenFiles Script Execution Examples
+
+## Mental model
+
+```text
+OpenFiles object store / MinIO / S3 / provider
        ↓ cat
-Local Shell (sh/bash)
+Local shell: sh / bash / zsh
        ↓ executes
-OpenFiles CLI (for FS ops)
+OpenFiles CLI
+       ↓ performs filesystem operations
+OpenFiles object store / MinIO / S3 / provider
 ```
+
+Scripts are stored in OpenFiles, but executed by the host shell.
+
 ---
 
-## 📦 1. Create and run a simple script
 
-### Create script in OpenFiles
+## 1. Create and run a simple script
+
+Create a script in OpenFiles:
 
 ```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /hello.sh '#!/bin/sh
@@ -56,7 +367,7 @@ echo "Hello from OpenFiles!"
 '
 ```
 
-### Execute it
+Execute it:
 
 ```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /hello.sh | sh
@@ -64,7 +375,7 @@ cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /hello.sh | sh
 
 ---
 
-## 📂 2. Script that interacts with OpenFiles
+## 2. Script that interacts with OpenFiles
 
 ```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /ops.sh '#!/bin/sh
@@ -88,7 +399,7 @@ $OF flush
 '
 ```
 
-### Run:
+Run:
 
 ```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /ops.sh | sh
@@ -96,10 +407,11 @@ cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /ops.sh | sh
 
 ---
 
-## 🔁 3. Execute script and persist output back to OpenFiles
+## 3. Execute script and persist output back to OpenFiles
+
+Create a job script:
 
 ```bash
-# create job script
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /job.sh '#!/bin/sh
 echo "Job started"
 date
@@ -107,35 +419,51 @@ echo "Processing..."
 sleep 1
 echo "Done"
 '
+```
+
+Run and capture output:
 
-# run + capture output
+```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /job.sh | sh > /tmp/job.out
+```
 
-# upload output back into OpenFiles
+Upload output back into OpenFiles:
+
+```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /job-output.txt "$(cat /tmp/job.out)"
+```
+
+Read the result:
 
-# read result
+```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /job-output.txt
 ```
 
 ---
 
-## 🔗 4.
 
 ---
 
-## 🔗 4. Pipe OpenFiles data into Unix tools
+## 4. Pipe OpenFiles data into Unix tools
+
+Word count:
 
 ```bash
-# word count
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /hello.txt | wc -w
+```
+
+Uppercase transform:
 
-# uppercase transform
+```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /hello.txt | tr a-z A-Z
+```
+
+Grep:
 
-# grep
+```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /hello.txt | grep OpenFiles
 ```
 
 ---
 
-## ⚡ 5. One-liner remote execution pattern
+## 5. One-liner remote execution pattern
 
 ```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /script.sh | sh
@@ -143,7 +471,7 @@ cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /script.sh | sh
 
 ---
 
-## 🧪 6. Multi-step pipeline script
+## 6. Multi-step pipeline script
 
 ```bash
 cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /pipeline.sh '#!/bin/sh
@@ -177,7 +505,7 @@ cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /result.txt
 
 ---
 
-## 🧰 7. Developer ergonomics (alias)
+## 7. Developer ergonomics with an alias
 
 ```bash
 alias of='cargo run -q -p openfiles-cli -- --config ./openfiles.toml'
@@ -186,29 +514,69 @@ alias of='cargo run -q -p openfiles-cli -- --config ./openfiles.toml'
 ```
 
 Then:
 
 ```bash
-of write /test.sh '#!/bin/sh echo hi'
+of write /test.sh '#!/bin/sh
+echo hi
+'
+
 of cat /test.sh | sh
 of ls /
 ```
 
 ---
 
-## ⚠️ Important Note
+## Important script execution note
 
 Scripts are stored in OpenFiles but executed by the host shell.
 
-- `ls /` → host filesystem
-- `of ls /` → OpenFiles filesystem
+- `ls /` means the host filesystem.
+- `of ls /` means the OpenFiles filesystem.
+- `cat /file.txt` means the host filesystem.
+- `of cat /file.txt` means the OpenFiles filesystem.
+
+Always use the OpenFiles CLI inside scripts when you want to interact with OpenFiles.
+
+---
+
+## HTTP API
 
-Always use the CLI inside scripts to interact with OpenFiles.
+The server exposes simple HTTP routes:
 
+| Method | Route | Description |
+|---|---|---|
+| `GET` | `/healthz` | Health check |
+| `GET` | `/v1/stat` | Stat root |
+| `GET` | `/v1/stat/{path}` | Stat path |
+| `GET` | `/v1/list` | List root |
+| `GET` | `/v1/list/{path}` | List directory |
+| `GET` | `/v1/read/{path}` | Read file |
+| `PUT` | `/v1/write/{path}` | Write file body |
+| `DELETE` | `/v1/delete/{path}` | Delete path |
+| `POST` | `/v1/rename` | Rename path |
+| `POST` | `/v1/flush` | Flush dirty cache to backend |
+| `POST` | `/v1/expire` | Expire inactive clean cache data |
+
+Examples:
+
+```bash
+curl -i "http://127.0.0.1:8787/healthz"
+
+curl -i -X PUT --data-binary "hello http" \
+  "http://127.0.0.1:8787/v1/write/http-demo.txt"
 
-The MinIO console is available at `http://localhost:9001` with the credentials in `examples/docker-compose.yml`.
+curl -i "http://127.0.0.1:8787/v1/read/http-demo.txt"
+
+curl -i "http://127.0.0.1:8787/v1/list"
+
+curl -i -X POST "http://127.0.0.1:8787/v1/flush"
+```
+
+---
 
 ## Optional FUSE mount
 
 ```bash
-# Linux: install libfuse3-dev/fuse3. macOS: install macFUSE.
+# Linux: install libfuse3-dev/fuse3.
+# macOS: install macFUSE.
 cargo run -p openfiles-fuse --features fuse -- --config ./openfiles.toml /mnt/openfiles
 
 # Then use regular tools:
@@ -216,9 +584,13 @@ echo "standard file IO" > /mnt/openfiles/demo.txt
 cat /mnt/openfiles/demo.txt
 ```
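+
+To detach the mount when you are done, use your platform's standard FUSE tooling (host commands, not OpenFiles subcommands):
+
+```bash
+# Linux (fuse3; older installs may ship fusermount instead of fusermount3)
+fusermount3 -u /mnt/openfiles
+
+# macOS (macFUSE)
+umount /mnt/openfiles
+```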
 
+---
+
 ## wasmCloud v2 deployment pattern
 
-OpenFiles runs as a Rust daemon/sidecar on the node or pod, materializes its active working set under a local path, and wasmCloud components receive that path as a `wasi:filesystem` preopen. This aligns with wasmCloud v2's explicit volume model and avoids routing every file read through an RPC boundary.
+OpenFiles runs as a Rust daemon or sidecar on the node or pod, materializes its active working set under a local path, and wasmCloud components receive that path as a `wasi:filesystem` preopen.
+
+This aligns with wasmCloud v2's explicit volume model and avoids routing every file read through an RPC boundary.
 
 See:
 
@@ -226,9 +598,11 @@ See:
 - `examples/wasmcloud/workload-http-list-files.yaml`
 - `examples/wasmcloud/components/http-list-files/`
 
+---
+
 ## Vendor examples
 
-Configuration files are under `examples/configs/`:
+Configuration files are under `examples/configs/`.
 
 | Vendor | Config file | Notes |
 |---|---|---|
@@ -241,33 +615,25 @@ Configuration files are under `examples/configs/`:
 | MinIO | `minio.toml` | S3-compatible local/dev endpoint. |
 | NetApp StorageGRID | `netapp-storagegrid.toml` | S3-compatible endpoint. |
 
+---
+
 ## Performance model
 
 OpenFiles uses the same high-level performance strategy documented for S3 Files:
 
-1. **Low-latency active set**: small/hot files are cached on high-performance local or shared storage.
+1. **Low-latency active set**: small and hot files are cached on high-performance local or shared storage.
 2. **Direct large reads**: large synchronized reads are served directly from the object store using range requests.
 3. **Batched exports**: repeated writes to the same file can be consolidated before object PUT.
 4. **Metadata-first directory import**: first directory access indexes object metadata, then selectively imports small files.
+5. **Distributed mutation workers**: when NATS is enabled, mutating work can be spread across multiple OpenFiles server instances.
 
 OpenFiles cannot guarantee AWS S3 Files' exact throughput or latency, because those depend on AWS EFS/S3 internals and region-scale infrastructure. It is designed to reproduce the semantics and developer experience while letting operators choose the cache medium, placement, concurrency, and object backend.
 
-## Repository layout
+---
 
-```text
-crates/openfiles-core Core engine, standard, cache, backend adapters
-crates/openfiles-cli CLI for file operations and smoke tests
-crates/openfiles-server HTTP API + sync daemon
-crates/openfiles-fuse Optional FUSE mount
-crates/openfiles-wasmcloud-host Reference custom-host/plugin integration
-wit/ OpenFiles WIT contract
-examples/configs/ Vendor configs
-examples/bindings/ Python, Node, Go, Rust examples
-examples/wasmcloud/ wasmCloud v2 manifests and Rust component
-docs/ Standard, architecture, performance, vendor matrix
-```
+## Correctness and conflict model
 
-## Safety and correctness defaults
+OpenFiles follows these correctness defaults:
 
 - The object store is the source of truth on conflicts.
 - Conflicting local changes are moved into `.openfiles-lost+found-/` and are not exported until the user explicitly copies them out.
@@ -275,6 +641,88 @@ docs/ Standard, architecture, performance, vendor matri
 - Hard links are not supported.
 - POSIX metadata is stored in portable sidecar metadata objects by default and can additionally be stored as user metadata where the backend supports it.
 - Paths are validated against object-store path component and full-key limits.
+- NATS mutation events invalidate clean cache entries on peer instances (see the check below).
+- Dirty local cache entries are preserved to avoid losing unsynced work.
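+
+One way to observe that invalidation default from the shell, reusing the two instances from the distributed walkthrough above (the file name is arbitrary):
+
+```bash
+# Write v1 through node A, then read through node B to warm B's cache.
+curl -s -X PUT --data-binary "v1" "http://127.0.0.1:8787/v1/write/inval-demo.txt"
+curl -s "http://127.0.0.1:8788/v1/read/inval-demo.txt"   # expect: v1
+
+# Overwrite through node A; the NATS mutation event should invalidate B's clean entry.
+curl -s -X PUT --data-binary "v2" "http://127.0.0.1:8787/v1/write/inval-demo.txt"
+curl -s "http://127.0.0.1:8788/v1/read/inval-demo.txt"   # expect: v2, not a stale v1
+```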
+
+---
+
+## CRDT notes
+
+OpenFiles does not use CRDTs for arbitrary file bytes by default.
+
+That is intentional. For arbitrary binary files, automatic conflict-free merge is unsafe without application-specific merge semantics. OpenFiles instead uses:
+
+- the object store as the source of truth,
+- sidecar metadata,
+- explicit conflict detection,
+- lost+found recovery for conflicting local writes,
+- optional distributed cache invalidation with NATS.
+
+A CRDT layer can be added above OpenFiles for specific file types such as JSON documents, text documents, collaborative editor states, or append-only logs, but it should not be applied blindly to all file contents.
+
+---
+
+## Repository layout
+
+```text
+crates/openfiles-core            Core engine, standard, cache, backend adapters
+crates/openfiles-cli             CLI for file operations and smoke tests
+crates/openfiles-server          HTTP API, sync daemon, optional NATS distribution
+crates/openfiles-fuse            Optional FUSE mount
+crates/openfiles-wasmcloud-host  Reference custom-host/plugin integration
+wit/                             OpenFiles WIT contract
+examples/configs/                Vendor configs
+examples/bindings/               Python, Node, Go, Rust examples
+examples/wasmcloud/              wasmCloud v2 manifests and Rust component
+docs/                            Standard, architecture, performance, vendor matrix
+```
+
+---
+
+## Development
+
+Format:
+
+```bash
+cargo fmt
+```
+
+Check:
+
+```bash
+cargo check
+```
+
+Check only the server:
+
+```bash
+cargo check -p openfiles-server
+```
+
+Run the CLI smoke test:
+
+```bash
+cargo run -q -p openfiles-cli -- --config ./openfiles.toml write /smoke.txt "smoke"
+cargo run -q -p openfiles-cli -- --config ./openfiles.toml cat /smoke.txt
+cargo run -q -p openfiles-cli -- --config ./openfiles.toml flush
+```
+
+Run the server smoke test:
+
+```bash
+cargo run -p openfiles-server -- --config ./openfiles.toml --listen 127.0.0.1:8787
+```
+
+In another terminal:
+
+```bash
+curl -i -X PUT --data-binary "hello server" \
+  "http://127.0.0.1:8787/v1/write/server-smoke.txt"
+
+curl -i "http://127.0.0.1:8787/v1/read/server-smoke.txt"
+```
+
+---
 
 ## License