Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 152 additions & 10 deletions application/src/server/filesystem/cap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ impl CapFilesystem {
.ok_or_else(|| anyhow::anyhow!("filesystem not initialized"))
}

async fn async_refresh_inner(&self) -> Result<Arc<cap_std::fs::Dir>, anyhow::Error> {
let base_path = self.base_path.clone();
let inner = tokio::task::spawn_blocking(move || {
cap_std::fs::Dir::open_ambient_dir(&*base_path, cap_std::ambient_authority())
})
.await??;

let inner = Arc::new(inner);
*self.inner.write().await = Some(inner.clone());

Ok(inner)
}

#[inline]
pub fn get_inner(&self) -> Result<Arc<cap_std::fs::Dir>, anyhow::Error> {
let inner = self.inner.blocking_read();
Expand All @@ -87,6 +100,17 @@ impl CapFilesystem {
.ok_or_else(|| anyhow::anyhow!("filesystem not initialized"))
}

fn refresh_inner(&self) -> Result<Arc<cap_std::fs::Dir>, anyhow::Error> {
let inner = Arc::new(cap_std::fs::Dir::open_ambient_dir(
&*self.base_path,
cap_std::ambient_authority(),
)?);

*self.inner.blocking_write() = Some(inner.clone());

Ok(inner)
}

#[inline]
pub fn resolve_path(path: &Path) -> PathBuf {
let mut result = PathBuf::new();
Expand Down Expand Up @@ -178,7 +202,46 @@ impl CapFilesystem {
let path = self.relative_path(path.as_ref());

let inner = self.async_get_inner().await?;
tokio::task::spawn_blocking(move || inner.remove_dir_all(path)).await??;
let res = tokio::task::spawn_blocking({
let inner = inner.clone();
let path = path.clone();
move || inner.remove_dir_all(path)
})
.await?;

match res {
Ok(()) => {}
Err(err) => {
tracing::warn!(
path = %path.display(),
error = %err,
"remove_dir_all failed, refreshing filesystem handle and retrying"
);

let inner = self.async_refresh_inner().await?;
let retry = tokio::task::spawn_blocking({
let inner = inner.clone();
let path = path.clone();
move || inner.remove_dir_all(path)
})
.await?;

match retry {
Ok(()) => {}
Err(retry_err) => {
tracing::warn!(
path = %path.display(),
error = %retry_err,
"remove_dir_all retry failed, falling back to std::fs::remove_dir_all"
);

let absolute_path = self.base_path.join(&path);
tokio::task::spawn_blocking(move || std::fs::remove_dir_all(absolute_path))
.await??;
}
}
}
}

Ok(())
}
Expand All @@ -187,7 +250,24 @@ impl CapFilesystem {
let path = self.relative_path(path.as_ref());

let inner = self.get_inner()?;
inner.remove_dir_all(path)?;
if let Err(err) = inner.remove_dir_all(path.clone()) {
tracing::warn!(
path = %path.display(),
error = %err,
"remove_dir_all failed, refreshing filesystem handle and retrying"
);

let inner = self.refresh_inner()?;
if let Err(retry_err) = inner.remove_dir_all(path.clone()) {
tracing::warn!(
path = %path.display(),
error = %retry_err,
"remove_dir_all retry failed, falling back to std::fs::remove_dir_all"
);

std::fs::remove_dir_all(self.base_path.join(&path))?;
}
}

Ok(())
}
Expand Down Expand Up @@ -249,8 +329,28 @@ impl CapFilesystem {
cap_std::fs::Metadata::from_just_metadata(tokio::fs::metadata(&*self.base_path).await?)
} else {
let inner = self.async_get_inner().await?;

tokio::task::spawn_blocking(move || inner.metadata(path)).await??
let result =
tokio::task::spawn_blocking({
let inner = inner.clone();
let path = path.clone();

move || inner.metadata(path)
})
.await?;

match result {
Ok(metadata) => metadata,
Err(err) => {
tracing::warn!(
path = %path.display(),
error = %err,
"metadata failed, refreshing filesystem handle and retrying"
);

let inner = self.async_refresh_inner().await?;
tokio::task::spawn_blocking(move || inner.metadata(path)).await??
}
}
};

Ok(metadata)
Expand All @@ -263,8 +363,19 @@ impl CapFilesystem {
cap_std::fs::Metadata::from_just_metadata(std::fs::metadata(&*self.base_path)?)
} else {
let inner = self.get_inner()?;

inner.metadata(path)?
match inner.metadata(path.clone()) {
Ok(metadata) => metadata,
Err(err) => {
tracing::warn!(
path = %path.display(),
error = %err,
"metadata failed, refreshing filesystem handle and retrying"
);

let inner = self.refresh_inner()?;
inner.metadata(path)?
}
}
};

Ok(metadata)
Expand All @@ -282,8 +393,28 @@ impl CapFilesystem {
)
} else {
let inner = self.async_get_inner().await?;

tokio::task::spawn_blocking(move || inner.symlink_metadata(path)).await??
let result =
tokio::task::spawn_blocking({
let inner = inner.clone();
let path = path.clone();

move || inner.symlink_metadata(path)
})
.await?;

match result {
Ok(metadata) => metadata,
Err(err) => {
tracing::warn!(
path = %path.display(),
error = %err,
"symlink_metadata failed, refreshing filesystem handle and retrying"
);

let inner = self.async_refresh_inner().await?;
tokio::task::spawn_blocking(move || inner.symlink_metadata(path)).await??
}
}
};

Ok(metadata)
Expand All @@ -296,8 +427,19 @@ impl CapFilesystem {
cap_std::fs::Metadata::from_just_metadata(std::fs::symlink_metadata(&*self.base_path)?)
} else {
let inner = self.get_inner()?;

inner.symlink_metadata(path)?
match inner.symlink_metadata(path.clone()) {
Ok(metadata) => metadata,
Err(err) => {
tracing::warn!(
path = %path.display(),
error = %err,
"symlink_metadata failed, refreshing filesystem handle and retrying"
);

let inner = self.refresh_inner()?;
inner.symlink_metadata(path)?
}
}
};

Ok(metadata)
Expand Down