From 8ff27b1642f44844cfc83510845f9457a2def593 Mon Sep 17 00:00:00 2001
From: James MacMahon
Date: Thu, 26 Jun 2025 17:16:17 +0000
Subject: [PATCH 1/3] [4/5] User data export: background task and sagas

The determination in RFD 563 was to attach _all_ read-only resources to
a Pantry for export. This is implemented by a new "user data export
coordinator" background task that reads the contents of the changeset
(added in PR 1 of this set) and creates user data export objects in
their initial "Requested" state.

This commit also adds create and delete sagas for these objects - a
saga is required to implement the safe locking and unlocking pattern
used here, along with the correct behaviour for interacting with a
Pantry. The new background task invokes these sagas based on the
contents of the computed changeset.

The last thing the background task does is query for the list of
in-service Pantry addresses, and use that list to mark as deleted any
user data export objects that reference an expunged Pantry, so that
they can be re-created.

This commit also fixes a previously committed bug: do not return user
data export objects in the "delete required" part of the changeset if
they were already deleted!
---
 dev-tools/omdb/src/bin/omdb/nexus.rs          |   52 +
 dev-tools/omdb/tests/env.out                  |   12 +
 dev-tools/omdb/tests/successes.out            |   26 +
 nexus-config/src/nexus_config.rs              |   17 +
 nexus/background-task-interface/src/init.rs   |    1 +
 .../src/db/datastore/user_data_export.rs      |  118 +-
 nexus/examples/config-second.toml             |    1 +
 nexus/examples/config.toml                    |    1 +
 nexus/src/app/background/init.rs              |   19 +-
 nexus/src/app/background/tasks/mod.rs         |    1 +
 .../tasks/user_data_export_coordinator.rs     | 1044 +++++++++++++++++
 nexus/src/app/sagas/common_storage.rs         |   50 +
 nexus/src/app/sagas/mod.rs                    |   12 +-
 .../src/app/sagas/user_data_export_create.rs  |  691 +++++++++++
 .../src/app/sagas/user_data_export_delete.rs  |  647 ++++++++++
 nexus/test-utils/src/background.rs            |   27 +
 nexus/tests/config.test.toml                  |    1 +
 nexus/types/src/internal_api/background.rs    |   11 +
 smf/nexus/multi-sled/config-partial.toml      |    1 +
 smf/nexus/single-sled/config-partial.toml     |    1 +
 20 files changed, 2721 insertions(+), 12 deletions(-)
 create mode 100644 nexus/src/app/background/tasks/user_data_export_coordinator.rs
 create mode 100644 nexus/src/app/sagas/user_data_export_create.rs
 create mode 100644 nexus/src/app/sagas/user_data_export_delete.rs

diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs
index 2bec4896d33..418db30b1a4 100644
--- a/dev-tools/omdb/src/bin/omdb/nexus.rs
+++ b/dev-tools/omdb/src/bin/omdb/nexus.rs
@@ -86,6 +86,7 @@ use nexus_types::internal_api::background::TufArtifactReplicationCounters;
 use nexus_types::internal_api::background::TufArtifactReplicationRequest;
 use nexus_types::internal_api::background::TufArtifactReplicationStatus;
 use nexus_types::internal_api::background::TufRepoPrunerStatus;
+use nexus_types::internal_api::background::UserDataExportCoordinatorStatus;
 use nexus_types::internal_api::background::fm_rendezvous;
 use omicron_uuid_kinds::BlueprintUuid;
 use omicron_uuid_kinds::CollectionUuid;
@@ -1369,6 +1370,9 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
         "trust_quorum_manager" => {
             print_task_trust_quorum_manager(details);
         }
+        "user_data_export_coordinator" => {
+            print_task_user_data_export_coordinator(details);
+        }
         _ => {
             println!(
                 "warning: unknown background task: {:?} \
@@ -3174,6 +3178,7 @@ fn print_task_alert_dispatcher(details: &serde_json::Value) {
         );
     }
 }
+
 fn print_task_webhook_deliverator(details: 
&serde_json::Value) { use nexus_types::external_api::alert::WebhookDeliveryAttemptResult; use nexus_types::internal_api::background::WebhookDeliveratorStatus; @@ -3933,6 +3938,53 @@ fn print_task_trust_quorum_manager(details: &serde_json::Value) { } } +fn print_task_user_data_export_coordinator(details: &serde_json::Value) { + match serde_json::from_value::( + details.clone(), + ) { + Err(error) => eprintln!( + "warning: failed to interpret task details: {:?}: {:?}", + error, details + ), + + Ok(status) => { + println!( + " total create steps invoked ok: {}", + status.create_invoked_ok.len(), + ); + for line in &status.create_invoked_ok { + println!(" > {line}"); + } + + println!( + " total delete steps invoked ok: {}", + status.delete_invoked_ok.len(), + ); + for line in &status.delete_invoked_ok { + println!(" > {line}"); + } + + println!( + " total records affected by expunge: {}", + status.records_marked_for_deletion, + ); + + println!( + " total records fast-deleted (resource was deleted): {}", + status.records_bypassed_ok.len(), + ); + for line in &status.records_bypassed_ok { + println!(" > {line}"); + } + + println!(" errors: {}", status.errors.len()); + for line in &status.errors { + println!(" > {line}"); + } + } + } +} + const ERRICON: &str = "/!\\"; fn warn_if_nonzero(n: usize) -> &'static str { diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index f6457634107..29333a435ed 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -254,6 +254,10 @@ task: "tuf_repo_pruner" determine which TUF repos' artifacts can be pruned +task: "user_data_export_coordinator" + make the user resources available for export + + task: "v2p_manager" manages opte v2p mappings for vpc networking @@ -517,6 +521,10 @@ task: "tuf_repo_pruner" determine which TUF repos' artifacts can be pruned +task: "user_data_export_coordinator" + make the user resources available for export + + task: "v2p_manager" manages opte v2p mappings for vpc networking @@ -767,6 +775,10 @@ task: "tuf_repo_pruner" determine which TUF repos' artifacts can be pruned +task: "user_data_export_coordinator" + make the user resources available for export + + task: "v2p_manager" manages opte v2p mappings for vpc networking diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index ba66911f6e4..ae19b74e93d 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -489,6 +489,10 @@ task: "tuf_repo_pruner" determine which TUF repos' artifacts can be pruned +task: "user_data_export_coordinator" + make the user resources available for export + + task: "v2p_manager" manages opte v2p mappings for vpc networking @@ -1039,6 +1043,17 @@ task: "tuf_repo_pruner" repos kept because they're recent uploads: none other repos eligible for pruning: none +task: "user_data_export_coordinator" + configured period: every m + last completed activation: , triggered by + started at (s ago) and ran for ms + total create steps invoked ok: 0 + total delete steps invoked ok: 0 + total records affected by expunge: 0 + total records fast-deleted (resource was deleted): 0 + errors: 1 + > error looking up in-service Pantries: proto error: no records found for Query { name: Name("_crucible-pantry._tcp.control-plane.oxide.internal."), query_type: SRV, query_class: IN } + task: "v2p_manager" configured period: every s last completed activation: , triggered by @@ -1720,6 +1735,17 @@ task: "tuf_repo_pruner" repos kept because they're recent uploads: none other 
repos eligible for pruning: none +task: "user_data_export_coordinator" + configured period: every m + last completed activation: , triggered by + started at (s ago) and ran for ms + total create steps invoked ok: 0 + total delete steps invoked ok: 0 + total records affected by expunge: 0 + total records fast-deleted (resource was deleted): 0 + errors: 1 + > error looking up in-service Pantries: proto error: no records found for Query { name: Name("_crucible-pantry._tcp.control-plane.oxide.internal."), query_type: SRV, query_class: IN } + task: "v2p_manager" configured period: every s last completed activation: , triggered by diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index c877645a239..dce4969c3f2 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -442,6 +442,8 @@ pub struct BackgroundTaskConfig { pub audit_log_timeout_incomplete: AuditLogTimeoutIncompleteConfig, /// configuration for audit log cleanup (retention) task pub audit_log_cleanup: AuditLogCleanupConfig, + /// configuration for user data export coordinator task + pub user_data_export_coordinator: UserDataExportCoordinatorConfig, } #[serde_as] @@ -1042,6 +1044,15 @@ pub struct TrustQuorumConfig { pub period_secs: Duration, } +#[serde_as] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct UserDataExportCoordinatorConfig { + /// period (in seconds) for periodic activations of this background task + /// that managed user data export objects + #[serde_as(as = "DurationSeconds")] + pub period_secs: Duration, +} + /// Configuration for a nexus server #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct PackageConfig { @@ -1329,6 +1340,7 @@ mod test { audit_log_cleanup.period_secs = 600 audit_log_cleanup.retention_days = 90 audit_log_cleanup.max_deleted_per_activation = 10000 + user_data_export_coordinator.period_secs = 60 [default_region_allocation_strategy] type = "random" seed = 0 @@ -1609,6 +1621,10 @@ mod test { retention_days: NonZeroU32::new(90).unwrap(), max_deleted_per_activation: 10_000, }, + user_data_export_coordinator: + UserDataExportCoordinatorConfig { + period_secs: Duration::from_secs(60), + }, }, multicast: MulticastConfig { enabled: false }, default_region_allocation_strategy: @@ -1724,6 +1740,7 @@ mod test { audit_log_cleanup.period_secs = 600 audit_log_cleanup.retention_days = 90 audit_log_cleanup.max_deleted_per_activation = 10000 + user_data_export_coordinator.period_secs = 60 [default_region_allocation_strategy] type = "random" diff --git a/nexus/background-task-interface/src/init.rs b/nexus/background-task-interface/src/init.rs index 62b9f0c66df..513d9cd5364 100644 --- a/nexus/background-task-interface/src/init.rs +++ b/nexus/background-task-interface/src/init.rs @@ -62,6 +62,7 @@ pub struct BackgroundTasks { pub task_trust_quorum_manager: Activator, pub task_attached_subnet_manager: Activator, pub task_session_cleanup: Activator, + pub task_user_data_export_coordinator: Activator, // Handles to activate background tasks that do not get used by Nexus // at-large. 
These background tasks are implementation details as far as diff --git a/nexus/db-queries/src/db/datastore/user_data_export.rs b/nexus/db-queries/src/db/datastore/user_data_export.rs index 875385a2503..46406b25d40 100644 --- a/nexus/db-queries/src/db/datastore/user_data_export.rs +++ b/nexus/db-queries/src/db/datastore/user_data_export.rs @@ -310,7 +310,7 @@ impl DataStore { /// associated delete saga run /// /// This function also marks user data export records as deleted if the - /// associated resource was itself delete. + /// associated resource was itself deleted. pub async fn compute_user_data_export_changeset( &self, opctx: &OpContext, @@ -421,8 +421,12 @@ impl DataStore { .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + // Filter on state != Deleted to avoid returning a constantly + // accumulating list records to delete. + let records: Vec = dsl::user_data_export .filter(dsl::resource_deleted.eq(true)) + .filter(dsl::state.ne(UserDataExportState::Deleted)) .select(UserDataExportRecord::as_select()) .load_async(&*conn) .await @@ -1815,9 +1819,8 @@ mod tests { .await .unwrap(); - // The changeset should now show that a record for this iamge needs to - // be created, along with the old record still needing the associated - // delete saga. + // The changeset should now show that a record for this image needs to + // be created. let changeset = datastore.compute_user_data_export_changeset(&opctx).await.unwrap(); @@ -1826,8 +1829,7 @@ mod tests { &[UserDataExportResource::Image { id: image.id() }], ); assert!(changeset.create_required.is_empty()); - assert_eq!(changeset.delete_required.len(), 1); - assert_eq!(changeset.delete_required[0].id(), record.id()); + assert!(changeset.delete_required.is_empty()); // Now it should work. @@ -1906,4 +1908,108 @@ mod tests { db.terminate().await; logctx.cleanup_successful(); } + + /// Assert that deleted records are not part of the changeset anymore. + #[tokio::test] + async fn test_deleted_not_part_of_changeset() { + let logctx = dev::test_setup_log("test_deleted_not_part_of_changeset"); + let log = logctx.log.new(o!()); + let db = TestDatabase::new_with_datastore(&log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let (authz_project, _) = + create_project(&opctx, &datastore, PROJECT_NAME).await; + + let snapshot = create_project_snapshot( + &opctx, + &datastore, + &authz_project, + Uuid::new_v4(), + "snap", + ) + .await; + + // Create a regular record and move it to Live + + let record = datastore + .user_data_export_create_for_snapshot( + &opctx, + UserDataExportUuid::new_v4(), + snapshot.id(), + ) + .await + .unwrap(); + + let operating_saga_id = Uuid::new_v4(); + + datastore + .set_user_data_export_requested_to_assigning( + &opctx, + record.id(), + operating_saga_id, + 1, + ) + .await + .unwrap(); + + datastore + .set_user_data_export_assigning_to_live( + &opctx, + record.id(), + operating_saga_id, + 1, + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8080, 0, 0), + VolumeUuid::new_v4(), + ) + .await + .unwrap(); + + // The changeset should be empty now + + let changeset = + datastore.compute_user_data_export_changeset(&opctx).await.unwrap(); + assert!(changeset.request_required.is_empty()); + assert!(changeset.create_required.is_empty()); + assert!(changeset.delete_required.is_empty()); + + // Marking the record's resource as deleted means it will show up in the + // delete_required list. 
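+        // (The changeset query filters on resource_deleted = true and
+        // state != Deleted, so the record should stay in delete_required
+        // until it reaches the Deleted state below.)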
+ + datastore.user_data_export_mark_deleted(record.id()).await.unwrap(); + + let changeset = + datastore.compute_user_data_export_changeset(&opctx).await.unwrap(); + assert!(!changeset.delete_required.is_empty()); + + // Move the record to deleting. + + datastore + .set_user_data_export_live_to_deleting( + &opctx, + record.id(), + operating_saga_id, + 2, + ) + .await + .unwrap(); + + datastore + .set_user_data_export_deleting_to_deleted( + &opctx, + record.id(), + operating_saga_id, + 2, + ) + .await + .unwrap(); + + // The record should _not_ show up in the delete_required list anymore. + + let changeset = + datastore.compute_user_data_export_changeset(&opctx).await.unwrap(); + assert!(changeset.delete_required.is_empty()); + + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/examples/config-second.toml b/nexus/examples/config-second.toml index 3c1a1a3700a..fcc145ecd0c 100644 --- a/nexus/examples/config-second.toml +++ b/nexus/examples/config-second.toml @@ -203,6 +203,7 @@ audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 audit_log_cleanup.period_secs = 600 audit_log_cleanup.retention_days = 90 audit_log_cleanup.max_deleted_per_activation = 10000 +user_data_export_coordinator.period_secs = 240 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/examples/config.toml b/nexus/examples/config.toml index b4026bfb1de..1beac908aa4 100644 --- a/nexus/examples/config.toml +++ b/nexus/examples/config.toml @@ -187,6 +187,7 @@ audit_log_timeout_incomplete.max_timed_out_per_activation = 1000 audit_log_cleanup.period_secs = 600 audit_log_cleanup.retention_days = 90 audit_log_cleanup.max_deleted_per_activation = 10000 +user_data_export_coordinator.period_secs = 240 [default_region_allocation_strategy] # allocate region on 3 random distinct zpools, on 3 random distinct sleds. diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index 5f7e9a07da8..df75991cf4b 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -141,6 +141,7 @@ use super::tasks::sync_switch_configuration::SwitchPortSettingsManager; use super::tasks::trust_quorum; use super::tasks::tuf_artifact_replication; use super::tasks::tuf_repo_pruner; +use super::tasks::user_data_export_coordinator::*; use super::tasks::v2p_mappings::V2PManager; use super::tasks::vpc_routes; use super::tasks::webhook_deliverator; @@ -277,6 +278,7 @@ impl BackgroundTasksInitializer { task_trust_quorum_manager: Activator::new(), task_attached_subnet_manager: Activator::new(), task_session_cleanup: Activator::new(), + task_user_data_export_coordinator: Activator::new(), // Handles to activate background tasks that do not get used by Nexus // at-large. These background tasks are implementation details as far as @@ -372,6 +374,7 @@ impl BackgroundTasksInitializer { task_session_cleanup, task_audit_log_timeout_incomplete, task_audit_log_cleanup, + task_user_data_export_coordinator, // Add new background tasks here. Be sure to use this binding in a // call to `Driver::register()` below. That's what actually wires // up the Activator to the corresponding background task. 
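+        // Each Activator bound above must be handed to exactly one
+        // `TaskDefinition` via its `activator` field in a `driver.register()`
+        // call; the registration for `task_user_data_export_coordinator` is
+        // added further below.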
@@ -1221,7 +1224,7 @@ impl BackgroundTasksInitializer { description: "distributes attached subnets to sleds and switch", period: config.attached_subnet_manager.period_secs, task_impl: Box::new(attached_subnets::Manager::new( - resolver, + resolver.clone(), datastore.clone(), )), opctx: opctx.child(BTreeMap::new()), @@ -1269,7 +1272,7 @@ impl BackgroundTasksInitializer { than the retention period", period: config.audit_log_cleanup.period_secs, task_impl: Box::new(audit_log_cleanup::AuditLogCleanup::new( - datastore, + datastore.clone(), config.audit_log_cleanup.retention_days, config.audit_log_cleanup.max_deleted_per_activation, )), @@ -1278,6 +1281,18 @@ impl BackgroundTasksInitializer { activator: task_audit_log_cleanup, }); + driver.register(TaskDefinition { + name: "user_data_export_coordinator", + description: "make the user resources available for export", + period: config.user_data_export_coordinator.period_secs, + task_impl: Box::new(UserDataExportCoordinator::new( + datastore, sagas, resolver, + )), + opctx: opctx.child(BTreeMap::new()), + watchers: vec![], + activator: task_user_data_export_coordinator, + }); + driver } } diff --git a/nexus/src/app/background/tasks/mod.rs b/nexus/src/app/background/tasks/mod.rs index fdcb45ef8d0..52a4ee69b26 100644 --- a/nexus/src/app/background/tasks/mod.rs +++ b/nexus/src/app/background/tasks/mod.rs @@ -56,6 +56,7 @@ pub mod sync_switch_configuration; pub mod trust_quorum; pub mod tuf_artifact_replication; pub mod tuf_repo_pruner; +pub mod user_data_export_coordinator; pub mod v2p_mappings; pub mod vpc_routes; pub mod webhook_deliverator; diff --git a/nexus/src/app/background/tasks/user_data_export_coordinator.rs b/nexus/src/app/background/tasks/user_data_export_coordinator.rs new file mode 100644 index 00000000000..aadff096993 --- /dev/null +++ b/nexus/src/app/background/tasks/user_data_export_coordinator.rs @@ -0,0 +1,1044 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! This background task will process the user data export changeset, create the +//! appropriate user data export records, and trigger the appropriate create or +//! delete saga according to the changeset contents. It will also check if any +//! Pantry used for user data export is no longer in DNS, and will delete +//! affected records so they can be re-created. 
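+//!
+//! The record states driven by this task and its sagas (see the datastore's
+//! transition functions) are, in the common case:
+//!
+//! ```text
+//! Requested -> Assigning -> Live -> Deleting -> Deleted
+//! ```
+//!
+//! with two exceptions: a Requested record whose resource is already gone is
+//! moved straight to Deleted without running a saga, and an unwinding create
+//! saga moves a record from Assigning back to Requested.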
+ +use crate::app::authn; +use crate::app::background::BackgroundTask; +use crate::app::saga::StartSaga; +use crate::app::sagas; +use crate::app::sagas::NexusSaga; +use crate::app::sagas::user_data_export_create::*; +use crate::app::sagas::user_data_export_delete::*; +use futures::FutureExt; +use futures::future::BoxFuture; +use internal_dns_resolver::Resolver; +use internal_dns_types::names::ServiceName; +use nexus_db_model::UserDataExportRecord; +use nexus_db_model::UserDataExportResource; +use nexus_db_model::UserDataExportState; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db::DataStore; +use nexus_db_queries::db::datastore::UserDataExportChangeset; +use nexus_types::internal_api::background::UserDataExportCoordinatorStatus; +use omicron_uuid_kinds::UserDataExportUuid; +use serde_json::json; +use std::sync::Arc; + +pub struct UserDataExportCoordinator { + datastore: Arc, + sagas: Arc, + resolver: Resolver, +} + +impl UserDataExportCoordinator { + pub fn new( + datastore: Arc, + sagas: Arc, + resolver: Resolver, + ) -> Self { + UserDataExportCoordinator { datastore, sagas, resolver } + } + + async fn delete_records_for_expunged_pantries( + &self, + opctx: &OpContext, + status: &mut UserDataExportCoordinatorStatus, + ) { + let log = &opctx.log; + let in_service_pantries = match self + .resolver + .lookup_all_socket_v6(ServiceName::CruciblePantry) + .await + { + Ok(pantries) => pantries, + Err(e) => { + let s = format!("error looking up in-service Pantries: {e}"); + error!(&log, "{s}"); + status.errors.push(s); + return; + } + }; + + let in_service_pantries = in_service_pantries + .into_iter() + .map(|socketaddr| socketaddr.ip().into()) + .collect(); + + let result = self + .datastore + .user_data_export_mark_expunged_deleted(opctx, in_service_pantries) + .await; + + match result { + Ok(records_marked_for_deletion) => { + status.records_marked_for_deletion = + records_marked_for_deletion; + } + + Err(e) => { + let s = format!( + "error deleting records for expunged Pantries: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + } + } + } + + async fn create_user_data_export_record( + &self, + opctx: &OpContext, + resource: &UserDataExportResource, + ) -> anyhow::Result { + let user_data_export_id = UserDataExportUuid::new_v4(); + + Ok(match resource { + UserDataExportResource::Snapshot { id } => { + self.datastore + .user_data_export_create_for_snapshot( + &opctx, + user_data_export_id, + *id, + ) + .await? + } + + UserDataExportResource::Image { id: image_id } => { + self.datastore + .user_data_export_create_for_image( + &opctx, + user_data_export_id, + *image_id, + ) + .await? 
+ } + }) + } + + async fn process_user_data_export_changeset( + &self, + opctx: &OpContext, + changeset: &UserDataExportChangeset, + status: &mut UserDataExportCoordinatorStatus, + ) { + let log = &opctx.log; + + for item in &changeset.request_required { + // First create the record in state "Requested" first + let record = + match self.create_user_data_export_record(opctx, &item).await { + Ok(record) => record, + + Err(e) => { + let s = format!( + "error creating user data export record for \ + {item:?}: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + // Then invoke the create saga + let params = sagas::user_data_export_create::Params { + serialized_authn: authn::saga::Serialized::for_opctx(opctx), + id: record.id(), + }; + + let saga_dag = match SagaUserDataExportCreate::prepare(¶ms) { + Ok(dag) => dag, + + Err(e) => { + let s = format!( + "error preparing user data export create dag for \ + {item:?}: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + match self.sagas.saga_start(saga_dag).await { + Ok(id) => { + let s = format!( + "requested user data export create saga for {item:?}" + ); + info!(&log, "{s}"; "saga_id" => %id); + status.create_invoked_ok.push(s); + } + + Err(e) => { + let s = format!( + "error requesting user data export create for \ + {item:?}: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + } + } + } + + for item in &changeset.create_required { + // Invoke the create saga for anything in state Requested + + let params = sagas::user_data_export_create::Params { + serialized_authn: authn::saga::Serialized::for_opctx(opctx), + id: item.id(), + }; + + let saga_dag = match SagaUserDataExportCreate::prepare(¶ms) { + Ok(dag) => dag, + + Err(e) => { + let s = format!( + "error preparing user data export create dag for \ + {item:?}: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + match self.sagas.saga_start(saga_dag).await { + Ok(id) => { + let s = format!( + "requested user data export create saga for {item:?}" + ); + info!(&log, "{s}"; "saga_id" => %id); + status.create_invoked_ok.push(s); + } + + Err(e) => { + let s = format!( + "error requesting user data export create for \ + {item:?}: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + } + } + } + + for record in &changeset.delete_required { + let volume_id = match record.state() { + UserDataExportState::Requested => { + // Set the record's state straight to Deleted, the resource + // is gone. + match self + .datastore + .set_user_data_export_requested_to_deleted( + opctx, + record.id(), + ) + .await + { + Ok(()) => { + let s = format!( + "transitioned user data export {} from \ + requested to deleted", + record.id(), + ); + info!(&log, "{s}"); + status.records_bypassed_ok.push(s); + continue; + } + + Err(e) => { + let s = format!( + "error transitioning user data export {} \ + from requested to deleted: {e}", + record.id(), + ); + error!(&log, "{s}"); + status.errors.push(s); + continue; + } + } + } + + UserDataExportState::Assigning => { + // Do nothing, a create saga is operating on this record + // (note this saga must have been invoked _before_ the + // resource was deleted, as the create saga cannot change + // the record's state to Assigning if the resource_deleted + // column is true). 
+ continue; + } + + UserDataExportState::Live => { + let Some(volume_id) = record.volume_id() else { + let s = format!( + "record {} state Live volume id is None!", + record.id(), + ); + + error!(&log, "{s}"); + status.errors.push(s); + continue; + }; + + volume_id + } + + UserDataExportState::Deleting => { + // do nothing, a delete saga is operating on this record + continue; + } + + UserDataExportState::Deleted => { + // We shouldn't ever get here: the changeset shouldn't + // include a record that needs deletion but is already + // deleted. + continue; + } + }; + + let params = sagas::user_data_export_delete::Params { + serialized_authn: authn::saga::Serialized::for_opctx(opctx), + user_data_export_id: record.id(), + volume_id, + }; + + let saga_dag = match SagaUserDataExportDelete::prepare(¶ms) { + Ok(dag) => dag, + + Err(e) => { + let s = format!( + "error preparing user data export delete dag for \ + {record:?}: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + + continue; + } + }; + + match self.sagas.saga_start(saga_dag).await { + Ok(id) => { + let s = format!( + "requested user data export delete saga for {record:?}" + ); + info!(&log, "{s}"; "saga_id" => %id); + status.delete_invoked_ok.push(s); + } + + Err(e) => { + let s = format!( + "error requesting user data export delete for \ + {record:?}: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + } + } + } + } +} + +impl BackgroundTask for UserDataExportCoordinator { + fn activate<'a>( + &'a mut self, + opctx: &'a OpContext, + ) -> BoxFuture<'a, serde_json::Value> { + async move { + let log = &opctx.log; + let mut status = UserDataExportCoordinatorStatus::default(); + + self.delete_records_for_expunged_pantries(&opctx, &mut status) + .await; + + let changeset = match self + .datastore + .compute_user_data_export_changeset(&opctx) + .await + { + Ok(changeset) => changeset, + + Err(e) => { + let s = format!( + "error getting user data export changeset: {e}" + ); + error!(&log, "{s}"); + status.errors.push(s); + + return json!(status); + } + }; + + error!(&log, "changeset is {changeset:?}"); + + self.process_user_data_export_changeset( + &opctx, + &changeset, + &mut status, + ) + .await; + + json!(status) + } + .boxed() + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::app::MIN_DISK_SIZE_BYTES; + use crate::app::authz; + use crate::app::background::init::test::NoopStartSaga; + use chrono::Utc; + use nexus_db_lookup::LookupPath; + use nexus_db_model::BlockSize; + use nexus_db_model::Generation; + use nexus_db_model::ProjectImage; + use nexus_db_model::ProjectImageIdentity; + use nexus_db_model::Snapshot; + use nexus_db_model::SnapshotIdentity; + use nexus_db_model::SnapshotState; + use nexus_db_model::UserDataExportResource; + use nexus_test_utils::resource_helpers::create_default_ip_pools; + use nexus_test_utils::resource_helpers::create_project; + + use nexus_test_utils_macros::nexus_test; + use nexus_types::identity::Resource; + use omicron_common::api::external; + + use omicron_uuid_kinds::UserDataExportUuid; + use omicron_uuid_kinds::VolumeUuid; + use std::net::Ipv6Addr; + use std::net::SocketAddrV6; + use uuid::Uuid; + + type ControlPlaneTestContext = + nexus_test_utils::ControlPlaneTestContext; + + const PROJECT_NAME: &str = "bobs-barrel-of-bits"; + + async fn setup_test_project( + cptestctx: &ControlPlaneTestContext, + opctx: &OpContext, + ) -> authz::Project { + create_default_ip_pools(&cptestctx.external_client).await; + + let project = + create_project(&cptestctx.external_client, 
PROJECT_NAME).await; + + let datastore = cptestctx.server.server_context().nexus.datastore(); + + let (_, authz_project) = LookupPath::new(opctx, datastore) + .project_id(project.identity.id) + .lookup_for(authz::Action::CreateChild) + .await + .expect("project must exist"); + + authz_project + } + + #[nexus_test(server = crate::Server)] + async fn test_user_data_export_coordinator_task_noop( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let resolver = nexus.resolver(); + let mut task = UserDataExportCoordinator::new( + datastore.clone(), + starter.clone(), + resolver.clone(), + ); + + let result: UserDataExportCoordinatorStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!(result, UserDataExportCoordinatorStatus::default()); + assert_eq!(starter.count_reset(), 0); + } + + #[nexus_test(server = crate::Server)] + async fn test_user_data_export_coordinator_task_create( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let resolver = nexus.resolver(); + let mut task = UserDataExportCoordinator::new( + datastore.clone(), + starter.clone(), + resolver.clone(), + ); + + let authz_project = setup_test_project(cptestctx, &opctx).await; + + // Add a snapshot and image + + let snapshot = datastore + .project_ensure_snapshot( + &opctx, + &authz_project, + Snapshot { + identity: SnapshotIdentity { + id: Uuid::new_v4(), + name: external::Name::try_from("snapshot".to_string()) + .unwrap() + .into(), + description: "snapshot".into(), + + time_created: Utc::now(), + time_modified: Utc::now(), + time_deleted: None, + }, + + project_id: authz_project.id(), + disk_id: Uuid::new_v4(), + volume_id: VolumeUuid::new_v4().into(), + destination_volume_id: VolumeUuid::new_v4().into(), + + generation: Generation::new(), + state: SnapshotState::Creating, + block_size: BlockSize::AdvancedFormat, + + size: external::ByteCount::try_from( + 2 * MIN_DISK_SIZE_BYTES, + ) + .unwrap() + .into(), + }, + ) + .await + .unwrap(); + + let image = datastore + .project_image_create( + &opctx, + &authz_project, + ProjectImage { + identity: ProjectImageIdentity { + id: Uuid::new_v4(), + name: external::Name::try_from("image".to_string()) + .unwrap() + .into(), + description: "description".into(), + + time_created: Utc::now(), + time_modified: Utc::now(), + time_deleted: None, + }, + + silo_id: Uuid::new_v4(), + project_id: authz_project.id(), + volume_id: VolumeUuid::new_v4().into(), + + url: None, + os: String::from("debian"), + version: String::from("12"), + digest: None, + block_size: BlockSize::Iso, + + size: external::ByteCount::try_from(MIN_DISK_SIZE_BYTES) + .unwrap() + .into(), + }, + ) + .await + .unwrap(); + + // Activate the task - it should try to create user data export objects + // for the snapshot and image + + let result: UserDataExportCoordinatorStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + eprintln!("{result:?}"); + + assert_eq!(result.create_invoked_ok.len(), 2); + + let s = format!( + "{:?}", + UserDataExportResource::Snapshot { id: snapshot.id() }, + ); + 
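+        // The status strings embed the Debug form of each resource, so a
+        // substring match on that form identifies which saga was invoked.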
assert!(result.create_invoked_ok.iter().any(|i| i.contains(&s))); + + let s = + format!("{:?}", UserDataExportResource::Image { id: image.id() },); + assert!(result.create_invoked_ok.iter().any(|i| i.contains(&s))); + + assert_eq!(result.errors.len(), 0); + + assert_eq!(starter.count_reset(), 2); + } + + /// Assert that the background task will try to run the creation saga if it + /// unwinds the record back to the Requested state. + #[nexus_test(server = crate::Server)] + async fn test_user_data_export_coordinator_task_create_after_unwind( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let resolver = nexus.resolver(); + let mut task = UserDataExportCoordinator::new( + datastore.clone(), + starter.clone(), + resolver.clone(), + ); + + let authz_project = setup_test_project(cptestctx, &opctx).await; + + // Add a snapshot + + let snapshot = datastore + .project_ensure_snapshot( + &opctx, + &authz_project, + Snapshot { + identity: SnapshotIdentity { + id: Uuid::new_v4(), + name: external::Name::try_from("snapshot".to_string()) + .unwrap() + .into(), + description: "snapshot".into(), + + time_created: Utc::now(), + time_modified: Utc::now(), + time_deleted: None, + }, + + project_id: authz_project.id(), + disk_id: Uuid::new_v4(), + volume_id: VolumeUuid::new_v4().into(), + destination_volume_id: VolumeUuid::new_v4().into(), + + generation: Generation::new(), + state: SnapshotState::Creating, + block_size: BlockSize::AdvancedFormat, + + size: external::ByteCount::try_from( + 2 * MIN_DISK_SIZE_BYTES, + ) + .unwrap() + .into(), + }, + ) + .await + .unwrap(); + + // Create the user data export record + + let snapshot_record = datastore + .user_data_export_create_for_snapshot( + &opctx, + UserDataExportUuid::new_v4(), + snapshot.id(), + ) + .await + .unwrap(); + + // Activate the task - it should try to run the saga for the record in + // state Requested + + let result: UserDataExportCoordinatorStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + eprintln!("{result:?}"); + + assert_eq!(result.create_invoked_ok.len(), 1); + + let s = format!("UserDataExportRecord {{ id: {}", snapshot_record.id()); + assert!(result.create_invoked_ok[0].contains(&s)); + + assert_eq!(result.errors.len(), 0); + + assert_eq!(starter.count_reset(), 1); + } + + #[nexus_test(server = crate::Server)] + async fn test_user_data_export_coordinator_task_delete( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let resolver = nexus.resolver(); + let mut task = UserDataExportCoordinator::new( + datastore.clone(), + starter.clone(), + resolver.clone(), + ); + + let authz_project = setup_test_project(cptestctx, &opctx).await; + + // Add a snapshot and image + + let snapshot = datastore + .project_ensure_snapshot( + &opctx, + &authz_project, + Snapshot { + identity: SnapshotIdentity { + id: Uuid::new_v4(), + name: external::Name::try_from("snapshot".to_string()) + .unwrap() + .into(), + description: "snapshot".into(), + + time_created: Utc::now(), + time_modified: Utc::now(), + time_deleted: None, + }, + + project_id: authz_project.id(), + disk_id: 
Uuid::new_v4(), + volume_id: VolumeUuid::new_v4().into(), + destination_volume_id: VolumeUuid::new_v4().into(), + + generation: Generation::new(), + state: SnapshotState::Creating, + block_size: BlockSize::AdvancedFormat, + + size: external::ByteCount::try_from( + 2 * MIN_DISK_SIZE_BYTES, + ) + .unwrap() + .into(), + }, + ) + .await + .unwrap(); + + let image = datastore + .project_image_create( + &opctx, + &authz_project, + ProjectImage { + identity: ProjectImageIdentity { + id: Uuid::new_v4(), + name: external::Name::try_from("image".to_string()) + .unwrap() + .into(), + description: "description".into(), + + time_created: Utc::now(), + time_modified: Utc::now(), + time_deleted: None, + }, + + silo_id: Uuid::new_v4(), + project_id: authz_project.id(), + volume_id: VolumeUuid::new_v4().into(), + + url: None, + os: String::from("debian"), + version: String::from("12"), + digest: None, + block_size: BlockSize::Iso, + + size: external::ByteCount::try_from(MIN_DISK_SIZE_BYTES) + .unwrap() + .into(), + }, + ) + .await + .unwrap(); + + // Create user data export rows for the snapshot and image + + let (.., authz_snapshot, db_snapshot) = + LookupPath::new(&opctx, datastore) + .snapshot_id(snapshot.id()) + .fetch_for(authz::Action::Read) + .await + .unwrap(); + + let snapshot_record = datastore + .user_data_export_create_for_snapshot( + &opctx, + UserDataExportUuid::new_v4(), + snapshot.id(), + ) + .await + .unwrap(); + + let image_record = datastore + .user_data_export_create_for_image( + &opctx, + UserDataExportUuid::new_v4(), + image.id(), + ) + .await + .unwrap(); + + // Transition both records from Requested to Live + + let operating_saga_id = Uuid::new_v4(); + let generation = 2; + + datastore + .set_user_data_export_requested_to_assigning( + &opctx, + image_record.id(), + operating_saga_id, + generation, + ) + .await + .unwrap(); + + datastore + .set_user_data_export_requested_to_assigning( + &opctx, + snapshot_record.id(), + operating_saga_id, + generation, + ) + .await + .unwrap(); + + datastore + .set_user_data_export_assigning_to_live( + &opctx, + image_record.id(), + operating_saga_id, + generation, + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + VolumeUuid::new_v4(), + ) + .await + .unwrap(); + + datastore + .set_user_data_export_assigning_to_live( + &opctx, + snapshot_record.id(), + operating_saga_id, + generation, + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + VolumeUuid::new_v4(), + ) + .await + .unwrap(); + + // Activate the task - it should do nothing, as there are user data + // export objects already, and they are in state Live. 
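+        // (Live records whose resource still exists land in none of the
+        // changeset lists, so the default status and zero saga starts are
+        // expected.)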
+ + let result: UserDataExportCoordinatorStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert_eq!(result, UserDataExportCoordinatorStatus::default()); + assert_eq!(starter.count_reset(), 0); + + // Delete the snapshot + + datastore + .project_delete_snapshot( + &opctx, + &authz_snapshot, + &db_snapshot, + vec![SnapshotState::Creating, SnapshotState::Ready], + ) + .await + .unwrap(); + + // Activate the task - it should only try to delete the user data export + // object associated with the snapshot + + let result: UserDataExportCoordinatorStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + eprintln!("{result:?}"); + + assert!(result.errors.is_empty()); + assert_eq!(result.create_invoked_ok.len(), 0); + assert_eq!(result.delete_invoked_ok.len(), 1); + + let s = format!("resource_id: {}", snapshot.id()); + assert!(result.delete_invoked_ok.iter().any(|i| i.contains(&s))); + + let s = format!("resource_id: {}", image.id()); + assert!(result.delete_invoked_ok.iter().all(|i| !i.contains(&s))); + + assert_eq!(result.errors.len(), 0); + assert_eq!(starter.count_reset(), 1); + + // Delete the image, now it should try to delete both. + + let (.., authz_image, db_image) = LookupPath::new(&opctx, datastore) + .project_image_id(image.id()) + .fetch_for(authz::Action::Read) + .await + .unwrap(); + + datastore + .project_image_delete(&opctx, &authz_image, db_image) + .await + .unwrap(); + + let result: UserDataExportCoordinatorStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + assert!(result.errors.is_empty()); + assert_eq!(result.create_invoked_ok.len(), 0); + assert_eq!(result.delete_invoked_ok.len(), 2); + + let s = format!("resource_id: {}", snapshot.id()); + assert!(result.delete_invoked_ok.iter().any(|i| i.contains(&s))); + + let s = format!("resource_id: {}", image.id()); + assert!(result.delete_invoked_ok.iter().any(|i| i.contains(&s))); + + assert_eq!(result.errors.len(), 0); + assert_eq!(starter.count_reset(), 2); + } + + #[nexus_test(server = crate::Server)] + async fn test_user_data_export_coordinator_task_bypass( + cptestctx: &ControlPlaneTestContext, + ) { + let nexus = &cptestctx.server.server_context().nexus; + let datastore = nexus.datastore(); + let opctx = OpContext::for_tests( + cptestctx.logctx.log.clone(), + datastore.clone(), + ); + + let starter = Arc::new(NoopStartSaga::new()); + let resolver = nexus.resolver(); + let mut task = UserDataExportCoordinator::new( + datastore.clone(), + starter.clone(), + resolver.clone(), + ); + + let authz_project = setup_test_project(cptestctx, &opctx).await; + + // Add a snapshot and image + + let snapshot = datastore + .project_ensure_snapshot( + &opctx, + &authz_project, + Snapshot { + identity: SnapshotIdentity { + id: Uuid::new_v4(), + name: external::Name::try_from("snapshot".to_string()) + .unwrap() + .into(), + description: "snapshot".into(), + + time_created: Utc::now(), + time_modified: Utc::now(), + time_deleted: None, + }, + + project_id: authz_project.id(), + disk_id: Uuid::new_v4(), + volume_id: VolumeUuid::new_v4().into(), + destination_volume_id: VolumeUuid::new_v4().into(), + + generation: Generation::new(), + state: SnapshotState::Creating, + block_size: BlockSize::AdvancedFormat, + + size: external::ByteCount::try_from( + 2 * MIN_DISK_SIZE_BYTES, + ) + .unwrap() + .into(), + }, + ) + .await + .unwrap(); + + // Create a user data export row for the snapshot + + let (.., authz_snapshot, db_snapshot) = + LookupPath::new(&opctx, datastore) + 
.snapshot_id(snapshot.id()) + .fetch_for(authz::Action::Read) + .await + .unwrap(); + + let record = datastore + .user_data_export_create_for_snapshot( + &opctx, + UserDataExportUuid::new_v4(), + snapshot.id(), + ) + .await + .unwrap(); + + // The user data export record is in state Requested - delete the + // snapshot. + + datastore + .project_delete_snapshot( + &opctx, + &authz_snapshot, + &db_snapshot, + vec![SnapshotState::Creating, SnapshotState::Ready], + ) + .await + .unwrap(); + + // Activate the task + + let result: UserDataExportCoordinatorStatus = + serde_json::from_value(task.activate(&opctx).await).unwrap(); + + // The task should not have invoked any sagas or thrown any errors + + assert!(result.errors.is_empty()); + assert_eq!(starter.count_reset(), 0); + + // It should have "bypassed" the record + + assert_eq!( + &result.records_bypassed_ok, + &[format!( + "transitioned user data export {} from requested to deleted", + record.id(), + )], + ); + + // And done nothing else + + assert_eq!(result.create_invoked_ok.len(), 0); + assert_eq!(result.delete_invoked_ok.len(), 0); + assert_eq!(result.records_marked_for_deletion, 0); + } +} diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index b160c68e4c9..0ba4cb40d7f 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -175,6 +175,56 @@ pub(crate) async fn call_pantry_detach( .map(|_response| ()) } +/// Detach the volume from the Pantry, and it's ok if the Pantry bounced and +/// lost the attachment. +pub(crate) async fn call_pantry_detach_ok_if_gone( + nexus: &Nexus, + log: &slog::Logger, + attach_id: Uuid, + pantry_address: SocketAddrV6, +) -> Result<(), ProgenitorOperationRetryError> { + let endpoint = format!("http://{}", pantry_address); + + info!( + log, + "sending detach (ok if gone) for {attach_id} to endpoint {endpoint}", + ); + + let client = crucible_pantry_client::Client::new(&endpoint); + + let detach_operation = || async { + match client.volume_status(&attach_id.to_string()).await { + Err(e) => match e { + crucible_pantry_client::Error::ErrorResponse(ref rv) => { + if rv.status() == http::StatusCode::NOT_FOUND { + return Ok(()); + } + + return Err(e); + } + + _ => { + return Err(e); + } + }, + + Ok(_) => { + // volume still attached, proceed + } + } + + client.detach(&attach_id.to_string()).await.map(|_| ()) + }; + + let gone_check = + || async { Ok(is_pantry_gone(nexus, pantry_address, log).await) }; + + ProgenitorOperationRetry::new(detach_operation, gone_check) + .run(log) + .await + .map(|_response| ()) +} + pub(crate) fn find_only_new_region( log: &Logger, existing_datasets_and_regions: Vec<( diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 6460974475d..5c24c158ada 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -59,6 +59,8 @@ pub mod snapshot_delete; pub mod subnet_attach; pub mod subnet_detach; pub mod test_saga; +pub mod user_data_export_create; +pub mod user_data_export_delete; pub mod volume_delete; pub mod volume_remove_rop; pub mod vpc_create; @@ -170,8 +172,8 @@ fn make_action_registry() -> ActionRegistry { disk_create::SagaDiskCreate, disk_delete::SagaDiskDelete, finalize_disk::SagaFinalizeDisk, - image_delete::SagaImageDelete, image_create::SagaImageCreate, + image_delete::SagaImageDelete, instance_create::SagaInstanceCreate, instance_delete::SagaInstanceDelete, instance_ip_attach::SagaInstanceIpAttach, @@ -181,18 +183,20 @@ fn make_action_registry() -> ActionRegistry 
{ instance_update::SagaInstanceUpdate, multicast_group_dpd_ensure::SagaMulticastGroupDpdEnsure, project_create::SagaProjectCreate, - region_replacement_start::SagaRegionReplacementStart, region_replacement_drive::SagaRegionReplacementDrive, region_replacement_finish::SagaRegionReplacementFinish, - region_snapshot_replacement_start::SagaRegionSnapshotReplacementStart, + region_replacement_start::SagaRegionReplacementStart, + region_snapshot_replacement_finish::SagaRegionSnapshotReplacementFinish, region_snapshot_replacement_garbage_collect::SagaRegionSnapshotReplacementGarbageCollect, + region_snapshot_replacement_start::SagaRegionSnapshotReplacementStart, region_snapshot_replacement_step::SagaRegionSnapshotReplacementStep, region_snapshot_replacement_step_garbage_collect::SagaRegionSnapshotReplacementStepGarbageCollect, - region_snapshot_replacement_finish::SagaRegionSnapshotReplacementFinish, snapshot_create::SagaSnapshotCreate, snapshot_delete::SagaSnapshotDelete, subnet_attach::SagaSubnetAttach, subnet_detach::SagaSubnetDetach, + user_data_export_create::SagaUserDataExportCreate, + user_data_export_delete::SagaUserDataExportDelete, volume_delete::SagaVolumeDelete, volume_remove_rop::SagaVolumeRemoveROP, vpc_create::SagaVpcCreate, diff --git a/nexus/src/app/sagas/user_data_export_create.rs b/nexus/src/app/sagas/user_data_export_create.rs new file mode 100644 index 00000000000..8e4b0bf213b --- /dev/null +++ b/nexus/src/app/sagas/user_data_export_create.rs @@ -0,0 +1,691 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! When a user creates a resource that is backed by read-only data, like a +//! snapshot or image, a background task will call this saga to attach a copy of +//! the backing volume to a Pantry so that data can be exported through Nexus. +//! The intention is that users can export any read-only data at any time by +//! proxying through Nexus, without having to manage any implementation details. +//! +//! This saga will, for the argument record that's in state Requested: +//! +//! 1. Make a copy of the associate read-only object's volume +//! 2. Attach that volume to a random Pantry +//! 3. Transition the record to Live, setting the fields for the selected Pantry +//! and the volume id. +//! +//! This saga handles the following state transitions: +//! +//! ```text +//! Requested <-- +//! | +//! | | +//! v | +//! | +//! Assigning -- +//! +//! | +//! v +//! +//! Live +//! 
``` + +use super::{ + ACTION_GENERATE_ID, ActionRegistry, NexusActionContext, NexusSaga, + SagaInitError, + common_storage::{ + call_pantry_attach_for_volume, call_pantry_detach, get_pantry_address, + }, +}; +use crate::app::sagas::declare_saga_actions; +use crate::app::{authn, db}; +use anyhow::anyhow; +use nexus_db_lookup::LookupPath; +use nexus_db_model::UserDataExportResource; +use nexus_db_queries::db::identity::Resource; +use nexus_types::saga::saga_action_failed; +use omicron_common::api::external::Error; +use omicron_common::progenitor_operation_retry::ProgenitorOperationRetryError; +use omicron_uuid_kinds::GenericUuid; +use omicron_uuid_kinds::UserDataExportUuid; +use omicron_uuid_kinds::VolumeUuid; +use serde::Deserialize; +use serde::Serialize; +use slog::info; +use slog_error_chain::InlineErrorChain; +use std::net::SocketAddrV6; +use steno::ActionError; +use steno::Node; +use uuid::Uuid; + +// user data export create saga: input parameters + +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Params { + pub serialized_authn: authn::saga::Serialized, + pub id: UserDataExportUuid, +} + +// user data export create saga: actions +declare_saga_actions! { + user_data_export_create; + GET_TARGET_GENERATION_NUMBER -> "target_generation" { + + sudec_get_target_generation_number + } + SET_SAGA_ID -> "unused_1" { + + sudec_set_saga_id + - sudec_set_saga_id_undo + } + CREATE_EXPORT_VOLUME -> "export_volume" { + + sudec_create_export_volume + - sudec_create_export_volume_undo + } + GET_PANTRY_ADDRESS -> "pantry_address" { + + sudec_get_pantry_address + } + CALL_PANTRY_ATTACH_FOR_EXPORT -> "call_pantry_attach_for_export" { + + sudec_call_pantry_attach_for_export + - sudec_call_pantry_attach_for_export_undo + } + UPDATE_USER_DATA_EXPORT_RECORD -> "unused_2" { + + sudec_update_user_data_export_record + } +} + +// user data export create saga: definition + +#[derive(Debug)] +pub(crate) struct SagaUserDataExportCreate; +impl NexusSaga for SagaUserDataExportCreate { + const NAME: &'static str = "user-data-export-create"; + type Params = Params; + + fn register_actions(registry: &mut ActionRegistry) { + user_data_export_create_register_actions(registry); + } + + fn make_saga_dag( + _params: &Self::Params, + mut builder: steno::DagBuilder, + ) -> Result { + // Generate IDs + builder.append(Node::action( + "saga_id", + "GenerateSagaId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(Node::action( + "volume_id", + "GenerateVolumeId", + ACTION_GENERATE_ID.as_ref(), + )); + + builder.append(get_target_generation_number_action()); + builder.append(set_saga_id_action()); + builder.append(create_export_volume_action()); + builder.append(get_pantry_address_action()); + builder.append(call_pantry_attach_for_export_action()); + builder.append(update_user_data_export_record_action()); + + Ok(builder.build()?) + } +} + +// user data export saga: action implementations + +async fn sudec_get_target_generation_number( + sagactx: NexusActionContext, +) -> Result { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + // Query first for the record's generation number, then bump it and use that + // as the target generation in the rest of the saga. + + let Some(record) = osagactx + .datastore() + .user_data_export_lookup_by_id(&opctx, params.id) + .await + .map_err(saga_action_failed)? 
+ else { + return Err(saga_action_failed(Error::internal_error(String::from( + "user data export record hard-deleted!", + )))); + }; + + Ok(record.generation() + 1) +} + +async fn sudec_set_saga_id( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + let target_generation = sagactx.lookup::("target_generation")?; + + // Change the request record here to an intermediate "assigning" state to + // block out other sagas that will be triggered for the same request. + + osagactx + .datastore() + .set_user_data_export_requested_to_assigning( + &opctx, + params.id, + saga_id, + target_generation, + ) + .await + .map_err(saga_action_failed)?; + + Ok(()) +} + +async fn sudec_set_saga_id_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + let target_generation = sagactx.lookup::("target_generation")?; + + osagactx + .datastore() + .unset_user_data_export_requested_to_assigning( + &opctx, + params.id, + saga_id, + target_generation, + ) + .await?; + + Ok(()) +} + +async fn sudec_create_export_volume( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let volume_id = sagactx.lookup::("volume_id")?; + + let Some(record) = osagactx + .datastore() + .user_data_export_lookup_by_id(&opctx, params.id) + .await + .map_err(saga_action_failed)? 
+ else { + return Err(saga_action_failed(Error::internal_error(String::from( + "user data export record hard-deleted!", + )))); + }; + + let source_volume_id = match record.resource() { + UserDataExportResource::Snapshot { id } => { + debug!(log, "grabbing snapshot {id}"); + + let (.., db_snapshot) = + LookupPath::new(&opctx, osagactx.datastore()) + .snapshot_id(id) + .fetch() + .await + .map_err(saga_action_failed)?; + + debug!( + log, + "copying snapshot {} volume {} to {volume_id}", + db_snapshot.id(), + db_snapshot.volume_id(), + ); + + db_snapshot.volume_id() + } + + UserDataExportResource::Image { id } => { + debug!(log, "grabbing image {id}"); + + let (.., db_image) = LookupPath::new(&opctx, osagactx.datastore()) + .image_id(id) + .fetch() + .await + .map_err(saga_action_failed)?; + + debug!( + log, + "copying image {} volume {} to {volume_id}", + db_image.id(), + db_image.volume_id(), + ); + + db_image.volume_id() + } + }; + + osagactx + .datastore() + .volume_checkout_randomize_ids( + db::datastore::SourceVolume(source_volume_id), + db::datastore::DestVolume(volume_id), + db::datastore::VolumeCheckoutReason::ReadOnlyCopy, + ) + .await + .map_err(saga_action_failed)?; + + Ok(()) +} + +async fn sudec_create_export_volume_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + let volume_id = sagactx.lookup::("volume_id")?; + + // `volume_checkout_randomize_ids` calls `volume_create`, which will + // increase the resource count for read only resources in a volume, which + // there are guaranteed to be for these volumes. Decreasing crucible + // resources is necessary as an undo step. Do not call `volume_hard_delete` + // here: soft deleting volumes is necessary for + // `find_deleted_volume_regions` to work. + + info!(log, "calling soft delete for volume {}", volume_id); + + osagactx.datastore().soft_delete_volume(volume_id).await?; + + Ok(()) +} + +async fn sudec_get_pantry_address( + sagactx: NexusActionContext, +) -> Result { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + + let pantry_address = get_pantry_address(osagactx.nexus()).await?; + + info!(log, "using pantry at {}", pantry_address); + + Ok(pantry_address) +} + +async fn sudec_call_pantry_attach_for_export( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let log = sagactx.user_data().log(); + let osagactx = sagactx.user_data(); + + let volume_id = sagactx.lookup::("volume_id")?; + let pantry_address = sagactx.lookup::("pantry_address")?; + + let volume = match osagactx.datastore().volume_get(volume_id).await { + Ok(Some(volume)) => volume, + + Ok(None) => { + return Err(saga_action_failed(Error::internal_error(&format!( + "volume {volume_id} gone!" 
+ )))); + } + + Err(e) => { + return Err(saga_action_failed(Error::internal_error(&format!( + "failed to get volume {volume_id}: {e}" + )))); + } + }; + + let volume_construction_request = serde_json::from_str(&volume.data()) + .map_err(|e| { + saga_action_failed(Error::internal_error(&format!( + "failed to deserialize volume {volume_id} data: {e}" + ))) + })?; + + info!(log, "sending attach for volume {volume_id} to {pantry_address}"); + + call_pantry_attach_for_volume( + &log, + &osagactx.nexus(), + volume_id.into_untyped_uuid(), + volume_construction_request, + pantry_address, + ) + .await +} + +async fn sudec_call_pantry_attach_for_export_undo( + sagactx: NexusActionContext, +) -> Result<(), anyhow::Error> { + let log = sagactx.user_data().log(); + + let volume_id = sagactx.lookup::("volume_id")?; + let pantry_address = sagactx.lookup::("pantry_address")?; + + info!(log, "undo: detaching {volume_id} from pantry at {pantry_address}"); + + match call_pantry_detach( + sagactx.user_data().nexus(), + &log, + volume_id.into_untyped_uuid(), + pantry_address, + ) + .await + { + // We can treat the pantry being permanently gone as success. + Ok(()) | Err(ProgenitorOperationRetryError::Gone) => Ok(()), + + Err(err) => Err(anyhow!( + "failed to detach {volume_id} from pantry at {pantry_address}: {}", + InlineErrorChain::new(&err) + )), + } +} + +async fn sudec_update_user_data_export_record( + sagactx: NexusActionContext, +) -> Result<(), ActionError> { + let osagactx = sagactx.user_data(); + let params = sagactx.saga_params::()?; + + let opctx = crate::context::op_context_for_saga_action( + &sagactx, + ¶ms.serialized_authn, + ); + + let saga_id = sagactx.lookup::("saga_id")?; + let target_generation = sagactx.lookup::("target_generation")?; + let pantry_address = sagactx.lookup::("pantry_address")?; + let volume_id = sagactx.lookup::("volume_id")?; + + // Set the record's state to Live and clear the operating saga id. There is + // no undo step for this, it should succeed idempotently. 
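+    // As the final node in the DAG, a failure here unwinds the entire saga;
+    // succeeding idempotently also makes the action safe to re-execute if
+    // the saga is replayed.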
+
+    osagactx
+        .datastore()
+        .set_user_data_export_assigning_to_live(
+            &opctx,
+            params.id,
+            saga_id,
+            target_generation,
+            pantry_address,
+            volume_id,
+        )
+        .await
+        .map_err(saga_action_failed)?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use crate::app::saga::create_saga_dag;
+    use nexus_db_model::UserDataExportRecord;
+    use nexus_db_model::UserDataExportState;
+    use nexus_db_queries::context::OpContext;
+    use nexus_test_utils::resource_helpers::create_default_ip_pools;
+    use nexus_test_utils::resource_helpers::create_disk;
+    use nexus_test_utils::resource_helpers::create_project;
+    use nexus_test_utils::resource_helpers::create_snapshot;
+    use nexus_test_utils_macros::nexus_test;
+    use uuid::Uuid;
+
+    type DiskTest<'a> =
+        nexus_test_utils::resource_helpers::DiskTest<'a, crate::Server>;
+
+    type ControlPlaneTestContext =
+        nexus_test_utils::ControlPlaneTestContext<crate::Server>;
+
+    const PROJECT_NAME: &str = "bobs-barrel-of-bits";
+    const DISK_NAME: &str = "bobs-disk";
+    const SNAPSHOT_NAME: &str = "bobs-snapshot";
+
+    async fn create_all_the_stuff(
+        cptestctx: &ControlPlaneTestContext,
+    ) -> (Uuid, UserDataExportRecord) {
+        let client = &cptestctx.external_client;
+
+        create_default_ip_pools(&client).await;
+        create_project(client, PROJECT_NAME).await;
+        create_disk(client, PROJECT_NAME, DISK_NAME).await;
+
+        let snapshot_id =
+            create_snapshot(client, PROJECT_NAME, DISK_NAME, SNAPSHOT_NAME)
+                .await
+                .identity
+                .id;
+
+        // Manually create the record so it remains in state Requested
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+        let opctx = test_opctx(cptestctx);
+
+        let export_object = datastore
+            .user_data_export_create_for_snapshot(
+                &opctx,
+                UserDataExportUuid::new_v4(),
+                snapshot_id,
+            )
+            .await
+            .unwrap();
+
+        (snapshot_id, export_object)
+    }
+
+    pub fn test_opctx(cptestctx: &ControlPlaneTestContext) -> OpContext {
+        OpContext::for_tests(
+            cptestctx.logctx.log.new(o!()),
+            cptestctx.server.server_context().nexus.datastore().clone(),
+        )
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_saga_basic_usage_succeeds(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let (snapshot_id, export_object) =
+            create_all_the_stuff(cptestctx).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(cptestctx);
+
+        let params = Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(&opctx),
+            id: export_object.id(),
+        };
+
+        // Actually run the saga
+        nexus
+            .sagas
+            .saga_execute::<SagaUserDataExportCreate>(params)
+            .await
+            .unwrap();
+
+        // Make sure the record was transitioned ok
+
+        let record = nexus
+            .datastore()
+            .user_data_export_lookup_for_snapshot(&opctx, snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(record.state(), UserDataExportState::Live);
+        assert!(record.pantry_address().is_some());
+        assert!(record.volume_id().is_some());
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_actions_succeed_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let (snapshot_id, export_object) =
+            create_all_the_stuff(cptestctx).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(cptestctx);
+
+        let params = Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(&opctx),
+            id: export_object.id(),
+        };
+
+        let dag =
+            create_saga_dag::<SagaUserDataExportCreate>(params).unwrap();
+
+        crate::app::sagas::test_helpers::actions_succeed_idempotently(
+            nexus, dag,
+        )
+        .await;
+
+        // Make sure the record was transitioned ok
+
+        let record = nexus
+            .datastore()
+            .user_data_export_lookup_for_snapshot(&opctx, snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(record.state(), UserDataExportState::Live);
+        assert!(record.pantry_address().is_some());
+        assert!(record.volume_id().is_some());
+    }
+
+    async fn verify_clean_slate(
+        cptestctx: &ControlPlaneTestContext,
+        snapshot_id: Uuid,
+    ) {
+        let opctx = test_opctx(cptestctx);
+        let nexus = &cptestctx.server.server_context().nexus;
+
+        crate::app::sagas::test_helpers::assert_no_failed_undo_steps(
+            &cptestctx.logctx.log,
+            nexus.datastore(),
+        )
+        .await;
+
+        // Validate the record is in the beginning state
+
+        let record = nexus
+            .datastore()
+            .user_data_export_lookup_for_snapshot(&opctx, snapshot_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(record.state(), UserDataExportState::Requested);
+        assert_eq!(record.pantry_address(), None);
+        assert_eq!(record.volume_id(), None);
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+        let log = &cptestctx.logctx.log;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+
+        let (snapshot_id, export_object) =
+            create_all_the_stuff(cptestctx).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(&cptestctx);
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind::<
+            SagaUserDataExportCreate,
+            _,
+            _,
+        >(
+            nexus,
+            || {
+                Box::pin(async {
+                    Params {
+                        serialized_authn: authn::saga::Serialized::for_opctx(
+                            &opctx,
+                        ),
+                        id: export_object.id(),
+                    }
+                })
+            },
+            || {
+                Box::pin(async {
+                    verify_clean_slate(cptestctx, snapshot_id).await;
+                })
+            },
+            log,
+        )
+        .await;
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+        let log = &cptestctx.logctx.log;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+
+        let (snapshot_id, export_object) =
+            create_all_the_stuff(cptestctx).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(&cptestctx);
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind_idempotently::<
+            SagaUserDataExportCreate,
+            _,
+            _,
+        >(
+            nexus,
+            || {
+                Box::pin(async {
+                    Params {
+                        serialized_authn: authn::saga::Serialized::for_opctx(&opctx),
+                        id: export_object.id(),
+                    }
+                })
+            },
+            || {
+                Box::pin(async {
+                    verify_clean_slate(cptestctx, snapshot_id).await;
+                })
+            },
+            log,
+        )
+        .await;
+    }
+}
diff --git a/nexus/src/app/sagas/user_data_export_delete.rs b/nexus/src/app/sagas/user_data_export_delete.rs
new file mode 100644
index 00000000000..02b77477986
--- /dev/null
+++ b/nexus/src/app/sagas/user_data_export_delete.rs
@@ -0,0 +1,647 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! When a user deletes a resource that is backed by read-only data, a
+//! background task will call this saga to undo the process of attaching a copy
+//! of that resource's volume to a Pantry for use with the related export API.
+//!
+//! This saga will, for the argument user data export object:
+//!
+//! 1. Detach the user data export's volume from the appropriate Pantry
+//! 2. Delete that volume (using the volume delete sub-saga)
+//! 3. Delete the user data export record.
+//!
+//! This saga handles the following state transitions:
+//!
+//! ```text
+//!        Live      <--
+//!                     |
+//!         |           |
+//!         v           |
+//!                     |
+//!      Deleting     --
+//!
+//!         |
+//!         v
+//!
+//!      Deleted
+//! ```
+
+use super::ActionRegistry;
+use super::NexusActionContext;
+use super::NexusSaga;
+use super::common_storage::call_pantry_detach_ok_if_gone;
+use crate::app::sagas::ACTION_GENERATE_ID;
+use crate::app::sagas::SagaInitError;
+use crate::app::sagas::declare_saga_actions;
+use crate::app::sagas::volume_delete;
+use nexus_db_queries::authn;
+use nexus_types::saga::saga_action_failed;
+use omicron_common::api::external::Error;
+use omicron_common::progenitor_operation_retry::ProgenitorOperationRetryError;
+use omicron_uuid_kinds::GenericUuid;
+use omicron_uuid_kinds::UserDataExportUuid;
+use omicron_uuid_kinds::VolumeUuid;
+use serde::Deserialize;
+use serde::Serialize;
+use slog_error_chain::InlineErrorChain;
+use steno::ActionError;
+use steno::Node;
+use uuid::Uuid;
+
+// user data export delete saga: input parameters
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct Params {
+    pub serialized_authn: authn::saga::Serialized,
+    pub user_data_export_id: UserDataExportUuid,
+    pub volume_id: VolumeUuid,
+}
+
+// user data export delete saga: actions
+
+declare_saga_actions! {
+    user_data_export_delete;
+    GET_TARGET_GENERATION_NUMBER -> "target_generation" {
+        + suded_get_target_generation_number
+    }
+    SET_SAGA_ID -> "unused_1" {
+        + suded_set_saga_id
+        - suded_set_saga_id_undo
+    }
+    CALL_PANTRY_DETACH_FOR_EXPORT -> "call_pantry_detach_for_export" {
+        + suded_call_pantry_detach_for_export
+    }
+    UPDATE_USER_DATA_EXPORT_RECORD -> "unused_2" {
+        + suded_update_user_data_export_record
+    }
+}
+
+// user data export delete saga: definition
+
+#[derive(Debug)]
+pub(crate) struct SagaUserDataExportDelete;
+impl NexusSaga for SagaUserDataExportDelete {
+    const NAME: &'static str = "user-data-export-delete";
+    type Params = Params;
+
+    fn register_actions(registry: &mut ActionRegistry) {
+        user_data_export_delete_register_actions(registry);
+    }
+
+    fn make_saga_dag(
+        params: &Self::Params,
+        mut builder: steno::DagBuilder,
+    ) -> Result<steno::Dag, SagaInitError> {
+        builder.append(Node::action(
+            "saga_id",
+            "GenerateSagaId",
+            ACTION_GENERATE_ID.as_ref(),
+        ));
+
+        builder.append(get_target_generation_number_action());
+        builder.append(set_saga_id_action());
+
+        builder.append(call_pantry_detach_for_export_action());
+
+        let subsaga_params = volume_delete::Params {
+            serialized_authn: params.serialized_authn.clone(),
+            volume_id: params.volume_id,
+        };
+
+        let subsaga_dag = {
+            let subsaga_builder = steno::DagBuilder::new(steno::SagaName::new(
+                volume_delete::SagaVolumeDelete::NAME,
+            ));
+            volume_delete::SagaVolumeDelete::make_saga_dag(
+                &subsaga_params,
+                subsaga_builder,
+            )?
+        };
+
+        builder.append(Node::constant(
+            "params_for_volume_delete_subsaga",
+            serde_json::to_value(&subsaga_params).map_err(|e| {
+                SagaInitError::SerializeError(
+                    "params_for_volume_delete_subsaga".to_string(),
+                    e,
+                )
+            })?,
+        ));
+
+        builder.append(Node::subsaga(
+            "volume_delete_subsaga_no_result",
+            subsaga_dag,
+            "params_for_volume_delete_subsaga",
+        ));
+
+        // Set the user data export state to Deleted last. There's no way to
+        // re-trigger the delete otherwise.
+        builder.append(update_user_data_export_record_action());
+
+        Ok(builder.build()?)
+    }
+}
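+
+// How this saga gets run: the user data export coordinator background task
+// walks the computed changeset and starts a delete saga for each record that
+// requires one. A rough sketch of that invocation (the changeset field name
+// is hypothetical, for illustration only):
+//
+//   for record in changeset.request_delete {
+//       let params = Params {
+//           serialized_authn: authn::saga::Serialized::for_opctx(&opctx),
+//           user_data_export_id: record.id(),
+//           volume_id: record.volume_id().expect("record has a volume"),
+//       };
+//       nexus.sagas.saga_execute::<SagaUserDataExportDelete>(params).await?;
+//   }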
+
+// user data export delete saga: action implementations
+
+async fn suded_get_target_generation_number(
+    sagactx: NexusActionContext,
+) -> Result<i64, ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    // Query first for the record's generation number, then bump it and use
+    // that as the target generation in the rest of the saga.
+
+    let Some(record) = osagactx
+        .datastore()
+        .user_data_export_lookup_by_id(&opctx, params.user_data_export_id)
+        .await
+        .map_err(saga_action_failed)?
+    else {
+        return Err(saga_action_failed(Error::internal_error(
+            "user data export record hard-deleted!",
+        )));
+    };
+
+    Ok(record.generation() + 1)
+}
+
+async fn suded_set_saga_id(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    let saga_id = sagactx.lookup::<Uuid>("saga_id")?;
+    let target_generation = sagactx.lookup::<i64>("target_generation")?;
+
+    // Change the request record here to an intermediate "deleting" state to
+    // block out other sagas that will be triggered for the same request.
+
+    osagactx
+        .datastore()
+        .set_user_data_export_live_to_deleting(
+            &opctx,
+            params.user_data_export_id,
+            saga_id,
+            target_generation,
+        )
+        .await
+        .map_err(saga_action_failed)?;
+
+    Ok(())
+}
+
+async fn suded_set_saga_id_undo(
+    sagactx: NexusActionContext,
+) -> Result<(), anyhow::Error> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    let saga_id = sagactx.lookup::<Uuid>("saga_id")?;
+    let target_generation = sagactx.lookup::<i64>("target_generation")?;
+
+    osagactx
+        .datastore()
+        .unset_user_data_export_live_to_deleting(
+            &opctx,
+            params.user_data_export_id,
+            saga_id,
+            target_generation,
+        )
+        .await?;
+
+    Ok(())
+}
+
+async fn suded_call_pantry_detach_for_export(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let log = sagactx.user_data().log();
+    let params = sagactx.saga_params::<Params>()?;
+    let osagactx = sagactx.user_data();
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    let maybe_record = osagactx
+        .datastore()
+        .user_data_export_lookup_by_id(&opctx, params.user_data_export_id)
+        .await
+        .map_err(saga_action_failed)?;
+
+    let Some(record) = maybe_record else {
+        info!(
+            log,
+            "user data export {} hard deleted!", params.user_data_export_id,
+        );
+        return Err(saga_action_failed(Error::internal_error(
+            "user data export hard deleted!",
+        )));
+    };
+
+    let Some(pantry_address) = record.pantry_address() else {
+        info!(
+            log,
+            "user data export {} has no pantry address!",
+            params.user_data_export_id,
+        );
+        return Err(saga_action_failed(Error::internal_error(
+            "user data export has no pantry address!",
+        )));
+    };
+
+    let Some(volume_id) = record.volume_id() else {
+        info!(
+            log,
+            "user data export {} has no volume id!", params.user_data_export_id,
+        );
+        return Err(saga_action_failed(Error::internal_error(
+            "user data export has no volume id!",
+        )));
+    };
+
+    // It's possible the Pantry bounced and lost the attachment of this
+    // volume, and this record was deleted so that another attachment could be
+    // created (possibly somewhere else!). Call the detach function that's ok
+    // with this.
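+    //
+    // (An assumption worth stating: based on its name and the scenario
+    // above, `call_pantry_detach_ok_if_gone` is expected to treat a Pantry
+    // that no longer knows about this volume as a successful detach, while
+    // still propagating transport and server errors.)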
+
+    info!(log, "detaching {volume_id} from pantry at {pantry_address}");
+
+    match call_pantry_detach_ok_if_gone(
+        sagactx.user_data().nexus(),
+        &log,
+        volume_id.into_untyped_uuid(),
+        pantry_address,
+    )
+    .await
+    {
+        // We can treat the pantry being permanently gone as success.
+        Ok(()) | Err(ProgenitorOperationRetryError::Gone) => Ok(()),
+
+        Err(err) => Err(saga_action_failed(Error::internal_error(&format!(
+            "failed to detach {volume_id} from pantry at {pantry_address}: {}",
+            InlineErrorChain::new(&err)
+        )))),
+    }
+}
+
+async fn suded_update_user_data_export_record(
+    sagactx: NexusActionContext,
+) -> Result<(), ActionError> {
+    let osagactx = sagactx.user_data();
+    let params = sagactx.saga_params::<Params>()?;
+
+    let opctx = crate::context::op_context_for_saga_action(
+        &sagactx,
+        &params.serialized_authn,
+    );
+
+    let saga_id = sagactx.lookup::<Uuid>("saga_id")?;
+    let target_generation = sagactx.lookup::<i64>("target_generation")?;
+
+    osagactx
+        .datastore()
+        .set_user_data_export_deleting_to_deleted(
+            &opctx,
+            params.user_data_export_id,
+            saga_id,
+            target_generation,
+        )
+        .await
+        .map_err(saga_action_failed)?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use crate::app::authz;
+    use crate::app::saga::create_saga_dag;
+    use nexus_db_lookup::LookupPath;
+    use uuid::Uuid;
+
+    use nexus_db_model::SnapshotState;
+    use nexus_db_model::UserDataExportRecord;
+    use nexus_db_model::UserDataExportState;
+
+    use nexus_db_queries::context::OpContext;
+
+    use nexus_test_utils::background::run_user_data_export_coordinator;
+    use nexus_test_utils::resource_helpers::create_default_ip_pools;
+    use nexus_test_utils::resource_helpers::create_disk;
+    use nexus_test_utils::resource_helpers::create_project;
+    use nexus_test_utils::resource_helpers::create_snapshot;
+
+    use nexus_test_utils_macros::nexus_test;
+
+    use omicron_common::api::external::Error;
+
+    use omicron_test_utils::dev::poll;
+
+    use std::time::Duration;
+
+    type DiskTest<'a> =
+        nexus_test_utils::resource_helpers::DiskTest<'a, crate::Server>;
+
+    type ControlPlaneTestContext =
+        nexus_test_utils::ControlPlaneTestContext<crate::Server>;
+
+    const PROJECT_NAME: &str = "bobs-barrel-of-bits";
+    const DISK_NAME: &str = "bobs-disk";
+    const SNAPSHOT_NAME: &str = "bobs-snapshot";
+
+    async fn create_all_the_stuff(
+        cptestctx: &ControlPlaneTestContext,
+    ) -> (Uuid, UserDataExportRecord) {
+        let client = &cptestctx.external_client;
+        let nexus = &cptestctx.server.server_context().nexus;
+        let datastore = nexus.datastore();
+
+        create_default_ip_pools(&client).await;
+        create_project(client, PROJECT_NAME).await;
+        create_disk(client, PROJECT_NAME, DISK_NAME).await;
+
+        let snapshot_id =
+            create_snapshot(client, PROJECT_NAME, DISK_NAME, SNAPSHOT_NAME)
+                .await
+                .identity
+                .id;
+
+        let opctx = test_opctx(cptestctx);
+
+        // Run the background task to create the record
+        run_user_data_export_coordinator(&cptestctx.lockstep_client).await;
+
+        let (.., authz_snapshot, db_snapshot) =
+            LookupPath::new(&opctx, datastore)
+                .snapshot_id(snapshot_id)
+                .fetch_for(authz::Action::Read)
+                .await
+                .unwrap();
+
+        let export_object = poll::wait_for_condition(
+            || {
+                let opctx = test_opctx(cptestctx);
+                let datastore = datastore.clone();
+
+                async move {
+                    let maybe_record = datastore
+                        .user_data_export_lookup_for_snapshot(
+                            &opctx,
+                            snapshot_id,
+                        )
+                        .await
+                        .unwrap();
+
+                    match maybe_record {
+                        Some(record) => {
+                            // Only return when the record is in state Live
+                            if record.state() == UserDataExportState::Live {
+                                Ok(record)
+                            } else {
+                                Err(poll::CondCheckError::<Error>::NotYet)
+                            }
+                        }
+
+                        None => Err(poll::CondCheckError::<Error>::NotYet),
+                    }
+                }
+            },
+            &Duration::from_millis(10),
+            &Duration::from_secs(20),
+        )
+        .await
+        .unwrap();
+
+        // Delete the snapshot, then run the changeset function to mark the
+        // export object as deleted.
+
+        datastore
+            .project_delete_snapshot(
+                &opctx,
+                &authz_snapshot,
+                &db_snapshot,
+                vec![SnapshotState::Creating, SnapshotState::Ready],
+            )
+            .await
+            .unwrap();
+
+        let _changeset =
+            datastore.compute_user_data_export_changeset(&opctx).await.unwrap();
+
+        (snapshot_id, export_object)
+    }
+
+    pub fn test_opctx(cptestctx: &ControlPlaneTestContext) -> OpContext {
+        OpContext::for_tests(
+            cptestctx.logctx.log.new(o!()),
+            cptestctx.server.server_context().nexus.datastore().clone(),
+        )
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_saga_basic_usage_succeeds(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let (_, export_object) = create_all_the_stuff(cptestctx).await;
+
+        // Create the delete saga dag
+        let opctx = test_opctx(cptestctx);
+
+        let params = Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(&opctx),
+            user_data_export_id: export_object.id(),
+            volume_id: export_object.volume_id().unwrap(),
+        };
+
+        nexus
+            .sagas
+            .saga_execute::<SagaUserDataExportDelete>(params)
+            .await
+            .unwrap();
+
+        // Make sure the record was deleted ok
+
+        let export_object = nexus
+            .datastore()
+            .user_data_export_lookup_by_id(&opctx, export_object.id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(export_object.state(), UserDataExportState::Deleted);
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_actions_succeed_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+        let (_, export_object) = create_all_the_stuff(cptestctx).await;
+
+        // Create the delete saga dag
+        let opctx = test_opctx(cptestctx);
+
+        let params = Params {
+            serialized_authn: authn::saga::Serialized::for_opctx(&opctx),
+            user_data_export_id: export_object.id(),
+            volume_id: export_object.volume_id().unwrap(),
+        };
+
+        let dag = create_saga_dag::<SagaUserDataExportDelete>(params).unwrap();
+
+        crate::app::sagas::test_helpers::actions_succeed_idempotently(
+            nexus, dag,
+        )
+        .await;
+
+        // Make sure the record was deleted ok
+
+        let export_object = nexus
+            .datastore()
+            .user_data_export_lookup_by_id(&opctx, export_object.id())
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(export_object.state(), UserDataExportState::Deleted);
+    }
+
+    async fn verify_clean_slate(
+        cptestctx: &ControlPlaneTestContext,
+        user_data_export_id: UserDataExportUuid,
+    ) {
+        let opctx = test_opctx(cptestctx);
+        let nexus = &cptestctx.server.server_context().nexus;
+
+        crate::app::sagas::test_helpers::assert_no_failed_undo_steps(
+            &cptestctx.logctx.log,
+            nexus.datastore(),
+        )
+        .await;
+
+        // Validate the record is in the beginning state
+
+        let record = nexus
+            .datastore()
+            .user_data_export_lookup_by_id(&opctx, user_data_export_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(record.state(), UserDataExportState::Live);
+        assert!(record.pantry_address().is_some());
+        assert!(record.volume_id().is_some());
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+        let log = &cptestctx.logctx.log;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+
+        let (_, export_object) = create_all_the_stuff(cptestctx).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(&cptestctx);
+
+        let export_id = export_object.id();
+        let volume_id = export_object.volume_id().unwrap();
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind::<
+            SagaUserDataExportDelete,
+            _,
+            _,
+        >(
+            nexus,
+            || {
+                Box::pin(async {
+                    Params {
+                        serialized_authn: authn::saga::Serialized::for_opctx(
+                            &opctx,
+                        ),
+                        user_data_export_id: export_id,
+                        volume_id,
+                    }
+                })
+            },
+            || {
+                Box::pin(async {
+                    verify_clean_slate(cptestctx, export_id).await;
+                })
+            },
+            log,
+        )
+        .await;
+    }
+
+    #[nexus_test(server = crate::Server)]
+    async fn test_action_failure_can_unwind_idempotently(
+        cptestctx: &ControlPlaneTestContext,
+    ) {
+        DiskTest::new(cptestctx).await;
+        let log = &cptestctx.logctx.log;
+
+        let nexus = &cptestctx.server.server_context().nexus;
+
+        let (_, export_object) = create_all_the_stuff(cptestctx).await;
+
+        // Build the saga DAG with the provided test parameters
+        let opctx = test_opctx(&cptestctx);
+
+        let export_id = export_object.id();
+        let volume_id = export_object.volume_id().unwrap();
+
+        crate::app::sagas::test_helpers::action_failure_can_unwind_idempotently::<
+            SagaUserDataExportDelete,
+            _,
+            _,
+        >(
+            nexus,
+            || {
+                Box::pin(async {
+                    Params {
+                        serialized_authn: authn::saga::Serialized::for_opctx(&opctx),
+                        user_data_export_id: export_id,
+                        volume_id,
+                    }
+                })
+            },
+            || {
+                Box::pin(async {
+                    verify_clean_slate(cptestctx, export_id).await;
+                })
+            },
+            log,
+        )
+        .await;
+    }
+}
diff --git a/nexus/test-utils/src/background.rs b/nexus/test-utils/src/background.rs
index 74514af810a..75c0411defb 100644
--- a/nexus/test-utils/src/background.rs
+++ b/nexus/test-utils/src/background.rs
@@ -544,3 +544,30 @@ pub async fn run_blueprint_rendezvous(lockstep_client: &ClientTestContext) {
     )
     .unwrap();
 }
+
+/// Run the user_data_export_coordinator background task
+pub async fn run_user_data_export_coordinator(
+    internal_client: &ClientTestContext,
+) {
+    let last_background_task = activate_background_task(
+        &internal_client,
+        "user_data_export_coordinator",
+    )
+    .await;
+
+    let LastResult::Completed(last_result_completed) =
+        last_background_task.last
+    else {
+        panic!(
+            "unexpected {:?} returned from user_data_export_coordinator task",
+            last_background_task.last,
+        );
+    };
+
+    let status = serde_json::from_value::<
+        nexus_types::internal_api::background::UserDataExportCoordinatorStatus,
+    >(last_result_completed.details)
+    .unwrap();
+
+    assert!(status.errors.is_empty());
+}
diff --git a/nexus/tests/config.test.toml b/nexus/tests/config.test.toml
index a32a9b86081..0c2aa8ba200 100644
--- a/nexus/tests/config.test.toml
+++ b/nexus/tests/config.test.toml
@@ -224,6 +224,7 @@ audit_log_timeout_incomplete.max_timed_out_per_activation = 1000
 audit_log_cleanup.period_secs = 600
 audit_log_cleanup.retention_days = 90
 audit_log_cleanup.max_deleted_per_activation = 10000
+user_data_export_coordinator.period_secs = 240
 
 [multicast]
 # Enable multicast functionality for tests (disabled by default in production)
diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs
index cbf09135bde..7870e326205 100644
--- a/nexus/types/src/internal_api/background.rs
+++ b/nexus/types/src/internal_api/background.rs
@@ -1166,6 +1166,17 @@ pub struct ServiceFirewallRuleStatus {
     pub sled_push_errors: Option<BTreeMap<SledUuid, String>>,
 }
 
+/// The status of a `user_data_export_coordinator` background task
+/// activation
+#[derive(Serialize, Deserialize, Default, Debug, PartialEq, Eq)]
+pub struct UserDataExportCoordinatorStatus {
+    pub create_invoked_ok: Vec<String>,
+    pub delete_invoked_ok: Vec<String>,
+    pub records_marked_for_deletion: usize,
+    pub records_bypassed_ok: Vec<String>,
+    pub errors: Vec<String>,
+}
+
 #[cfg(test)]
 mod test {
     use super::TufRepoInfo;
diff --git a/smf/nexus/multi-sled/config-partial.toml b/smf/nexus/multi-sled/config-partial.toml
index 7a43386e375..caa3ffc2617 100644
--- a/smf/nexus/multi-sled/config-partial.toml
+++ b/smf/nexus/multi-sled/config-partial.toml
@@ -127,6 +127,7 @@ audit_log_timeout_incomplete.max_timed_out_per_activation = 1000
 audit_log_cleanup.period_secs = 600
 audit_log_cleanup.retention_days = 90
 audit_log_cleanup.max_deleted_per_activation = 10000
+user_data_export_coordinator.period_secs = 30
 
 [default_region_allocation_strategy]
 # by default, allocate across 3 distinct sleds
diff --git a/smf/nexus/single-sled/config-partial.toml b/smf/nexus/single-sled/config-partial.toml
index 11863e1c681..f11f573b71d 100644
--- a/smf/nexus/single-sled/config-partial.toml
+++ b/smf/nexus/single-sled/config-partial.toml
@@ -127,6 +127,7 @@ audit_log_timeout_incomplete.max_timed_out_per_activation = 1000
 audit_log_cleanup.period_secs = 600
 audit_log_cleanup.retention_days = 90
 audit_log_cleanup.max_deleted_per_activation = 10000
+user_data_export_coordinator.period_secs = 30
 
 [default_region_allocation_strategy]
 # by default, allocate without requirement for distinct sleds.

From f768f5595f6c51757e8996da3d17a8d47dd54d2d Mon Sep 17 00:00:00 2001
From: James MacMahon
Date: Wed, 29 Apr 2026 21:30:43 +0000
Subject: [PATCH 2/3] should be info, not error

---
 nexus/src/app/background/tasks/user_data_export_coordinator.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nexus/src/app/background/tasks/user_data_export_coordinator.rs b/nexus/src/app/background/tasks/user_data_export_coordinator.rs
index aadff096993..b29f7a3c5f3 100644
--- a/nexus/src/app/background/tasks/user_data_export_coordinator.rs
+++ b/nexus/src/app/background/tasks/user_data_export_coordinator.rs
@@ -378,7 +378,7 @@ impl BackgroundTask for UserDataExportCoordinator {
             }
         };
 
-        error!(&log, "changeset is {changeset:?}");
+        info!(&log, "changeset is {changeset:?}");
 
         self.process_user_data_export_changeset(
             &opctx,

From 9ac0abcacc024c5046c8b2422c3e5369ad0775b4 Mon Sep 17 00:00:00 2001
From: James MacMahon
Date: Fri, 1 May 2026 12:59:12 +0000
Subject: [PATCH 3/3] only set deleted for undeleted records

---
 nexus/db-queries/src/db/datastore/user_data_export.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nexus/db-queries/src/db/datastore/user_data_export.rs b/nexus/db-queries/src/db/datastore/user_data_export.rs
index 46406b25d40..e4f53b0d6a7 100644
--- a/nexus/db-queries/src/db/datastore/user_data_export.rs
+++ b/nexus/db-queries/src/db/datastore/user_data_export.rs
@@ -472,6 +472,7 @@ impl DataStore {
             .filter(diesel::dsl::not(
                 dsl::pantry_ip.eq_any(in_service_pantries),
             ))
+            .filter(dsl::resource_deleted.eq(false))
             .set(dsl::resource_deleted.eq(true))
             .execute_async(&*conn)
             .await