From 1d364a4c1d491d0eaa4dd6a3b5a5bece0e31ed5a Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 5 May 2026 12:56:11 -0700 Subject: [PATCH] Decouple support bundle inner layer from Nexus management Replaces `BundleCollection.bundle: SupportBundle` with a slim `BundleInfo { id, reason_for_creation }`. Moves the sled-storage chunked transfer (`store_bundle_on_sled`), zip helpers (`bundle_to_zipfile`, `recursively_add_directory_to_zipfile`, `sha2_hash`), the `CHUNK_SIZE` and `TEMPDIR` constants, and the DB-polling cancellation (`check_for_cancellation`) out of the inner `support_bundle/` module and into `support_bundle_collector.rs`. After this change the inner layer is a pure mechanism: it never reads the `support_bundle` DB row, never talks to a sled-agent's bundle storage endpoints, and treats CRDB only as a source of facts about sleds, ereports, and blueprints. The outer collector remains the manager of the bundle lifecycle. This is the first step toward a future shared crate that omdb can use to collect bundles when Nexus is down. --- .../tasks/support_bundle/collection.rs | 420 ++---------------- .../tasks/support_bundle_collector.rs | 350 ++++++++++++++- 2 files changed, 391 insertions(+), 379 deletions(-) diff --git a/nexus/src/app/background/tasks/support_bundle/collection.rs b/nexus/src/app/background/tasks/support_bundle/collection.rs index 08c9df8dbde..2326f05e677 100644 --- a/nexus/src/app/background/tasks/support_bundle/collection.rs +++ b/nexus/src/app/background/tasks/support_bundle/collection.rs @@ -2,9 +2,16 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! The entrypoint to all support bundle collection. +//! The mechanism layer of support bundle collection. //! -//! These are the primitives used to look up everything else within the bundle. +//! Given a datastore handle, internal-DNS resolver, [`OpContext`], a +//! [`BundleDataSelection`], a synthesized [`BundleInfo`] (id + +//! reason_for_creation), and an output directory, gather data into the +//! directory and produce a [`SupportBundleCollectionReport`]. +//! +//! This layer never reads the `support_bundle` table, never transfers +//! data to a sled-agent's bundle storage endpoints, and never polls +//! bundle state. Those responsibilities belong to the caller. use crate::app::background::tasks::support_bundle::cache::Cache; use crate::app::background::tasks::support_bundle::perfetto; @@ -13,44 +20,26 @@ use crate::app::background::tasks::support_bundle::steps; use nexus_types::support_bundle::BundleDataSelection; use anyhow::Context; -use camino::Utf8DirEntry; -use camino::Utf8Path; use camino_tempfile::Utf8TempDir; -use camino_tempfile::tempdir_in; -use camino_tempfile::tempfile_in; use internal_dns_resolver::Resolver; -use nexus_db_model::SupportBundle; -use nexus_db_model::SupportBundleState; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; use nexus_types::internal_api::background::SupportBundleCollectionReport; -use omicron_common::api::external::Error; -use omicron_uuid_kinds::DatasetUuid; use omicron_uuid_kinds::SupportBundleUuid; -use omicron_uuid_kinds::ZpoolUuid; use parallel_task_set::ParallelTaskSet; use serde_json::json; -use sha2::Digest; -use sha2::Sha256; -use slog_error_chain::InlineErrorChain; -use std::io::Write; -use std::num::NonZeroU64; use std::sync::Arc; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeekExt; -use tokio::io::SeekFrom; use tokio_util::sync::CancellationToken; -use tufaceous_artifact::ArtifactHash; -use zip::ZipWriter; -use zip::write::FullFileOptions; - -/// We use "/var/tmp" to use Nexus' filesystem for temporary storage, -/// rather than "/tmp", which would keep this collected data in-memory. -pub const TEMPDIR: &str = "/var/tmp"; -/// The size of piece of a support bundle to transfer to the sled agent -/// within a single streaming request. -pub const CHUNK_SIZE: NonZeroU64 = NonZeroU64::new(1024 * 1024 * 1024).unwrap(); +/// Minimal bundle metadata needed by the mechanism layer. +/// +/// Built by the caller — either drawn from an existing `support_bundle` +/// DB row or synthesized fresh. +#[derive(Clone, Debug)] +pub struct BundleInfo { + pub id: SupportBundleUuid, + pub reason_for_creation: String, +} /// Wraps up all arguments to perform a single support bundle collection pub struct BundleCollection { @@ -59,8 +48,7 @@ pub struct BundleCollection { log: slog::Logger, opctx: OpContext, data_selection: BundleDataSelection, - bundle: SupportBundle, - transfer_chunk_size: NonZeroU64, + bundle: BundleInfo, cancellation_token: CancellationToken, } @@ -71,8 +59,7 @@ impl BundleCollection { log: slog::Logger, opctx: OpContext, data_selection: BundleDataSelection, - bundle: SupportBundle, - transfer_chunk_size: NonZeroU64, + bundle: BundleInfo, ) -> Self { Self { datastore, @@ -81,7 +68,6 @@ impl BundleCollection { opctx, data_selection, bundle, - transfer_chunk_size, cancellation_token: CancellationToken::new(), } } @@ -106,7 +92,7 @@ impl BundleCollection { &self.data_selection } - pub fn bundle(&self) -> &SupportBundle { + pub fn bundle(&self) -> &BundleInfo { &self.bundle } @@ -120,8 +106,8 @@ impl BundleCollection { /// Returns a reference to the cancellation token. /// - /// Pass to helper functions that need to `select!` on cancellation - /// independently (e.g., futures inside `FuturesUnordered`). + /// Cancellation is driven by the caller; this type only observes + /// the token. pub fn cancellation_token(&self) -> &CancellationToken { &self.cancellation_token } @@ -134,79 +120,24 @@ impl BundleCollection { self.cancellation_token.cancelled().await } - /// Collect the bundle within Nexus, and store it on a target sled. - pub async fn collect_bundle_and_store_on_sled( - self: &Arc, - ) -> anyhow::Result { - // Create a temporary directory where we'll store the support bundle - // as it's being collected. - let dir = tempdir_in(TEMPDIR)?; - - let report = self.collect_bundle_locally(&dir).await?; - self.store_bundle_on_sled(dir).await?; - Ok(report) - } - - // Create the support bundle, placing the contents into a user-specified - // directory. - // - // Does not attempt to convert the contents into a zipfile, nor send them - // to any durable storage. - async fn collect_bundle_locally( + /// Collect the bundle into the supplied temporary directory. + /// + /// The caller drives cancellation via `cancellation_token()`; this + /// function drains in-flight work before returning. The caller is + /// responsible for whatever happens to the directory afterward + /// (zipping, transferring to durable storage, writing to stdout, + /// etc.). + pub async fn collect_bundle_locally( self: &Arc, dir: &Utf8TempDir, ) -> anyhow::Result { - // Spawn a background task that periodically checks whether this - // bundle should still be collected. If not, it cancels the - // `CancellationToken`. - // - // Cancellation is hybrid: - // - // - Cancel-safe operations (HTTP requests, DB queries) use - // `tokio::select!` with the token for immediate cancellation. - // Dropping these futures is safe — no local side effects. - // - // - Cancel-unsafe operations (filesystem writes via `tokio::fs`, - // `spawn_blocking`) use cooperative `is_cancelled()` checks. - // These are never dropped mid-flight, ensuring all - // `spawn_blocking` work completes before the TempDir is dropped. - // - // `run_collect_bundle_steps` checks the token before spawning new - // steps and drains all in-flight tasks before returning. - // - // Previous iterations used a top-level `tokio::select!` to race - // cancellation against the entire collection. This had two problems: - // - // 1. Dropping the collection future left `spawn_blocking` file - // writes in-flight, racing with TempDir cleanup. - // See: https://github.com/oxidecomputer/omicron/issues/10198 - // - // 2. Earlier versions of the `select!` did async DB work in the - // cancellation branch body while the collection branch held a - // connection pool claim, causing deadlocks. - // See: https://github.com/oxidecomputer/omicron/issues/9259 - // - // The current design avoids both: cancel-unsafe futures are never - // dropped, and the DB check runs in a separate spawned task. - let cancel_task = tokio::spawn({ - let this = Arc::clone(self); - async move { this.check_for_cancellation().await } - }); - - // Run the collection. It checks self.cancelled and drains all - // in-flight work before returning. let report = self.collect_bundle_as_file(dir).await; - // Collection is done — stop the cancellation checker. - cancel_task.abort(); - let _ = cancel_task.await; - if self.is_cancelled() { warn!( &self.log, "Support Bundle cancelled - stopping collection"; "bundle" => %self.bundle.id, - "state" => ?self.bundle.state ); return Err(anyhow::anyhow!("Support Bundle Cancelled")); } @@ -219,180 +150,28 @@ impl BundleCollection { report } - async fn store_bundle_on_sled( - &self, - dir: Utf8TempDir, - ) -> anyhow::Result<()> { - // Create the zipfile as a temporary file - let mut zipfile = tokio::fs::File::from_std(bundle_to_zipfile(&dir)?); - let total_len = zipfile.metadata().await?.len(); - - // Collect the hash locally before we send it over the network - // - // We'll use this later during finalization to confirm the bundle - // has been stored successfully. - zipfile.seek(SeekFrom::Start(0)).await?; - let hash = sha2_hash(&mut zipfile).await?; - - // Find the sled where we're storing this bundle. - let sled_id = self - .datastore - .zpool_get_sled_if_in_service( - &self.opctx, - self.bundle.zpool_id.into(), - ) - .await?; - let sled_client = nexus_networking::sled_client( - &self.datastore, - &self.opctx, - sled_id, - &self.log, - ) - .await?; - - let zpool = ZpoolUuid::from(self.bundle.zpool_id); - let dataset = DatasetUuid::from(self.bundle.dataset_id); - let support_bundle = SupportBundleUuid::from(self.bundle.id); - - // Tell this sled to create the bundle. - let creation_result = sled_client - .support_bundle_start_creation(&zpool, &dataset, &support_bundle) - .await - .with_context(|| "Support bundle failed to start creation")?; - - if matches!( - creation_result.state, - sled_agent_client::types::SupportBundleState::Complete - ) { - // Early exit case: the bundle was already created -- we must have either - // crashed or failed between "finalizing" and "writing to the database that we - // finished". - info!(&self.log, "Support bundle was already collected"; "bundle" => %self.bundle.id); - return Ok(()); - } - info!(&self.log, "Support bundle creation started"; "bundle" => %self.bundle.id); - - let mut offset = 0; - while offset < total_len { - // Stream the zipfile to the sled where it should be kept - let mut file = zipfile - .try_clone() - .await - .with_context(|| "Failed to clone zipfile")?; - file.seek(SeekFrom::Start(offset)).await.with_context(|| { - format!("Failed to seek to offset {offset} / {total_len} within zipfile") - })?; - - // Only stream at most "transfer_chunk_size" bytes at once - let chunk_size = std::cmp::min( - self.transfer_chunk_size.get(), - total_len - offset, - ); - - let limited_file = file.take(chunk_size); - let stream = tokio_util::io::ReaderStream::new(limited_file); - let body = reqwest::Body::wrap_stream(stream); - - info!( - &self.log, - "Streaming bundle chunk"; - "bundle" => %self.bundle.id, - "offset" => offset, - "length" => chunk_size, - ); - - sled_client.support_bundle_transfer( - &zpool, &dataset, &support_bundle, offset, body - ).await.with_context(|| { - format!("Failed to transfer bundle: {chunk_size}@{offset} of {total_len} to sled") - })?; - - offset += chunk_size; - } - - sled_client - .support_bundle_finalize( - &zpool, - &dataset, - &support_bundle, - &hash.to_string(), - ) - .await - .with_context(|| "Failed to finalize bundle")?; - - // Returning from this method should drop all temporary storage - // allocated locally for this support bundle. - Ok(()) - } - - // Indefinitely perform periodic checks about whether or not we should - // cancel the bundle. + // Cancellation is hybrid: // - // Cancels `self.cancellation_token` and returns if: - // - The bundle state is no longer SupportBundleState::Collecting - // (which happens if the bundle has been explicitly cancelled, or - // if the backing storage has been expunged). - // - The bundle has been deleted + // - Cancel-safe operations (HTTP requests, DB queries) use + // `tokio::select!` with the token for immediate cancellation. + // Dropping these futures is safe — no local side effects. // - // Otherwise, keeps checking indefinitely while polled. - async fn check_for_cancellation(&self) { - let work_duration = tokio::time::Duration::from_secs(5); - let mut yield_interval = tokio::time::interval_at( - tokio::time::Instant::now() + work_duration, - work_duration, - ); - - loop { - // Timer fired mid-collection - check if we should stop. - yield_interval.tick().await; - trace!( - self.log, - "Checking if Bundle Collection cancelled"; - "bundle" => %self.bundle.id - ); - - match self - .datastore - .support_bundle_get(&self.opctx, self.bundle.id.into()) - .await - { - Ok(SupportBundle { - state: SupportBundleState::Collecting, - .. - }) => { - // Bundle still collecting; continue... - continue; - } - Ok(_) => { - // Not collecting, for any reason: Time to exit - self.cancellation_token.cancel(); - return; - } - Err(Error::ObjectNotFound { .. } | Error::NotFound { .. }) => { - self.cancellation_token.cancel(); - return; - } - Err(err) => { - warn!( - self.log, - "Database error checking bundle cancellation"; - InlineErrorChain::new(&err) - ); - - // If we cannot contact the database, retry later - continue; - } - } - } - } - + // - Cancel-unsafe operations (filesystem writes via `tokio::fs`, + // `spawn_blocking`) use cooperative `is_cancelled()` checks. + // These are never dropped mid-flight, ensuring all + // `spawn_blocking` work completes before the output directory + // is dropped. + // + // This loop checks the token before spawning new steps and drains + // all in-flight tasks before returning. Failing to do so previously + // raced `spawn_blocking` writes against `TempDir` cleanup; see + // https://github.com/oxidecomputer/omicron/issues/10198. async fn run_collect_bundle_steps( self: &Arc, output: &Utf8TempDir, mut steps: Vec, ) -> SupportBundleCollectionReport { - let mut report = - SupportBundleCollectionReport::new(self.bundle.id.into()); + let mut report = SupportBundleCollectionReport::new(self.bundle.id); const MAX_CONCURRENT_STEPS: usize = 16; let mut tasks = @@ -569,110 +348,3 @@ impl BundleCollection { Ok(self.run_collect_bundle_steps(dir, steps).await) } } - -// Takes a directory "dir", and zips the contents into a single zipfile. -fn bundle_to_zipfile(dir: &Utf8TempDir) -> anyhow::Result { - let tempfile = tempfile_in(TEMPDIR)?; - let mut zip = ZipWriter::new(tempfile); - - recursively_add_directory_to_zipfile(&mut zip, dir.path(), dir.path())?; - - Ok(zip.finish()?) -} - -fn recursively_add_directory_to_zipfile( - zip: &mut ZipWriter, - root_path: &Utf8Path, - dir_path: &Utf8Path, -) -> anyhow::Result<()> { - // Readdir might return entries in a non-deterministic order. - // Let's sort it for the zipfile, to be nice. - let mut entries = dir_path - .read_dir_utf8()? - .filter_map(Result::ok) - .collect::>(); - entries.sort_by(|a, b| a.file_name().cmp(&b.file_name())); - - for entry in &entries { - // Remove the "/tmp/..." prefix from the path when we're storing it in the - // zipfile. - let dst = entry.path().strip_prefix(root_path)?; - - let file_type = entry.file_type()?; - if file_type.is_file() { - let src = entry.path(); - - let zip_time = entry - .path() - .metadata() - .and_then(|m| m.modified()) - .ok() - .and_then(|sys_time| jiff::Zoned::try_from(sys_time).ok()) - .and_then(|zoned| { - zip::DateTime::try_from(zoned.datetime()).ok() - }) - .unwrap_or_else(zip::DateTime::default); - - let opts = FullFileOptions::default() - .last_modified_time(zip_time) - .compression_method(zip::CompressionMethod::Deflated) - .large_file(true); - - zip.start_file_from_path(dst, opts)?; - let mut file = std::fs::File::open(&src)?; - std::io::copy(&mut file, zip)?; - } - if file_type.is_dir() { - let opts = FullFileOptions::default(); - zip.add_directory_from_path(dst, opts)?; - recursively_add_directory_to_zipfile(zip, root_path, entry.path())?; - } - } - Ok(()) -} - -async fn sha2_hash(file: &mut tokio::fs::File) -> anyhow::Result { - let mut buf = vec![0u8; 65536]; - let mut ctx = Sha256::new(); - loop { - let n = file.read(&mut buf).await?; - if n == 0 { - break; - } - ctx.write_all(&buf[0..n])?; - } - - let digest = ctx.finalize(); - Ok(ArtifactHash(digest.as_slice().try_into()?)) -} - -#[cfg(test)] -mod test { - use super::*; - - use camino_tempfile::tempdir; - - // Ensure that we can convert a temporary directory into a zipfile - #[test] - fn test_zipfile_creation() { - let dir = tempdir().unwrap(); - - std::fs::create_dir_all(dir.path().join("dir-a")).unwrap(); - std::fs::create_dir_all(dir.path().join("dir-b")).unwrap(); - std::fs::write(dir.path().join("dir-a").join("file-a"), "some data") - .unwrap(); - std::fs::write(dir.path().join("file-b"), "more data").unwrap(); - - let zipfile = bundle_to_zipfile(&dir) - .expect("Should have been able to bundle zipfile"); - let archive = zip::read::ZipArchive::new(zipfile).unwrap(); - - // We expect the order to be deterministically alphabetical - let mut names = archive.file_names(); - assert_eq!(names.next(), Some("dir-a/")); - assert_eq!(names.next(), Some("dir-a/file-a")); - assert_eq!(names.next(), Some("dir-b/")); - assert_eq!(names.next(), Some("file-b")); - assert_eq!(names.next(), None); - } -} diff --git a/nexus/src/app/background/tasks/support_bundle_collector.rs b/nexus/src/app/background/tasks/support_bundle_collector.rs index d9091f1cc98..2f3aeadbb31 100644 --- a/nexus/src/app/background/tasks/support_bundle_collector.rs +++ b/nexus/src/app/background/tasks/support_bundle_collector.rs @@ -6,6 +6,11 @@ use crate::app::background::BackgroundTask; use anyhow::Context; +use camino::Utf8DirEntry; +use camino::Utf8Path; +use camino_tempfile::Utf8TempDir; +use camino_tempfile::tempdir_in; +use camino_tempfile::tempfile_in; use futures::FutureExt; use futures::future::BoxFuture; use internal_dns_resolver::Resolver; @@ -26,13 +31,31 @@ use omicron_uuid_kinds::SledUuid; use omicron_uuid_kinds::SupportBundleUuid; use omicron_uuid_kinds::ZpoolUuid; use serde_json::json; +use sha2::Digest; +use sha2::Sha256; use sled_agent_types::support_bundle::NESTED_DATASET_NOT_FOUND; use slog_error_chain::InlineErrorChain; +use std::io::Write; use std::num::NonZeroU64; use std::sync::Arc; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; +use tokio::io::SeekFrom; +use tokio_util::sync::CancellationToken; +use tufaceous_artifact::ArtifactHash; +use zip::ZipWriter; +use zip::write::FullFileOptions; use super::support_bundle::collection::BundleCollection; -use super::support_bundle::collection::CHUNK_SIZE; +use super::support_bundle::collection::BundleInfo; + +/// We use "/var/tmp" to use Nexus' filesystem for temporary storage, +/// rather than "/tmp", which would keep this collected data in-memory. +pub const TEMPDIR: &str = "/var/tmp"; + +/// The size of piece of a support bundle to transfer to the sled agent +/// within a single streaming request. +pub const CHUNK_SIZE: NonZeroU64 = NonZeroU64::new(1024 * 1024 * 1024).unwrap(); fn authz_support_bundle_from_id(id: SupportBundleUuid) -> authz::SupportBundle { authz::SupportBundle::new(authz::FLEET, id, LookupType::by_id(id)) @@ -370,17 +393,59 @@ impl SupportBundleCollector { .await .context("failed to query bundle data selection")?; + let bundle_log = + opctx.log.new(slog::o!("bundle" => bundle.id.to_string())); let collection = Arc::new(BundleCollection::new( self.datastore.clone(), self.resolver.clone(), - opctx.log.new(slog::o!("bundle" => bundle.id.to_string())), + bundle_log.clone(), opctx.child(std::collections::BTreeMap::new()), data_selection, - bundle.clone(), - self.transfer_chunk_size, + BundleInfo { + id: bundle.id.into(), + reason_for_creation: bundle.reason_for_creation.clone(), + }, )); - let mut report = collection.collect_bundle_and_store_on_sled().await?; + // Spawn a task that periodically polls the `support_bundle` row's + // state. If the bundle has been cancelled (or removed), it cancels + // the inner `CancellationToken`, prompting `collect_bundle_locally` + // to drain in-flight work and return. + // + // This is a separate task rather than a top-level + // `tokio::select!`: previous designs raced collection against an + // async DB check in the cancellation branch body, deadlocking + // when the collection branch held a connection pool claim. See + // https://github.com/oxidecomputer/omicron/issues/9259. + let cancel_task = tokio::spawn({ + let datastore = self.datastore.clone(); + let cancel_opctx = opctx.child(std::collections::BTreeMap::new()); + let token = collection.cancellation_token().clone(); + let log = bundle_log.clone(); + let bundle_id = bundle.id; + async move { + check_for_cancellation( + datastore, + cancel_opctx, + token, + log, + bundle_id.into(), + ) + .await + } + }); + + // Create a temporary directory where we'll store the support bundle + // as it's being collected. + let dir = tempdir_in(TEMPDIR)?; + let collect_result = collection.collect_bundle_locally(&dir).await; + + // Collection is done — stop the cancellation checker. + cancel_task.abort(); + let _ = cancel_task.await; + + let mut report = collect_result?; + self.store_bundle_on_sled(&bundle_log, opctx, &bundle, dir).await?; if let Err(err) = self .datastore .support_bundle_update( @@ -411,6 +476,256 @@ impl SupportBundleCollector { report.activated_in_db_ok = true; Ok(Some(report)) } + + // Zip the collected bundle directory and stream it to the sled-agent + // that owns the target zpool/dataset for durable storage. + async fn store_bundle_on_sled( + &self, + log: &slog::Logger, + opctx: &OpContext, + bundle: &SupportBundle, + dir: Utf8TempDir, + ) -> anyhow::Result<()> { + // Create the zipfile as a temporary file + let mut zipfile = tokio::fs::File::from_std(bundle_to_zipfile( + &dir, + Utf8Path::new(TEMPDIR), + )?); + let total_len = zipfile.metadata().await?.len(); + + // Collect the hash locally before we send it over the network + // + // We'll use this later during finalization to confirm the bundle + // has been stored successfully. + zipfile.seek(SeekFrom::Start(0)).await?; + let hash = sha2_hash(&mut zipfile).await?; + + // Find the sled where we're storing this bundle. + let sled_id = self + .datastore + .zpool_get_sled_if_in_service(opctx, bundle.zpool_id.into()) + .await?; + let sled_client = + nexus_networking::sled_client(&self.datastore, opctx, sled_id, log) + .await?; + + let zpool = ZpoolUuid::from(bundle.zpool_id); + let dataset = DatasetUuid::from(bundle.dataset_id); + let support_bundle = SupportBundleUuid::from(bundle.id); + + // Tell this sled to create the bundle. + let creation_result = sled_client + .support_bundle_start_creation(&zpool, &dataset, &support_bundle) + .await + .with_context(|| "Support bundle failed to start creation")?; + + if matches!( + creation_result.state, + sled_agent_client::types::SupportBundleState::Complete + ) { + // Early exit case: the bundle was already created -- we must have either + // crashed or failed between "finalizing" and "writing to the database that we + // finished". + info!(log, "Support bundle was already collected"; "bundle" => %bundle.id); + return Ok(()); + } + info!(log, "Support bundle creation started"; "bundle" => %bundle.id); + + let mut offset = 0; + while offset < total_len { + // Stream the zipfile to the sled where it should be kept + let mut file = zipfile + .try_clone() + .await + .with_context(|| "Failed to clone zipfile")?; + file.seek(SeekFrom::Start(offset)).await.with_context(|| { + format!("Failed to seek to offset {offset} / {total_len} within zipfile") + })?; + + // Only stream at most "transfer_chunk_size" bytes at once + let chunk_size = std::cmp::min( + self.transfer_chunk_size.get(), + total_len - offset, + ); + + let limited_file = file.take(chunk_size); + let stream = tokio_util::io::ReaderStream::new(limited_file); + let body = reqwest::Body::wrap_stream(stream); + + info!( + log, + "Streaming bundle chunk"; + "bundle" => %bundle.id, + "offset" => offset, + "length" => chunk_size, + ); + + sled_client.support_bundle_transfer( + &zpool, &dataset, &support_bundle, offset, body + ).await.with_context(|| { + format!("Failed to transfer bundle: {chunk_size}@{offset} of {total_len} to sled") + })?; + + offset += chunk_size; + } + + sled_client + .support_bundle_finalize( + &zpool, + &dataset, + &support_bundle, + &hash.to_string(), + ) + .await + .with_context(|| "Failed to finalize bundle")?; + + // Returning from this method should drop all temporary storage + // allocated locally for this support bundle. + Ok(()) + } +} + +// Indefinitely poll the database for the bundle's state. Cancels the +// supplied token when: +// +// - The bundle's state is no longer `Collecting` (typically because +// the bundle was explicitly cancelled by a user, or because its +// backing storage was expunged). +// - The bundle row no longer exists. +// +// The mechanism layer is otherwise oblivious to the bundle lifecycle — +// this is the management bridge that turns DB state changes into +// cancellation. +async fn check_for_cancellation( + datastore: Arc, + opctx: OpContext, + cancellation_token: CancellationToken, + log: slog::Logger, + bundle_id: SupportBundleUuid, +) { + let work_duration = tokio::time::Duration::from_secs(5); + let mut yield_interval = tokio::time::interval_at( + tokio::time::Instant::now() + work_duration, + work_duration, + ); + + loop { + // Timer fired mid-collection - check if we should stop. + yield_interval.tick().await; + trace!( + log, + "Checking if Bundle Collection cancelled"; + "bundle" => %bundle_id + ); + + match datastore.support_bundle_get(&opctx, bundle_id).await { + Ok(SupportBundle { + state: SupportBundleState::Collecting, .. + }) => { + // Bundle still collecting; continue... + continue; + } + Ok(_) => { + // Not collecting, for any reason: Time to exit + cancellation_token.cancel(); + return; + } + Err(Error::ObjectNotFound { .. } | Error::NotFound { .. }) => { + cancellation_token.cancel(); + return; + } + Err(err) => { + warn!( + log, + "Database error checking bundle cancellation"; + InlineErrorChain::new(&err) + ); + + // If we cannot contact the database, retry later + continue; + } + } + } +} + +// Takes a directory "dir", and zips the contents into a single zipfile +// stored as a tempfile under `tempdir`. +fn bundle_to_zipfile( + dir: &Utf8TempDir, + tempdir: &Utf8Path, +) -> anyhow::Result { + let tempfile = tempfile_in(tempdir)?; + let mut zip = ZipWriter::new(tempfile); + + recursively_add_directory_to_zipfile(&mut zip, dir.path(), dir.path())?; + + Ok(zip.finish()?) +} + +fn recursively_add_directory_to_zipfile( + zip: &mut ZipWriter, + root_path: &Utf8Path, + dir_path: &Utf8Path, +) -> anyhow::Result<()> { + // Readdir might return entries in a non-deterministic order. + // Let's sort it for the zipfile, to be nice. + let mut entries = dir_path + .read_dir_utf8()? + .filter_map(Result::ok) + .collect::>(); + entries.sort_by(|a, b| a.file_name().cmp(&b.file_name())); + + for entry in &entries { + // Remove the "/tmp/..." prefix from the path when we're storing it in the + // zipfile. + let dst = entry.path().strip_prefix(root_path)?; + + let file_type = entry.file_type()?; + if file_type.is_file() { + let src = entry.path(); + + let zip_time = entry + .path() + .metadata() + .and_then(|m| m.modified()) + .ok() + .and_then(|sys_time| jiff::Zoned::try_from(sys_time).ok()) + .and_then(|zoned| { + zip::DateTime::try_from(zoned.datetime()).ok() + }) + .unwrap_or_else(zip::DateTime::default); + + let opts = FullFileOptions::default() + .last_modified_time(zip_time) + .compression_method(zip::CompressionMethod::Deflated) + .large_file(true); + + zip.start_file_from_path(dst, opts)?; + let mut file = std::fs::File::open(&src)?; + std::io::copy(&mut file, zip)?; + } + if file_type.is_dir() { + let opts = FullFileOptions::default(); + zip.add_directory_from_path(dst, opts)?; + recursively_add_directory_to_zipfile(zip, root_path, entry.path())?; + } + } + Ok(()) +} + +async fn sha2_hash(file: &mut tokio::fs::File) -> anyhow::Result { + let mut buf = vec![0u8; 65536]; + let mut ctx = Sha256::new(); + loop { + let n = file.read(&mut buf).await?; + if n == 0 { + break; + } + ctx.write_all(&buf[0..n])?; + } + + let digest = ctx.finalize(); + Ok(ArtifactHash(digest.as_slice().try_into()?)) } impl BackgroundTask for SupportBundleCollector { @@ -1839,4 +2154,29 @@ mod test { SupportBundleCollectionStepStatus::Skipped ); } + + // Ensure that we can convert a temporary directory into a zipfile + #[test] + fn test_zipfile_creation() { + let dir = camino_tempfile::tempdir().unwrap(); + let tempdir_for_zip = camino_tempfile::tempdir().unwrap(); + + std::fs::create_dir_all(dir.path().join("dir-a")).unwrap(); + std::fs::create_dir_all(dir.path().join("dir-b")).unwrap(); + std::fs::write(dir.path().join("dir-a").join("file-a"), "some data") + .unwrap(); + std::fs::write(dir.path().join("file-b"), "more data").unwrap(); + + let zipfile = bundle_to_zipfile(&dir, tempdir_for_zip.path()) + .expect("Should have been able to bundle zipfile"); + let archive = zip::read::ZipArchive::new(zipfile).unwrap(); + + // We expect the order to be deterministically alphabetical + let mut names = archive.file_names(); + assert_eq!(names.next(), Some("dir-a/")); + assert_eq!(names.next(), Some("dir-a/file-a")); + assert_eq!(names.next(), Some("dir-b/")); + assert_eq!(names.next(), Some("file-b")); + assert_eq!(names.next(), None); + } }