From a14affff83a0c84e24689e6539a9a51a9b54a647 Mon Sep 17 00:00:00 2001 From: Dan Rosen Date: Fri, 24 Apr 2026 19:00:14 -0400 Subject: [PATCH 1/2] [fm] resource deletion: tombstones, version guard, tracker-based GC --- dev-tools/omdb/src/bin/omdb/db/alert.rs | 6 + dev-tools/omdb/src/bin/omdb/nexus.rs | 52 +- dev-tools/omdb/tests/successes.out | 4 + nexus/db-model/src/alert.rs | 7 + nexus/db-model/src/schema_versions.rs | 3 +- nexus/db-model/src/support_bundle.rs | 2 + nexus/db-queries/src/db/datastore/alert.rs | 292 +++++- nexus/db-queries/src/db/datastore/alert_rx.rs | 1 + nexus/db-queries/src/db/datastore/fm.rs | 473 ++++++++- .../src/db/datastore/support_bundle.rs | 441 ++++++++- .../src/db/datastore/webhook_delivery.rs | 2 +- nexus/db-queries/src/db/mod.rs | 1 + .../db-queries/src/db/sitrep_version_guard.rs | 315 ++++++ .../webhook_rx_list_resendable_events.sql | 3 +- nexus/db-schema/src/schema.rs | 28 + nexus/src/app/alert.rs | 6 +- .../app/background/tasks/alert_dispatcher.rs | 2 +- .../src/app/background/tasks/fm_rendezvous.rs | 929 +++++++++++++++++- .../integration_tests/support_bundles.rs | 1 + nexus/types/src/internal_api/background.rs | 18 + schema/crdb/dbinit.sql | 60 +- schema/crdb/fm-resource-deletion/up01.sql | 2 + schema/crdb/fm-resource-deletion/up02.sql | 2 + schema/crdb/fm-resource-deletion/up03.sql | 1 + schema/crdb/fm-resource-deletion/up04.sql | 3 + .../crdb/fm-resource-deletion/up04.verify.sql | 2 + schema/crdb/fm-resource-deletion/up05.sql | 6 + schema/crdb/fm-resource-deletion/up06.sql | 6 + 28 files changed, 2546 insertions(+), 122 deletions(-) create mode 100644 nexus/db-queries/src/db/sitrep_version_guard.rs create mode 100644 schema/crdb/fm-resource-deletion/up01.sql create mode 100644 schema/crdb/fm-resource-deletion/up02.sql create mode 100644 schema/crdb/fm-resource-deletion/up03.sql create mode 100644 schema/crdb/fm-resource-deletion/up04.sql create mode 100644 schema/crdb/fm-resource-deletion/up04.verify.sql create mode 100644 
schema/crdb/fm-resource-deletion/up05.sql create mode 100644 schema/crdb/fm-resource-deletion/up06.sql diff --git a/dev-tools/omdb/src/bin/omdb/db/alert.rs b/dev-tools/omdb/src/bin/omdb/db/alert.rs index eeec094138b..a16e53d0de6 100644 --- a/dev-tools/omdb/src/bin/omdb/db/alert.rs +++ b/dev-tools/omdb/src/bin/omdb/db/alert.rs @@ -1031,10 +1031,12 @@ async fn cmd_db_alert_info( payload, num_dispatched, case_id, + time_deleted, } = alert; const CLASS: &str = "class"; const TIME_DISPATCHED: &str = "fully dispatched at"; + const TIME_DELETED: &str = "tombstoned at"; const NUM_DISPATCHED: &str = "deliveries dispatched"; const CASE_ID: &str = "requested by FM case"; @@ -1043,6 +1045,7 @@ async fn cmd_db_alert_info( TIME_CREATED, TIME_MODIFIED, TIME_DISPATCHED, + TIME_DELETED, NUM_DISPATCHED, CLASS, CASE_ID, @@ -1058,6 +1061,9 @@ async fn cmd_db_alert_info( if let Some(t) = time_dispatched { println!(" {TIME_DISPATCHED:>WIDTH$}: {t}") } + if let Some(t) = time_deleted { + println!(" {TIME_DELETED:>WIDTH$}: {t}") + } if let Some(case_id) = case_id { println!(" {CASE_ID:>WIDTH$}: {case_id:?}"); } diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 2bec4896d33..8390d6c60b2 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -3727,6 +3727,7 @@ fn print_task_fm_rendezvous(details: &serde_json::Value) { alerts, support_bundles, ereport_marking: marking, + latest_processed_sitrep_version, } = match serde_json::from_value::(details.clone()) { Err(error) => { eprintln!( @@ -3738,7 +3739,36 @@ fn print_task_fm_rendezvous(details: &serde_json::Value) { Ok(status) => status, }; match sitrep_id { - Some(id) => println!(" current sitrep: {id}"), + Some(id) => { + println!(" current sitrep: {id}"); + // The tracker line only appears when a sitrep was processed + // — without one, the activation never reaches the advance + // call, and there is nothing meaningful to print. 
+ match latest_processed_sitrep_version { + Some(v) => { + println!(" latest fully-processed sitrep version: {v}",) + } + None => { + // Disambiguate: a `None` here means either the advance + // was skipped because at least one subtask recorded + // per-request errors, or the advance call itself failed. + // Both cases are also logged separately by Nexus. + let any_subtask_errors = !alerts.details.errors.is_empty() + || !support_bundles.details.errors.is_empty() + || !marking.details.errors.is_empty(); + if any_subtask_errors { + println!( + "(i) tracker advance skipped: subtasks recorded \ + per-request errors" + ); + } else { + println!( + "(i) tracker advance failed at this activation" + ); + } + } + } + } None => println!( "(i) no FM situation report loaded, so rendezvous was not \ performed", @@ -3751,20 +3781,25 @@ fn print_task_fm_rendezvous(details: &serde_json::Value) { total_alerts_requested, current_sitrep_alerts_requested, alerts_created, + stale_sitrep, errors, }| { - let already_created = - total_alerts_requested - alerts_created - errors.len(); + let already_created = total_alerts_requested + - alerts_created + - stale_sitrep + - errors.len(); const REQUESTED: &str = "alerts requested:"; const REQUESTED_THIS_SITREP: &str = " requested in this sitrep:"; const CREATED: &str = " created in this activation:"; const ALREADY_CREATED: &str = " already created:"; + const STALE_SITREP: &str = " skipped (stale sitrep):"; const ERRORS: &str = " errors:"; const WIDTH: usize = const_max_len(&[ REQUESTED, REQUESTED_THIS_SITREP, CREATED, ALREADY_CREATED, + STALE_SITREP, ERRORS, ]) + 1; pub const NUM_WIDTH: usize = 4; @@ -3779,6 +3814,7 @@ fn print_task_fm_rendezvous(details: &serde_json::Value) { println!( " {ALREADY_CREATED:NUM_WIDTH$}" ); + println!(" {STALE_SITREP:NUM_WIDTH$}"); println!( "{} {ERRORS:NUM_WIDTH$}", warn_if_nonzero(errors.len()), @@ -3796,20 +3832,25 @@ fn print_task_fm_rendezvous(details: &serde_json::Value) { total_bundles_requested, 
current_sitrep_bundles_requested, bundles_created, + stale_sitrep, errors, }| { - let already_created = - total_bundles_requested - bundles_created - errors.len(); + let already_created = total_bundles_requested + - bundles_created + - stale_sitrep + - errors.len(); const REQUESTED: &str = "support bundles requested:"; const REQUESTED_THIS_SITREP: &str = " requested in this sitrep:"; const CREATED: &str = " created in this activation:"; const ALREADY_CREATED: &str = " already created:"; + const STALE_SITREP: &str = " skipped (stale sitrep):"; const ERRORS: &str = " errors:"; const WIDTH: usize = const_max_len(&[ REQUESTED, REQUESTED_THIS_SITREP, CREATED, ALREADY_CREATED, + STALE_SITREP, ERRORS, ]) + 1; pub const NUM_WIDTH: usize = 4; @@ -3824,6 +3865,7 @@ fn print_task_fm_rendezvous(details: &serde_json::Value) { println!( " {ALREADY_CREATED:NUM_WIDTH$}" ); + println!(" {STALE_SITREP:NUM_WIDTH$}"); println!( "{} {ERRORS:NUM_WIDTH$}", warn_if_nonzero(errors.len()), diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index 8662889facf..317a5b6f715 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -725,6 +725,7 @@ task: "fm_rendezvous" requested in this sitrep: 0 created in this activation: 0 already created: 0 + skipped (stale sitrep): 0 errors: 0 creating requested support bundles: (i) note: this operation was not executed @@ -732,6 +733,7 @@ task: "fm_rendezvous" requested in this sitrep: 0 created in this activation: 0 already created: 0 + skipped (stale sitrep): 0 errors: 0 marking ereports as seen: (i) note: this operation was not executed @@ -1406,6 +1408,7 @@ task: "fm_rendezvous" requested in this sitrep: 0 created in this activation: 0 already created: 0 + skipped (stale sitrep): 0 errors: 0 creating requested support bundles: (i) note: this operation was not executed @@ -1413,6 +1416,7 @@ task: "fm_rendezvous" requested in this sitrep: 0 created in this activation: 0 already created: 
0 + skipped (stale sitrep): 0 errors: 0 marking ereports as seen: (i) note: this operation was not executed diff --git a/nexus/db-model/src/alert.rs b/nexus/db-model/src/alert.rs index b3dd72e1338..0c5451cb9fc 100644 --- a/nexus/db-model/src/alert.rs +++ b/nexus/db-model/src/alert.rs @@ -48,6 +48,12 @@ pub struct Alert { /// The ID of the fault management case that created this alert, if any. pub case_id: Option>, + + /// Tombstone timestamp: set when an FM-created alert is soft-deleted. + /// + /// Only meaningful when `case_id` is `Some` (FM-created alerts). + /// Non-FM alerts continue to hard-delete and never have this set. + pub time_deleted: Option>, } impl Alert { @@ -69,6 +75,7 @@ impl Alert { payload: payload.into(), num_dispatched: 0, case_id: None, + time_deleted: None, } } diff --git a/nexus/db-model/src/schema_versions.rs b/nexus/db-model/src/schema_versions.rs index e09c123b732..fc0ea7151b5 100644 --- a/nexus/db-model/src/schema_versions.rs +++ b/nexus/db-model/src/schema_versions.rs @@ -16,7 +16,7 @@ use std::{collections::BTreeMap, sync::LazyLock}; /// /// This must be updated when you change the database schema. Refer to /// schema/crdb/README.adoc in the root of this repository for details. -pub const SCHEMA_VERSION: Version = Version::new(256, 0, 0); +pub const SCHEMA_VERSION: Version = Version::new(257, 0, 0); /// List of all past database schema versions, in *reverse* order /// @@ -28,6 +28,7 @@ pub static KNOWN_VERSIONS: LazyLock> = LazyLock::new(|| { // | leaving the first copy as an example for the next person. 
// v // KnownVersion::new(next_int, "unique-dirname-with-the-sql-files"), + KnownVersion::new(257, "fm-resource-deletion"), KnownVersion::new(256, "bgp-unnumbered-peer-cleanup"), KnownVersion::new(255, "blueprint-add-external-networking-generation"), KnownVersion::new( diff --git a/nexus/db-model/src/support_bundle.rs b/nexus/db-model/src/support_bundle.rs index d1f65b5c2e2..3840990b39e 100644 --- a/nexus/db-model/src/support_bundle.rs +++ b/nexus/db-model/src/support_bundle.rs @@ -109,6 +109,7 @@ pub struct SupportBundle { pub assigned_nexus: Option>, pub user_comment: Option, pub fm_case_id: Option>, + pub time_deleted: Option>, } impl SupportBundle { @@ -132,6 +133,7 @@ impl SupportBundle { assigned_nexus: Some(nexus_id.into()), user_comment, fm_case_id: fm_case_id.map(Into::into), + time_deleted: None, } } diff --git a/nexus/db-queries/src/db/datastore/alert.rs b/nexus/db-queries/src/db/datastore/alert.rs index 0843367803a..be7f3cb69c2 100644 --- a/nexus/db-queries/src/db/datastore/alert.rs +++ b/nexus/db-queries/src/db/datastore/alert.rs @@ -9,8 +9,6 @@ use crate::context::OpContext; use crate::db::model::Alert; use async_bb8_diesel::AsyncRunQueryDsl; use diesel::prelude::*; -use diesel::result::DatabaseErrorKind; -use diesel::result::Error as DieselError; use diesel::result::OptionalExtension; use nexus_db_errors::ErrorHandler; use nexus_db_errors::public_error_from_diesel; @@ -18,39 +16,113 @@ use nexus_db_schema::schema::alert::dsl as alert_dsl; use nexus_types::identity::Asset; use omicron_common::api::external::CreateResult; use omicron_common::api::external::Error; +use omicron_common::api::external::ResourceType; use omicron_common::api::external::UpdateResult; use omicron_uuid_kinds::{AlertUuid, GenericUuid}; impl DataStore { + /// Insert an alert row, returning the inserted alert on success. + /// + /// If a row with this alert's id already exists, this returns + /// [`Error::ObjectAlreadyExists`]. 
This isn't _really_ an error: in normal + /// operation, multiple Nexus instances do race to insert the same alert, + /// and -- in the case of FM-requested alerts -- as long as any sitrep + /// requesting the given alert is still current, every periodic execution of + /// FM rendezvous will continue to attempt to recreate the same alert. + /// Omicron convention is to signal this idempotent "I didn't do anything" + /// case to the caller as an error, and the caller can decide how to treat + /// it. + /// + /// When `fm_sitrep_version` is `Some`, the insert is wrapped in + /// [`SitrepVersionGuardedInsert`] so it only succeeds while the rendezvous + /// progress tracker has not yet moved past `fm_sitrep_version`. If the + /// tracker is ahead, returns [`Error::Conflict`] with a `"stale sitrep"` + /// message, and the caller should treat the request as skipped (not + /// retried). + /// + /// [`SitrepVersionGuardedInsert`]: + /// crate::db::sitrep_version_guard::SitrepVersionGuardedInsert pub async fn alert_create( &self, opctx: &OpContext, alert: Alert, + fm_sitrep_version: Option, ) -> CreateResult { - let conn = self.pool_connection_authorized(&opctx).await?; - let alert = diesel::insert_into(alert_dsl::alert) + let conn = self.pool_connection_authorized(opctx).await?; + let alert_id = alert.id(); + let insert = diesel::insert_into(alert_dsl::alert) .values(alert) - .returning(Alert::as_returning()) - .get_result_async(&*conn) + .on_conflict_do_nothing() + .returning(Alert::as_returning()); + + let inserted = match fm_sitrep_version { + None => insert.get_result_async(&*conn).await.optional().map_err( + |e| public_error_from_diesel(e, ErrorHandler::Server), + )?, + Some(v) => { + use crate::db::sitrep_version_guard::{ + GuardedInsertError, SitrepVersionGuardedInsert, + }; + SitrepVersionGuardedInsert::new(insert, v) + .insert_and_get_optional_result_async(&conn) + .await + .map_err(|err| match err { + GuardedInsertError::StaleSitrep => Error::conflict( + "cannot 
create alert for stale sitrep", + ), + GuardedInsertError::DatabaseError(e) => { + public_error_from_diesel(e, ErrorHandler::Server) + } + })? + } + }; + + inserted.ok_or_else(|| Error::ObjectAlreadyExists { + type_name: ResourceType::Alert, + object_name: alert_id.to_string(), + }) + } + + pub async fn alert_delete( + &self, + opctx: &OpContext, + id: AlertUuid, + ) -> Result<(), Error> { + let conn = self.pool_connection_authorized(opctx).await?; + let id = id.into_untyped_uuid(); + + // First, try to soft-delete by setting `time_deleted` for a row + // matching the given alert id with a non-null `case_id`, meaning it was + // created by fault management. FM rendezvous will eventually + // garbage-collect this when it's safe (see `actually_activate` and + // `fm_tombstone_sweep`). + let soft_rows = diesel::update(alert_dsl::alert) + .filter(alert_dsl::id.eq(id)) + .filter(alert_dsl::case_id.is_not_null()) + .filter(alert_dsl::time_deleted.is_null()) + .set(alert_dsl::time_deleted.eq(diesel::dsl::now)) + .execute_async(&*conn) .await - .map_err(|e| match e { - DieselError::DatabaseError( - DatabaseErrorKind::UniqueViolation, - _, - ) => Error::conflict("alert already exists"), - e => public_error_from_diesel(e, ErrorHandler::Server), - })?; - - slog::debug!( - &opctx.log, - "published alert"; - "alert_id" => ?alert.id(), - "alert_class" => %alert.class, - "alert_case_id" => ?alert.case_id, - "time_created" => ?alert.identity.time_created, - ); + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; + + // If no rows were deleted above, we try hard-deleting instead, taking + // care not to hard-delete any previously soft-deleted rows (or, more + // subtly, any FM-created rows that were inserted after the above + // statement). Note, we can't combine both attempts into a single CTE: + // CRDB rejects that by default + // (`sql.multiple_modifications_of_table.enabled = false`). 
+ if soft_rows == 0 { + diesel::delete(alert_dsl::alert) + .filter(alert_dsl::id.eq(id)) + .filter(alert_dsl::case_id.is_null()) + .execute_async(&*conn) + .await + .map_err(|e| { + public_error_from_diesel(e, ErrorHandler::Server) + })?; + } - Ok(alert) + Ok(()) } pub async fn alert_select_next_for_dispatch( @@ -105,3 +177,177 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::pub_test_utils::TestDatabase; + use async_bb8_diesel::AsyncRunQueryDsl; + use chrono::Utc; + use diesel::ExpressionMethods; + use diesel::QueryDsl; + use diesel::SelectableHelper; + use diesel::result::OptionalExtension; + use nexus_db_model::Alert; + use nexus_db_model::DbTypedUuid; + use nexus_db_schema::schema::alert::dsl; + use nexus_types::alert::AlertClass; + use nexus_types::identity::Asset; + use omicron_test_utils::dev; + use omicron_uuid_kinds::AlertUuid; + use omicron_uuid_kinds::CaseKind; + use omicron_uuid_kinds::CaseUuid; + use omicron_uuid_kinds::GenericUuid; + use serde_json::json; + + /// Fetch an alert row by id, ignoring `time_deleted`. + /// + /// Used by tests that need to inspect tombstoned rows that + /// `alert_create`/normal selects would filter out. 
+ async fn test_alert_raw( + datastore: &DataStore, + id: AlertUuid, + ) -> Option { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + dsl::alert + .filter(dsl::id.eq(id.into_untyped_uuid())) + .select(Alert::as_select()) + .first_async(&*conn) + .await + .optional() + .unwrap() + } + + #[tokio::test] + async fn test_alert_time_deleted_roundtrip() { + let logctx = dev::test_setup_log("test_alert_time_deleted_roundtrip"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let datastore = db.datastore(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + let alert = + Alert::new(AlertUuid::new_v4(), AlertClass::TestFoo, json!({})); + let alert_id = alert.id().into_untyped_uuid(); + diesel::insert_into(dsl::alert) + .values(alert) + .execute_async(&*conn) + .await + .unwrap(); + + diesel::update(dsl::alert) + .filter(dsl::id.eq(alert_id)) + .set(dsl::time_deleted.eq(Utc::now())) + .execute_async(&*conn) + .await + .unwrap(); + + let fetched: Alert = dsl::alert + .filter(dsl::id.eq(alert_id)) + .select(Alert::as_select()) + .first_async(&*conn) + .await + .unwrap(); + assert!( + fetched.time_deleted.is_some(), + "time_deleted should round-trip as Some" + ); + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_alert_create_non_fm_path() { + let logctx = dev::test_setup_log("test_alert_create_non_fm_path"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let alert = nexus_db_model::Alert::new( + AlertUuid::new_v4(), + AlertClass::TestFoo, + json!({}), + ); + let _ = + datastore.alert_create(opctx, alert.clone(), None).await.unwrap(); + + let err = datastore + .alert_create(opctx, alert, None) + .await + .expect_err("second insert should fail with ObjectAlreadyExists"); + assert!( + matches!(err, Error::ObjectAlreadyExists { .. 
}), + "expected ObjectAlreadyExists, got {err:?}", + ); + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_alert_create_stale_sitrep() { + let logctx = dev::test_setup_log("test_alert_create_stale_sitrep"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + datastore.fm_rendezvous_progress_advance(opctx, 10).await.unwrap(); + + let alert = + Alert::new(AlertUuid::new_v4(), AlertClass::TestFoo, json!({})); + let err = datastore + .alert_create(opctx, alert, Some(5)) + .await + .expect_err("stale sitrep should yield Err(Conflict)"); + match err { + Error::Conflict { ref message } + if message.external_message().contains("stale sitrep") => {} + other => { + panic!("expected Conflict with stale sitrep, got {other:?}") + } + } + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_alert_delete_branches_on_case_id() { + let logctx = + dev::test_setup_log("test_alert_delete_branches_on_case_id"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + // Non-FM alert -> hard delete. + let alert = + Alert::new(AlertUuid::new_v4(), AlertClass::TestFoo, json!({})); + datastore.alert_create(opctx, alert.clone(), None).await.unwrap(); + datastore.alert_delete(opctx, alert.id()).await.unwrap(); + assert!(test_alert_raw(datastore, alert.id()).await.is_none()); + + // FM alert -> soft delete. 
+ let case_id: DbTypedUuid = CaseUuid::new_v4().into(); + let mut fm_alert = + Alert::new(AlertUuid::new_v4(), AlertClass::TestFoo, json!({})); + fm_alert.case_id = Some(case_id); + datastore.alert_create(opctx, fm_alert.clone(), None).await.unwrap(); + datastore.alert_delete(opctx, fm_alert.id()).await.unwrap(); + let row = test_alert_raw(datastore, fm_alert.id()).await.unwrap(); + let first_tombstone = row + .time_deleted + .expect("FM alert should be tombstoned after first delete"); + + // Re-deleting an already-soft-deleted FM alert must be a no-op: the row + // stays, and `time_deleted` stays at its original value. + datastore.alert_delete(opctx, fm_alert.id()).await.unwrap(); + let row = test_alert_raw(datastore, fm_alert.id()) + .await + .expect("soft-deleted FM alert must not be hard-deleted"); + assert_eq!( + row.time_deleted, + Some(first_tombstone), + "second delete must not change time_deleted", + ); + + db.terminate().await; + logctx.cleanup_successful(); + } +} diff --git a/nexus/db-queries/src/db/datastore/alert_rx.rs b/nexus/db-queries/src/db/datastore/alert_rx.rs index 4137f63cb91..4c0a7ed9d6f 100644 --- a/nexus/db-queries/src/db/datastore/alert_rx.rs +++ b/nexus/db-queries/src/db/datastore/alert_rx.rs @@ -1211,6 +1211,7 @@ mod test { .alert_create( opctx, Alert::new(id, alert_class, serde_json::json!({})), + None, ) .await .expect("cant create ye event"); diff --git a/nexus/db-queries/src/db/datastore/fm.rs b/nexus/db-queries/src/db/datastore/fm.rs index 1552d463dfb..cbb5d3c4fa9 100644 --- a/nexus/db-queries/src/db/datastore/fm.rs +++ b/nexus/db-queries/src/db/datastore/fm.rs @@ -34,6 +34,7 @@ use nexus_db_schema::schema::ereport::dsl as ereport_dsl; use nexus_db_schema::schema::fm_alert_request::dsl as alert_req_dsl; use nexus_db_schema::schema::fm_case::dsl as case_dsl; use nexus_db_schema::schema::fm_ereport_in_case::dsl as case_ereport_dsl; +use nexus_db_schema::schema::fm_rendezvous_progress::dsl as rendezvous_progress_dsl; use 
nexus_db_schema::schema::fm_sitrep::dsl as sitrep_dsl; use nexus_db_schema::schema::fm_sitrep_history::dsl as history_dsl; use nexus_db_schema::schema::fm_support_bundle_request::dsl as support_bundle_req_dsl; @@ -123,6 +124,88 @@ sitrep_child_tables! { Case => { table: "fm_case" }, } +/// Emits the body of an FM tombstone-sweep method. Invoked by +/// [`DataStore::fm_alert_tombstone_sweep`] and +/// [`DataStore::fm_support_bundle_tombstone_sweep`]. We use a macro here rather +/// than a generic helper because the `where`-clause bounds would be awful. +/// +/// A row can be hard-deleted iff: +/// * `time_deleted IS NOT NULL` (it has been tombstoned), AND +/// * the `case_id_column` is `NOT NULL` (it is FM-originated), AND +/// * No row in `fm_case` (joined to `fm_sitrep_history` on `sitrep_id`) +/// references this case at a sitrep `version >= tracker`. +/// +/// Because `fm_case`'s primary key is `(sitrep_id, id)` -- that is, a single +/// case appears as multiple rows, one per sitrep it is carried forward into -- +/// the inner join correctly fans out across all containing sitreps, and the +/// `NOT EXISTS` filter checks whether the case still has a row at any version +/// at or above the tracker. +/// +// TODO(mergeconflict): I'm sad that this has to be a transaction. The `DELETE` +// here is a single, atomic DML statement, so in principle it should be safe +// without one. Trouble is, it looks for all tombstoned rows on the target +// table, which -- no matter what tricks I tried with indexes and shuffling +// around different parts of the query -- counts as a full scan from CRDB's +// perspective. This requires us to use `ALLOW_FULL_TABLE_SCAN_SQL`, which in +// turn requires a transaction. +// +// Every Nexus instance runs this sweep on each rendezvous activation, so, +// depending on how CRDB decides to lock things, concurrent sweeps may +// demonstrate some contention. Suggestions welcome. +macro_rules! 
fm_tombstone_sweep_body { + ( + $self:expr, $opctx:expr, + table: $table:ident, + case_id_column: $case_id:ident, + txn_name: $txn_name:literal $(,)? + ) => {{ + use crate::db::queries::ALLOW_FULL_TABLE_SCAN_SQL; + use async_bb8_diesel::AsyncSimpleConnection; + use diesel::dsl::{exists, not}; + use nexus_db_schema::schema::{ + fm_case, fm_rendezvous_progress, fm_sitrep_history, $table, + }; + + $opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; + let conn = $self.pool_connection_authorized($opctx).await?; + + $self + .transaction_retry_wrapper($txn_name) + .transaction(&conn, |conn| async move { + conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?; + + let tracker = fm_rendezvous_progress::table + .filter(fm_rendezvous_progress::singleton.eq(true)) + .select( + fm_rendezvous_progress::latest_processed_sitrep_version, + ) + .single_value(); + + diesel::delete($table::table) + .filter($table::time_deleted.is_not_null()) + .filter($table::$case_id.is_not_null()) + .filter(not(exists( + fm_case::table + .inner_join( + fm_sitrep_history::table + .on(fm_sitrep_history::sitrep_id + .eq(fm_case::sitrep_id)), + ) + .filter(fm_case::id.nullable().eq($table::$case_id)) + .filter( + fm_sitrep_history::version + .nullable() + .ge(tracker), + ), + ))) + .execute_async(&conn) + .await + }) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + }}; +} + /// Per-child-table statistics from a single GC pass. #[derive(Clone, Copy, Debug, Default)] pub struct ChildTableGcStats { @@ -140,6 +223,79 @@ pub struct GcOrphansResult { } impl DataStore { + /// Advances the `fm_rendezvous_progress` singleton tracker to at least + /// `version`, via `GREATEST(latest_processed_sitrep_version, version)`, + /// and returns the post-update tracker value. + /// + /// This is idempotent and commutative: a lower or equal `version` is a + /// no-op, so concurrent advances from multiple Nexuses compose without + /// ordering hazards. 
+ /// + /// A cute byproduct of this design is that passing a version of `0` allows + /// callers to read the current tracker value without modifying it. This is + /// used in some tests. + pub async fn fm_rendezvous_progress_advance( + &self, + opctx: &OpContext, + version: i64, + ) -> Result { + use diesel::sql_types::BigInt; + + opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; + let conn = self.pool_connection_authorized(opctx).await?; + diesel::update(rendezvous_progress_dsl::fm_rendezvous_progress) + .filter(rendezvous_progress_dsl::singleton.eq(true)) + .set( + rendezvous_progress_dsl::latest_processed_sitrep_version.eq( + diesel::dsl::sql::( + "GREATEST(latest_processed_sitrep_version, ", + ) + .bind::(version) + .sql(")"), + ), + ) + .returning(rendezvous_progress_dsl::latest_processed_sitrep_version) + .get_result_async::(&*conn) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Sweeps tombstoned FM-originated alerts whose case is no longer carried + /// forward in any sitrep at or above the rendezvous tracker. See also + /// [`Self::fm_support_bundle_tombstone_sweep`] below, and + /// `fm_tombstone_sweep_body!` for the shared deletion criteria. + /// + /// Returns the number of alert rows deleted. + pub async fn fm_alert_tombstone_sweep( + &self, + opctx: &OpContext, + ) -> Result { + fm_tombstone_sweep_body!( + self, opctx, + table: alert, + case_id_column: case_id, + txn_name: "fm_alert_tombstone_sweep", + ) + } + + /// Sweeps tombstoned FM-originated support bundles whose case is no longer + /// carried forward in any sitrep at or above the rendezvous tracker. See + /// also [`Self::fm_alert_tombstone_sweep`] above, and + /// `fm_tombstone_sweep_body!` for the shared deletion criteria. + /// + /// Returns the number of support_bundle rows deleted. 
+ pub async fn fm_support_bundle_tombstone_sweep( + &self, + opctx: &OpContext, + ) -> Result { + fm_tombstone_sweep_body!( + self, opctx, + table: support_bundle, + case_id_column: fm_case_id, + txn_name: "fm_support_bundle_tombstone_sweep", + ) + } + /// Reads the current [sitrep version](fm::SitrepVersion) from CRDB. /// /// If no sitreps have been generated, this returns `None`. @@ -1569,8 +1725,10 @@ mod tests { use nexus_types::fm::ereport::{EreportData, Reporter}; use omicron_test_utils::dev; use omicron_uuid_kinds::CollectionUuid; + use omicron_uuid_kinds::DatasetUuid; use omicron_uuid_kinds::OmicronZoneUuid; use omicron_uuid_kinds::SupportBundleUuid; + use omicron_uuid_kinds::ZpoolUuid; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::sync::Arc; @@ -3316,9 +3474,14 @@ mod tests { let conn = datastore.pool_connection_for_tests().await.unwrap(); // fm_* tables that are NOT children of fm_sitrep and are - // intentionally ignored. + // intentionally ignored. `fm_rendezvous_progress` is a singleton + // tracker (one row, never GC'd as a sitrep child); it lives + // outside the sitrep parent/child fanout and is managed by the + // FM rendezvous instead. let tables_ignored: BTreeSet<&str> = - ["fm_sitrep", "fm_sitrep_history"].into_iter().collect(); + ["fm_sitrep", "fm_sitrep_history", "fm_rendezvous_progress"] + .into_iter() + .collect(); let tables_checked = self.tables_checked(); let mut query = QueryBuilder::new(); @@ -3712,4 +3875,310 @@ mod tests { db.terminate().await; logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_fm_rendezvous_progress_advance_monotonic() { + let logctx = dev::test_setup_log( + "test_fm_rendezvous_progress_advance_monotonic", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + // `advance(0)` against a bootstrap-seeded tracker doubles as a + // bootstrap-is-zero check: GREATEST(0, 0) = 0. 
+ let advance = |v| datastore.fm_rendezvous_progress_advance(opctx, v); + assert_eq!(advance(0).await.unwrap(), 0); + assert_eq!(advance(5).await.unwrap(), 5); + assert_eq!(advance(3).await.unwrap(), 5); + assert_eq!(advance(5).await.unwrap(), 5); + assert_eq!(advance(9).await.unwrap(), 9); + + db.terminate().await; + logctx.cleanup_successful(); + } + + /// Insert an empty sitrep (no cases) optionally parented on the current + /// sitrep, returning its id and version. + async fn test_create_sitrep( + datastore: &DataStore, + opctx: &OpContext, + parent: Option, + ) -> (SitrepUuid, i64) { + let sitrep_id = SitrepUuid::new_v4(); + let sitrep = fm::Sitrep { + metadata: fm::SitrepMetadata { + id: sitrep_id, + inv_collection_id: CollectionUuid::new_v4(), + creator_id: OmicronZoneUuid::new_v4(), + comment: "test_create_sitrep".to_string(), + time_created: Utc::now(), + parent_sitrep_id: parent, + next_inv_min_time_started: Utc::now(), + }, + cases: Default::default(), + ereports_by_id: Default::default(), + }; + datastore + .fm_sitrep_insert(opctx, sitrep) + .await + .expect("inserting test sitrep"); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + let version: i64 = history_dsl::fm_sitrep_history + .filter(history_dsl::sitrep_id.eq(sitrep_id.into_untyped_uuid())) + .select(history_dsl::version) + .first_async::(&*conn) + .await + .expect("reading fresh sitrep version"); + (sitrep_id, version) + } + + /// Directly insert an `fm_case` row into the given sitrep, returning the + /// case id. 
+ async fn test_create_case( + datastore: &DataStore, + _opctx: &OpContext, + sitrep_id: SitrepUuid, + ) -> CaseUuid { + let case_id = CaseUuid::new_v4(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + diesel::insert_into(case_dsl::fm_case) + .values(model::fm::CaseMetadata { + id: case_id.into(), + sitrep_id: sitrep_id.into(), + de: nexus_types::fm::DiagnosisEngineKind::PowerShelf.into(), + created_sitrep_id: sitrep_id.into(), + closed_sitrep_id: None, + comment: "test_create_case".to_string(), + }) + .execute_async(&*conn) + .await + .expect("inserting test case"); + case_id + } + + /// Add an `fm_case` row carrying an existing case forward into the given + /// sitrep. The helper reuses `sitrep_id` for the `created_sitrep_id` + /// field — fine here because the sweep query does not consult + /// `created_sitrep_id`. DO NOT use this helper for tests exercising + /// case-creation lifecycles, where preserving the original + /// `created_sitrep_id` would matter. + async fn test_carry_case_forward( + datastore: &DataStore, + _opctx: &OpContext, + case_id: CaseUuid, + sitrep_id: SitrepUuid, + ) { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + diesel::insert_into(case_dsl::fm_case) + .values(model::fm::CaseMetadata { + id: case_id.into(), + sitrep_id: sitrep_id.into(), + de: nexus_types::fm::DiagnosisEngineKind::PowerShelf.into(), + created_sitrep_id: sitrep_id.into(), + closed_sitrep_id: None, + comment: "test_carry_case_forward".to_string(), + }) + .execute_async(&*conn) + .await + .expect("inserting carried-forward case row"); + } + + /// Insert an FM-originated `alert` row (with `case_id` set, not yet + /// tombstoned) and return its id. 
+    async fn test_insert_fm_alert(
+        datastore: &DataStore,
+        _opctx: &OpContext,
+        case_id: CaseUuid,
+    ) -> AlertUuid {
+        let alert_id = AlertUuid::new_v4();
+        let mut alert = nexus_db_model::Alert::new(
+            alert_id,
+            AlertClass::TestFoo,
+            serde_json::json!({}),
+        );
+        alert.case_id = Some(case_id.into());
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+        diesel::insert_into(nexus_db_schema::schema::alert::table)
+            .values(alert)
+            .execute_async(&*conn)
+            .await
+            .expect("inserting test fm alert");
+        alert_id
+    }
+
+    /// Tombstone an alert by stamping `time_deleted` (raw diesel update).
+    async fn test_tombstone_alert(
+        datastore: &DataStore,
+        _opctx: &OpContext,
+        alert_id: AlertUuid,
+    ) {
+        use nexus_db_schema::schema::alert::dsl;
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+        diesel::update(dsl::alert)
+            .filter(dsl::id.eq(alert_id.into_untyped_uuid()))
+            .set(dsl::time_deleted.eq(Utc::now()))
+            .execute_async(&*conn)
+            .await
+            .expect("tombstoning test alert");
+    }
+
+    /// Returns whether an alert row exists (regardless of `time_deleted`).
+    async fn test_alert_exists(
+        datastore: &DataStore,
+        _opctx: &OpContext,
+        alert_id: AlertUuid,
+    ) -> bool {
+        use nexus_db_schema::schema::alert::dsl;
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+        let count: i64 = dsl::alert
+            .filter(dsl::id.eq(alert_id.into_untyped_uuid()))
+            .count()
+            .get_result_async::(&*conn)
+            .await
+            .expect("counting alerts");
+        count > 0
+    }
+
+    #[tokio::test]
+    async fn test_fm_alert_tombstone_sweep_respects_case_lifetime() {
+        let logctx = dev::test_setup_log(
+            "test_fm_alert_tombstone_sweep_respects_case_lifetime",
+        );
+        let db = TestDatabase::new_with_datastore(&logctx.log).await;
+        let (opctx, datastore) = (db.opctx(), db.datastore());
+
+        let (sitrep_a, _va) = test_create_sitrep(datastore, opctx, None).await;
+        let case_id = test_create_case(datastore, opctx, sitrep_a).await;
+        let (sitrep_b, vb) =
+            test_create_sitrep(datastore, opctx, Some(sitrep_a)).await;
+        test_carry_case_forward(datastore, opctx, case_id, sitrep_b).await;
+
+        let alert_id = test_insert_fm_alert(datastore, opctx, case_id).await;
+        test_tombstone_alert(datastore, opctx, alert_id).await;
+
+        // tracker=0: NOT deleted (the case's rows are at or above the tracker).
+        let deleted = datastore.fm_alert_tombstone_sweep(opctx).await.unwrap();
+        assert_eq!(deleted, 0);
+        assert!(test_alert_exists(datastore, opctx, alert_id).await);
+
+        // tracker=vb: still NOT deleted (case still has a row at version vb).
+        datastore.fm_rendezvous_progress_advance(opctx, vb).await.unwrap();
+        let deleted = datastore.fm_alert_tombstone_sweep(opctx).await.unwrap();
+        assert_eq!(deleted, 0);
+        assert!(test_alert_exists(datastore, opctx, alert_id).await);
+
+        // tracker=vb+1: DELETED (no case row has version >= vb+1).
+        datastore.fm_rendezvous_progress_advance(opctx, vb + 1).await.unwrap();
+        let deleted = datastore.fm_alert_tombstone_sweep(opctx).await.unwrap();
+        assert_eq!(deleted, 1);
+        assert!(!test_alert_exists(datastore, opctx, alert_id).await);
+
+        db.terminate().await;
+        logctx.cleanup_successful();
+    }
+
+    /// Insert an FM-originated `support_bundle` row (with `fm_case_id` set,
+    /// not yet tombstoned) and return its id. Bypasses the datastore's
+    /// allocation logic — fabricates a zpool/dataset/nexus id directly,
+    /// which is fine for a sweep-query test since the sweep only inspects
+    /// `fm_case_id` and `time_deleted`.
+    async fn test_insert_fm_support_bundle(
+        datastore: &DataStore,
+        _opctx: &OpContext,
+        case_id: CaseUuid,
+    ) -> SupportBundleUuid {
+        let bundle_id = SupportBundleUuid::new_v4();
+        let bundle = nexus_db_model::SupportBundle::new(
+            bundle_id,
+            "test fm support bundle".to_string(),
+            ZpoolUuid::new_v4(),
+            DatasetUuid::new_v4(),
+            OmicronZoneUuid::new_v4(),
+            None,
+            Some(case_id),
+        );
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+        diesel::insert_into(nexus_db_schema::schema::support_bundle::table)
+            .values(bundle)
+            .execute_async(&*conn)
+            .await
+            .expect("inserting test fm support bundle");
+        bundle_id
+    }
+
+    /// Tombstone a support bundle by stamping `time_deleted` (raw diesel).
+    async fn test_tombstone_support_bundle(
+        datastore: &DataStore,
+        _opctx: &OpContext,
+        bundle_id: SupportBundleUuid,
+    ) {
+        use nexus_db_schema::schema::support_bundle::dsl;
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+        diesel::update(dsl::support_bundle)
+            .filter(dsl::id.eq(bundle_id.into_untyped_uuid()))
+            .set(dsl::time_deleted.eq(Utc::now()))
+            .execute_async(&*conn)
+            .await
+            .expect("tombstoning test support bundle");
+    }
+
+    /// Returns whether a `support_bundle` row exists (regardless of
+    /// `time_deleted`).
+ async fn test_support_bundle_exists( + datastore: &DataStore, + _opctx: &OpContext, + bundle_id: SupportBundleUuid, + ) -> bool { + use nexus_db_schema::schema::support_bundle::dsl; + let conn = datastore.pool_connection_for_tests().await.unwrap(); + let count: i64 = dsl::support_bundle + .filter(dsl::id.eq(bundle_id.into_untyped_uuid())) + .count() + .get_result_async::(&*conn) + .await + .expect("counting support bundles"); + count > 0 + } + + #[tokio::test] + async fn test_fm_support_bundle_tombstone_sweep_respects_case_lifetime() { + let logctx = dev::test_setup_log( + "test_fm_support_bundle_tombstone_sweep_respects_case_lifetime", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let (sitrep_a, _va) = test_create_sitrep(datastore, opctx, None).await; + let case_id = test_create_case(datastore, opctx, sitrep_a).await; + let (sitrep_b, vb) = + test_create_sitrep(datastore, opctx, Some(sitrep_a)).await; + test_carry_case_forward(datastore, opctx, case_id, sitrep_b).await; + + let bundle_id = + test_insert_fm_support_bundle(datastore, opctx, case_id).await; + test_tombstone_support_bundle(datastore, opctx, bundle_id).await; + + // tracker=0: NOT deleted. + let deleted = + datastore.fm_support_bundle_tombstone_sweep(opctx).await.unwrap(); + assert_eq!(deleted, 0); + assert!(test_support_bundle_exists(datastore, opctx, bundle_id).await); + + // tracker=vb: still NOT deleted (case still has a row at version vb). + datastore.fm_rendezvous_progress_advance(opctx, vb).await.unwrap(); + let deleted = + datastore.fm_support_bundle_tombstone_sweep(opctx).await.unwrap(); + assert_eq!(deleted, 0); + assert!(test_support_bundle_exists(datastore, opctx, bundle_id).await); + + // tracker=vb+1: DELETED (no case row has version >= vb+1). 
+ datastore.fm_rendezvous_progress_advance(opctx, vb + 1).await.unwrap(); + let deleted = + datastore.fm_support_bundle_tombstone_sweep(opctx).await.unwrap(); + assert_eq!(deleted, 1); + assert!(!test_support_bundle_exists(datastore, opctx, bundle_id).await); + + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/db-queries/src/db/datastore/support_bundle.rs b/nexus/db-queries/src/db/datastore/support_bundle.rs index 71b2ca5e727..460b3619875 100644 --- a/nexus/db-queries/src/db/datastore/support_bundle.rs +++ b/nexus/db-queries/src/db/datastore/support_bundle.rs @@ -72,7 +72,11 @@ pub enum SupportBundleProvenance { User, /// Requested by the fault management subsystem. Uses a specific bundle ID /// for idempotent creation, and records the FM case association. - Fm { id: SupportBundleUuid, case_id: omicron_uuid_kinds::CaseUuid }, + Fm { + id: SupportBundleUuid, + case_id: omicron_uuid_kinds::CaseUuid, + sitrep_version: i64, + }, } /// Parameters for creating a support bundle. @@ -105,12 +109,19 @@ impl DataStore { /// /// If the provenance is [`SupportBundleProvenance::Fm`] and a bundle /// with the given ID already exists, returns - /// [`Error::ObjectAlreadyExists`]. + /// [`Error::ObjectAlreadyExists`]. If the FM rendezvous progress tracker + /// has advanced past `sitrep_version`, returns [`Error::Conflict`] with + /// a "stale sitrep" message — the caller's sitrep is older than the + /// tracker, so the request should be skipped rather than retried. 
pub async fn support_bundle_create( &self, opctx: &OpContext, params: SupportBundleCreateParams<'_>, ) -> CreateResult { + use crate::db::sitrep_version_guard::{ + GuardedInsertError, SitrepVersionGuardedInsert, + }; + opctx.authorize(authz::Action::Modify, &authz::FLEET).await?; let conn = self.pool_connection_authorized(opctx).await?; @@ -118,6 +129,7 @@ impl DataStore { enum SupportBundleError { TooManyBundles, AlreadyExists, + StaleSitrep, } let SupportBundleCreateParams { @@ -128,12 +140,12 @@ impl DataStore { data_selection, } = params; - let (bundle_id, fm_case_id, idempotent) = match provenance { + let (bundle_id, fm_case_id, fm_sitrep_version) = match provenance { SupportBundleProvenance::User => { - (SupportBundleUuid::new_v4(), None, false) + (SupportBundleUuid::new_v4(), None, None) } - SupportBundleProvenance::Fm { id, case_id } => { - (id, Some(case_id), true) + SupportBundleProvenance::Fm { id, case_id, sitrep_version } => { + (id, Some(case_id), Some(sitrep_version)) } }; @@ -155,9 +167,16 @@ impl DataStore { // have a support bundle allocated to it. let free_dataset = dataset_dsl::rendezvous_debug_dataset .filter(dataset_dsl::time_tombstoned.is_null()) - .left_join(support_bundle_dsl::support_bundle.on( - dataset_dsl::id.eq(support_bundle_dsl::dataset_id), - )) + .left_join( + support_bundle_dsl::support_bundle.on( + dataset_dsl::id + .eq(support_bundle_dsl::dataset_id) + .and( + support_bundle_dsl::time_deleted + .is_null(), + ), + ), + ) .filter(support_bundle_dsl::dataset_id.is_null()) .select(RendezvousDebugDataset::as_select()) .first_async(&conn) @@ -189,38 +208,62 @@ impl DataStore { fm_case_id, ); - // For idempotent creation (FM provenance), check if the - // bundle already exists before inserting. - if idempotent - && diesel::select(diesel::dsl::exists( - support_bundle_dsl::support_bundle.filter( - support_bundle_dsl::id - .eq(bundle_id.into_untyped_uuid()), - ), - )) - .get_result_async::(&conn) - .await? - { + // The primary INSERT. 
For FM-provenance bundles we wrap + // it in `SitrepVersionGuardedInsert` so that the row is + // only written when the rendezvous tracker has not yet + // moved past the sitrep that requested this bundle. Either + // way `on_conflict_do_nothing()` lets us detect idempotent + // FM retries (the row already exists) without a separate + // pre-check round trip. + let inserted: Option = { + let insert = diesel::insert_into( + support_bundle_dsl::support_bundle, + ) + .values(bundle) + .on_conflict_do_nothing() + .returning(SupportBundle::as_returning()); + match fm_sitrep_version { + None => insert + .get_result_async(&conn) + .await + .optional()?, + Some(v) => { + let guarded = + SitrepVersionGuardedInsert::new(insert, v); + match guarded + .insert_and_get_optional_result_async( + &conn, + ) + .await + { + Ok(row) => row, + Err(GuardedInsertError::StaleSitrep) => { + return Err(err.bail( + SupportBundleError::StaleSitrep, + )); + } + Err(GuardedInsertError::DatabaseError( + e, + )) => return Err(e), + } + } + } + }; + + let Some(inserted) = inserted else { return Err( err.bail(SupportBundleError::AlreadyExists), ); - } - - diesel::insert_into( - support_bundle_dsl::support_bundle, - ) - .values(bundle.clone()) - .execute_async(&conn) - .await?; + }; Self::support_bundle_data_selection_insert_on_conn( &conn, - bundle.id.into(), + inserted.id.into(), data_selection, ) .await?; - Ok(bundle) + Ok(inserted) } }) .await @@ -240,6 +283,12 @@ impl DataStore { object_name: bundle_id.to_string(), }; } + SupportBundleError::StaleSitrep => { + return external::Error::conflict( + "cannot create FM-originated support bundle \ + for stale sitrep", + ); + } } } public_error_from_diesel(e, ErrorHandler::Server) @@ -393,9 +442,12 @@ impl DataStore { async move { use nexus_db_schema::schema::support_bundle::dsl; - // Find all bundles without backing storage. + // Find all live bundles without backing storage. 
+ // Tombstoned bundles are skipped: the FM tombstone sweep + // handles their hard-deletion independently. let bundles_with_bad_datasets = dsl::support_bundle .filter(dsl::dataset_id.eq_any(invalid_datasets)) + .filter(dsl::time_deleted.is_null()) .select(SupportBundle::as_select()) .load_async(conn) .await?; @@ -771,15 +823,35 @@ impl DataStore { } } - Self::support_bundle_data_selection_delete_on_conn( - &conn, - vec![id], - ) - .await?; - diesel::delete(dsl::support_bundle) - .filter(dsl::id.eq(id)) - .execute_async(&conn) - .await?; + match bundle.fm_case_id { + None => { + // Non-FM bundle: hard delete, including the + // child data-selection rows. + Self::support_bundle_data_selection_delete_on_conn( + &conn, + vec![id], + ) + .await?; + diesel::delete(dsl::support_bundle) + .filter(dsl::id.eq(id)) + .execute_async(&conn) + .await?; + } + Some(_) => { + // FM bundle: tombstone the row by setting + // `time_deleted`. Child data-selection rows are + // intentionally left in place — they describe the + // frozen collection request and remain attached + // through the tombstone lifetime, cascading out + // when the eventual sweep hard-deletes the parent. 
+ diesel::update(dsl::support_bundle) + .filter(dsl::id.eq(id)) + .filter(dsl::time_deleted.is_null()) + .set(dsl::time_deleted.eq(chrono::Utc::now())) + .execute_async(&conn) + .await?; + } + } Ok(()) } }) @@ -817,6 +889,7 @@ mod test { use omicron_common::api::internal::shared::DatasetKind::Debug as DebugDatasetKind; use omicron_test_utils::dev; use omicron_uuid_kinds::BlueprintUuid; + use omicron_uuid_kinds::CaseUuid; use omicron_uuid_kinds::DatasetUuid; use omicron_uuid_kinds::PhysicalDiskUuid; use omicron_uuid_kinds::SledUuid; @@ -2008,8 +2081,6 @@ mod test { #[tokio::test] async fn test_support_bundle_fm_idempotent_create() { - use omicron_uuid_kinds::CaseUuid; - let logctx = dev::test_setup_log("test_support_bundle_fm_idempotent_create"); let db = TestDatabase::new_with_datastore(&logctx.log).await; @@ -2031,6 +2102,7 @@ mod test { provenance: SupportBundleProvenance::Fm { id: bundle_id, case_id, + sitrep_version: 0, }, reason, nexus_id: this_nexus_id, @@ -2055,6 +2127,7 @@ mod test { provenance: SupportBundleProvenance::Fm { id: bundle_id, case_id, + sitrep_version: 0, }, reason, nexus_id: this_nexus_id, @@ -2071,4 +2144,286 @@ mod test { db.terminate().await; logctx.cleanup_successful(); } + + #[tokio::test] + async fn test_support_bundle_create_stale_sitrep() { + let logctx = + dev::test_setup_log("test_support_bundle_create_stale_sitrep"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + test_create_rendezvous_dataset(datastore, opctx).await; + + datastore.fm_rendezvous_progress_advance(opctx, 10).await.unwrap(); + + let case_id = CaseUuid::new_v4(); + let err = datastore + .support_bundle_create( + opctx, + SupportBundleCreateParams { + provenance: SupportBundleProvenance::Fm { + id: SupportBundleUuid::new_v4(), + case_id, + sitrep_version: 5, + }, + reason: "testing", + nexus_id: OmicronZoneUuid::new_v4(), + user_comment: None, + data_selection: Default::default(), + }, + ) + 
.await + .unwrap_err(); + assert_matches!( + err, + Error::Conflict { ref message } + if message.external_message().contains("stale sitrep") + ); + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_support_bundle_time_deleted_roundtrip() { + use nexus_db_schema::schema::support_bundle::dsl; + + let logctx = + dev::test_setup_log("test_support_bundle_time_deleted_roundtrip"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let datastore = db.datastore(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + // Build a minimal SupportBundle row directly (bypassing the + // datastore's allocation logic — we're just exercising the column). + let bundle = SupportBundle::new( + SupportBundleUuid::new_v4(), + "test bundle".to_string(), + ZpoolUuid::new_v4(), + DatasetUuid::new_v4(), + OmicronZoneUuid::new_v4(), + None, + None, + ); + let bundle_id = bundle.id.into_untyped_uuid(); + diesel::insert_into(dsl::support_bundle) + .values(bundle) + .execute_async(&*conn) + .await + .unwrap(); + + diesel::update(dsl::support_bundle) + .filter(dsl::id.eq(bundle_id)) + .set(dsl::time_deleted.eq(chrono::Utc::now())) + .execute_async(&*conn) + .await + .unwrap(); + + let fetched: SupportBundle = dsl::support_bundle + .filter(dsl::id.eq(bundle_id)) + .select(SupportBundle::as_select()) + .first_async(&*conn) + .await + .unwrap(); + assert!( + fetched.time_deleted.is_some(), + "time_deleted should round-trip as Some" + ); + + db.terminate().await; + logctx.cleanup_successful(); + } + + /// Test helper: create a sled with a single zpool/debug dataset, returning + /// the dataset id. + async fn test_create_rendezvous_dataset( + datastore: &DataStore, + opctx: &OpContext, + ) -> DatasetUuid { + let test_sled = create_sled_and_zpools(datastore, opctx, 1).await; + test_sled.pools[0].dataset + } + + /// Test helper: allocate a support bundle. Asserts that the allocation + /// landed on `expected_dataset_id`. 
+    async fn test_allocate_support_bundle(
+        datastore: &DataStore,
+        opctx: &OpContext,
+        expected_dataset_id: DatasetUuid,
+    ) -> SupportBundle {
+        let nexus_id = OmicronZoneUuid::new_v4();
+        let bundle = datastore
+            .support_bundle_create(
+                opctx,
+                SupportBundleCreateParams {
+                    provenance: SupportBundleProvenance::User,
+                    reason: "tombstone reuse test",
+                    nexus_id,
+                    user_comment: None,
+                    data_selection: BundleDataSelection::all(),
+                },
+            )
+            .await
+            .expect("should allocate support bundle");
+        assert_eq!(
+            DatasetUuid::from(bundle.dataset_id),
+            expected_dataset_id,
+            "allocation should land on the only available dataset",
+        );
+        bundle
+    }
+
+    /// Test helper: directly tombstone a support bundle row by setting
+    /// `time_deleted`, bypassing `support_bundle_delete` (tested separately).
+    /// Retains `_opctx` for parity with the `(datastore, opctx, ...)` shape
+    /// of helpers that touch the public datastore API.
+    async fn test_tombstone_support_bundle_row(
+        datastore: &DataStore,
+        _opctx: &OpContext,
+        bundle_id: SupportBundleUuid,
+    ) {
+        use nexus_db_schema::schema::support_bundle::dsl;
+
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+        diesel::update(dsl::support_bundle)
+            .filter(dsl::id.eq(bundle_id.into_untyped_uuid()))
+            .set(dsl::time_deleted.eq(chrono::Utc::now()))
+            .execute_async(&*conn)
+            .await
+            .expect("should tombstone support bundle row");
+    }
+
+    #[tokio::test]
+    async fn test_tombstoned_bundle_frees_dataset() {
+        let logctx =
+            dev::test_setup_log("test_tombstoned_bundle_frees_dataset");
+        let db = TestDatabase::new_with_datastore(&logctx.log).await;
+        let (opctx, datastore) = (db.opctx(), db.datastore());
+        let dataset_id = test_create_rendezvous_dataset(datastore, opctx).await;
+
+        // First allocation succeeds.
+        let bundle1 =
+            test_allocate_support_bundle(datastore, opctx, dataset_id).await;
+        // Mark the row as deleted directly via raw diesel; the real delete flow
+        // (`support_bundle_delete`) has its own test.
+        test_tombstone_support_bundle_row(datastore, opctx, bundle1.id.into())
+            .await;
+
+        // Second allocation: must pick the same dataset.
+        let bundle2 =
+            test_allocate_support_bundle(datastore, opctx, dataset_id).await;
+        assert_eq!(
+            bundle2.dataset_id, bundle1.dataset_id,
+            "dataset should be reusable once tombstoned",
+        );
+
+        db.terminate().await;
+        logctx.cleanup_successful();
+    }
+
+    /// Test helper: directly transition a bundle into a state where
+    /// `support_bundle_delete` is permitted (`Destroying`).
+    async fn test_put_bundle_in_deletable_state(
+        datastore: &DataStore,
+        _opctx: &OpContext,
+        bundle_id: SupportBundleUuid,
+    ) {
+        use nexus_db_schema::schema::support_bundle::dsl;
+
+        let conn = datastore.pool_connection_for_tests().await.unwrap();
+        diesel::update(dsl::support_bundle)
+            .filter(dsl::id.eq(bundle_id.into_untyped_uuid()))
+            .set(dsl::state.eq(SupportBundleState::Destroying))
+            .execute_async(&*conn)
+            .await
+            .expect("should transition bundle to Destroying state");
+    }
+
+    /// Test helper: fetch a support bundle row by id, ignoring `time_deleted`.
+    ///
+    /// Used by tests that need to inspect tombstoned rows that the normal
+    /// datastore read path filters out.
+ async fn test_support_bundle_raw( + datastore: &DataStore, + bundle_id: SupportBundleUuid, + ) -> Option { + use nexus_db_schema::schema::support_bundle::dsl; + + let conn = datastore.pool_connection_for_tests().await.unwrap(); + dsl::support_bundle + .filter(dsl::id.eq(bundle_id.into_untyped_uuid())) + .select(SupportBundle::as_select()) + .first_async(&*conn) + .await + .optional() + .unwrap() + } + + #[tokio::test] + async fn test_support_bundle_delete_branches_on_fm_case_id() { + let logctx = dev::test_setup_log( + "test_support_bundle_delete_branches_on_fm_case_id", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + // The user bundle's allocation lands on the same dataset the FM + // bundle previously held — see test_tombstoned_bundle_frees_dataset + // for the standalone proof of that property. + test_create_rendezvous_dataset(datastore, opctx).await; + + // FM bundle -> soft delete. + let case_id = CaseUuid::new_v4(); + let fm_bundle = datastore + .support_bundle_create( + opctx, + SupportBundleCreateParams { + provenance: SupportBundleProvenance::Fm { + id: SupportBundleUuid::new_v4(), + case_id, + sitrep_version: 1, + }, + reason: "fm", + nexus_id: OmicronZoneUuid::new_v4(), + user_comment: None, + data_selection: Default::default(), + }, + ) + .await + .unwrap(); + let fm_id: SupportBundleUuid = fm_bundle.id.into(); + test_put_bundle_in_deletable_state(datastore, opctx, fm_id).await; + datastore + .support_bundle_delete(opctx, &authz_support_bundle_from_id(fm_id)) + .await + .unwrap(); + let row = test_support_bundle_raw(datastore, fm_id).await.unwrap(); + assert!(row.time_deleted.is_some(), "FM bundle should be tombstoned"); + + // User bundle -> hard delete. 
+ let user_bundle = datastore + .support_bundle_create( + opctx, + SupportBundleCreateParams { + provenance: SupportBundleProvenance::User, + reason: "user", + nexus_id: OmicronZoneUuid::new_v4(), + user_comment: None, + data_selection: Default::default(), + }, + ) + .await + .unwrap(); + let user_id: SupportBundleUuid = user_bundle.id.into(); + test_put_bundle_in_deletable_state(datastore, opctx, user_id).await; + datastore + .support_bundle_delete( + opctx, + &authz_support_bundle_from_id(user_id), + ) + .await + .unwrap(); + assert!(test_support_bundle_raw(datastore, user_id).await.is_none()); + + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/db-queries/src/db/datastore/webhook_delivery.rs b/nexus/db-queries/src/db/datastore/webhook_delivery.rs index 62f6d880c9d..89a992fc53e 100644 --- a/nexus/db-queries/src/db/datastore/webhook_delivery.rs +++ b/nexus/db-queries/src/db/datastore/webhook_delivery.rs @@ -494,7 +494,7 @@ mod test { }), ); datastore - .alert_create(&opctx, alert) + .alert_create(&opctx, alert, None) .await .expect("can't create ye alert"); diff --git a/nexus/db-queries/src/db/mod.rs b/nexus/db-queries/src/db/mod.rs index 411ed3d4cce..b5026478564 100644 --- a/nexus/db-queries/src/db/mod.rs +++ b/nexus/db-queries/src/db/mod.rs @@ -26,6 +26,7 @@ mod pool_connection; pub mod queries; mod raw_query_builder; mod sec_store; +mod sitrep_version_guard; pub(crate) mod true_or_cast_error; mod update_and_check; diff --git a/nexus/db-queries/src/db/sitrep_version_guard.rs b/nexus/db-queries/src/db/sitrep_version_guard.rs new file mode 100644 index 00000000000..7574189fdc1 --- /dev/null +++ b/nexus/db-queries/src/db/sitrep_version_guard.rs @@ -0,0 +1,315 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! 
A Diesel query wrapper that gates an `INSERT` on the FM rendezvous tracker. +//! +//! ## Problem +//! +//! FM rendezvous executes a sitrep by attempting to materialize the resources +//! (alerts and support bundles) that it requests. This process runs +//! periodically on every Nexus, so the same sitrep may be re-executed many +//! times, either by the same Nexus that first executed it, or by any other +//! Nexus running concurrently or lagging behind. +//! +//! Meanwhile, the same resources may be deleted out-of-band (e.g. by an +//! operator) at any time. This poses a problem: we don't want FM rendezvous +//! to resurrect deleted resources. We deal with this in part with tombstones +//! on the resources: a tombstoned resource can't be resurrected (we use +//! `ON CONFLICT DO NOTHING`). But that's not a complete solution: we need to +//! know when it's safe to hard-delete a tombstoned resource, such that a Nexus +//! (even one lagging arbitrarily far behind) can never resurrect it. +//! +//! ## Solution +//! +//! We achieve this using the `fm_rendezvous_progress` tracker, which records +//! the newest sitrep version any Nexus has fully processed. When FM rendezvous +//! attempts to insert a resource, it gates the insert on the caller's +//! (in-memory) sitrep version being at least as new as the tracker, using +//! [`SitrepVersionGuardedInsert`]. +//! +//! With this in place, a tombstoned resource is safe to hard-delete once its +//! case no longer appears in any sitrep at least as new as the tracker: any +//! Nexus that could otherwise re-create it would be working from an older +//! sitrep, which [`SitrepVersionGuardedInsert`] rejects. 
+
+use async_bb8_diesel::AsyncRunQueryDsl;
+use diesel::OptionalExtension;
+use diesel::QueryResult;
+use diesel::RunQueryDsl;
+use diesel::pg::Pg;
+use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId};
+use diesel::result::Error as DieselError;
+use nexus_db_lookup::DbConnection;
+use std::marker::PhantomData;
+
+/// Why a guarded INSERT failed (a conflict is `Ok(None)`, not an error).
+#[derive(Debug)]
+pub enum GuardedInsertError {
+    /// The tracker has advanced past the caller's sitrep version.
+    StaleSitrep,
+    /// Any other database error.
+    DatabaseError(DieselError),
+}
+
+/// Wraps a Diesel `INSERT ... RETURNING` to gate it on the FM sitrep tracker.
+///
+/// Type parameters:
+/// - `R` is the row type the inner insert returns (the `RETURNING` shape).
+/// - `ISR` is the inner insert query itself (typically an
+///   `InsertStatement<...>`).
+#[must_use = "Queries must be executed"]
+pub struct SitrepVersionGuardedInsert {
+    /// The caller's `INSERT ... RETURNING` query.
+    insert_statement: ISR,
+    /// The caller's sitrep version, for comparison against the tracker.
+    sitrep_version: i64,
+    _phantom: PhantomData,
+}
+
+impl SitrepVersionGuardedInsert {
+    pub fn new(insert_statement: ISR, sitrep_version: i64) -> Self {
+        Self { insert_statement, sitrep_version, _phantom: PhantomData }
+    }
+}
+
+/// Propagate the prepared-statement cache key from the inner insert: the
+/// wrapper's CTE frame is structurally fixed, so if `ISR` has a static query
+/// id, so does the wrapper.
+impl QueryId for SitrepVersionGuardedInsert
+where
+    ISR: QueryId,
+{
+    /// `R` is replaced by `()`: it is only a `PhantomData` marker and does not
+    /// shape the SQL. Diesel's `QueryId` docs treat `Bound`'s Rust type alike.
+    type QueryId = SitrepVersionGuardedInsert<(), ISR::QueryId>;
+    const HAS_STATIC_QUERY_ID: bool = ISR::HAS_STATIC_QUERY_ID;
+}
+
+/// The wrapper's SQL output shape is the inner insert's `RETURNING` shape:
+/// the guard CTE only gates whether a row is produced, it never changes which
+/// columns are projected.
+impl Query for SitrepVersionGuardedInsert +where + ISR: Query, +{ + type SqlType = ISR::SqlType; +} + +/// Emit the guard CTE that wraps the caller's `INSERT ... RETURNING`. +impl QueryFragment for SitrepVersionGuardedInsert +where + ISR: QueryFragment, +{ + fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { + // The `guard` CTE reads the singleton tracker row and compares it to + // the caller's sitrep version. + // + // * If `tracker <= caller`, the `IF` returns `TRUE`. + // * Otherwise, evaluating `CAST(1/0 AS BOOL)` raises a non-retryable + // "division by zero" error, which we translate into + // [`GuardedInsertError::StaleSitrep`]. + // + // `MATERIALIZED` on the guard tells CRDB to evaluate the CTE once + // rather than inlining, or worse discarding it. We particularly don't + // want the optimizer deciding the guard's result is unused and skipping + // its evaluation. + out.push_sql( + "WITH guard AS MATERIALIZED (\ + SELECT IF(latest_processed_sitrep_version <= ", + ); + out.push_bind_param::( + &self.sitrep_version, + )?; + out.push_sql( + ", TRUE, CAST(1/0 AS BOOL)) AS proceed \ + FROM fm_rendezvous_progress WHERE singleton = true), ", + ); + + // The caller's `INSERT ... RETURNING`. Data-modifying CTEs are always + // evaluated exactly once, so this one doesn't need (and wouldn't + // benefit from) a `MATERIALIZED` hint. + out.push_sql("inserted_row AS ("); + self.insert_statement.walk_ast(out.reborrow())?; + out.push_sql(") SELECT inserted_row.* FROM inserted_row"); + Ok(()) + } +} + +/// Marker impl that opts the wrapper into diesel's async extension methods +/// (`get_result_async`, `load_async`, etc.) for [`DbConnection`]. +impl RunQueryDsl for SitrepVersionGuardedInsert {} + +impl SitrepVersionGuardedInsert +where + R: Send + 'static, + ISR: Send + 'static, + Self: Send, + Self: diesel::query_dsl::methods::LoadQuery<'static, DbConnection, R>, +{ + /// Run the guarded insert and return the (single) result row, if any. 
+ /// + /// Outcomes: + /// - `Ok(Some(r))`: the guard passed and the underlying INSERT produced + /// a row, meaning there was no conflict. + /// - `Ok(None)`: the guard passed but the underlying INSERT produced no + /// row due to a conflict. + /// - `Err(StaleSitrep)`: the guard fired (the tracker has advanced past + /// the caller's sitrep version), so the insert was rejected. + /// - `Err(DatabaseError(_))`: any other database error. + pub async fn insert_and_get_optional_result_async( + self, + conn: &async_bb8_diesel::Connection, + ) -> Result, GuardedInsertError> { + self.get_result_async::(conn).await.optional().map_err(|e| { + if error_is_division_by_zero(&e) { + GuardedInsertError::StaleSitrep + } else { + GuardedInsertError::DatabaseError(e) + } + }) + } +} + +/// True iff `err` is the runtime "division by zero" the guard CTE raises +/// when the tracker has advanced past the caller's sitrep version. +/// +/// The match on the literal message is brittle by design — it's the receiver +/// end of the abort signal we deliberately emit in +/// [`SitrepVersionGuardedInsert::walk_ast`]. CRDB surfaces division-by-zero +/// as `DatabaseErrorKind::Unknown` rather than a more specific kind, which +/// is also why the `transaction_retry_wrapper` path doesn't retry it (only +/// `SerializationFailure` is retried). [`crate::db::collection_insert`] +/// uses the same shape for its `CollectionNotFound` signal. 
+fn error_is_division_by_zero(err: &DieselError) -> bool { + matches!( + err, + DieselError::DatabaseError( + diesel::result::DatabaseErrorKind::Unknown, + info, + ) if info.message() == "division by zero" + ) +} + +#[cfg(test)] +mod tests { + use super::*; + use diesel::ExpressionMethods; + use diesel::QueryDsl; + use diesel::SelectableHelper; + use diesel::debug_query; + use nexus_db_model::Alert; + use nexus_db_schema::schema::alert; + use nexus_test_utils::db::TestDatabase; + use nexus_types::alert::AlertClass; + use nexus_types::identity::Asset; + use omicron_test_utils::dev; + use omicron_uuid_kinds::AlertUuid; + use omicron_uuid_kinds::GenericUuid; + + #[test] + fn emitted_sql_wraps_insert_in_guard_cte() { + let alert = Alert::new( + AlertUuid::new_v4(), + AlertClass::TestFoo, + serde_json::json!({}), + ); + let insert = diesel::insert_into(alert::table) + .values(alert) + .on_conflict_do_nothing() + .returning(Alert::as_returning()); + + let guarded = SitrepVersionGuardedInsert::::new(insert, 42); + let sql = debug_query::(&guarded).to_string(); + + assert!( + sql.contains("WITH guard AS MATERIALIZED"), + "missing guard CTE: {sql}" + ); + assert!( + sql.contains("inserted_row AS ("), + "missing inserted_row CTE: {sql}" + ); + assert!( + sql.contains("CAST(1/0 AS BOOL)"), + "missing division-by-zero: {sql}" + ); + assert!( + sql.contains("fm_rendezvous_progress"), + "guard must read tracker: {sql}" + ); + } + + #[tokio::test] + async fn guard_fires_when_tracker_ahead() { + let logctx = dev::test_setup_log("guard_fires_when_tracker_ahead"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let datastore = db.datastore(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + datastore.fm_rendezvous_progress_advance(db.opctx(), 10).await.unwrap(); + + let alert = Alert::new( + AlertUuid::new_v4(), + AlertClass::TestFoo, + serde_json::json!({}), + ); + let alert_id = alert.id(); + let insert = diesel::insert_into(alert::table) + 
.values(alert) + .on_conflict_do_nothing() + .returning(Alert::as_returning()); + let guarded = SitrepVersionGuardedInsert::new(insert, 5); + let result = guarded.insert_and_get_optional_result_async(&conn).await; + assert!( + matches!(result, Err(GuardedInsertError::StaleSitrep)), + "got {result:?}", + ); + + let present: Option<Alert> = alert::table + .filter(alert::id.eq(alert_id.into_untyped_uuid())) + .select(Alert::as_select()) + .first_async(&*conn) + .await + .optional() + .unwrap(); + assert!(present.is_none(), "guard fired but row was written anyway"); + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn guard_passes_when_tracker_behind() { + let logctx = dev::test_setup_log("guard_passes_when_tracker_behind"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let datastore = db.datastore(); + let conn = datastore.pool_connection_for_tests().await.unwrap(); + + let alert = Alert::new( + AlertUuid::new_v4(), + AlertClass::TestFoo, + serde_json::json!({}), + ); + let insert = diesel::insert_into(alert::table) + .values(alert.clone()) + .on_conflict_do_nothing() + .returning(Alert::as_returning()); + let result = SitrepVersionGuardedInsert::new(insert, 1) + .insert_and_get_optional_result_async(&conn) + .await; + assert!(matches!(result, Ok(Some(_))), "got {result:?}"); + + let insert = diesel::insert_into(alert::table) + .values(alert) + .on_conflict_do_nothing() + .returning(Alert::as_returning()); + let result = SitrepVersionGuardedInsert::new(insert, 1) + .insert_and_get_optional_result_async(&conn) + .await; + assert!(matches!(result, Ok(None)), "got {result:?}"); + + db.terminate().await; + logctx.cleanup_successful(); + } +} diff --git a/nexus/db-queries/tests/output/webhook_rx_list_resendable_events.sql b/nexus/db-queries/tests/output/webhook_rx_list_resendable_events.sql index 2201d6b4ea8..524095d0b4b 100644 --- a/nexus/db-queries/tests/output/webhook_rx_list_resendable_events.sql +++ 
b/nexus/db-queries/tests/output/webhook_rx_list_resendable_events.sql @@ -7,7 +7,8 @@ SELECT alert.alert_class, alert.payload, alert.num_dispatched, - alert.case_id + alert.case_id, + alert.time_deleted FROM alert INNER JOIN webhook_delivery AS delivery ON delivery.alert_id = alert.id WHERE diff --git a/nexus/db-schema/src/schema.rs b/nexus/db-schema/src/schema.rs index 85383eac200..c53d317bb78 100644 --- a/nexus/db-schema/src/schema.rs +++ b/nexus/db-schema/src/schema.rs @@ -1593,6 +1593,7 @@ table! { assigned_nexus -> Nullable<Uuid>, user_comment -> Nullable<Text>, fm_case_id -> Nullable<Uuid>, + time_deleted -> Nullable<Timestamptz>, } } @@ -2882,6 +2883,7 @@ table! { time_dispatched -> Nullable<Timestamptz>, num_dispatched -> Int8, case_id -> Nullable<Uuid>, + time_deleted -> Nullable<Timestamptz>, } } @@ -3156,6 +3158,13 @@ table! { allow_tables_to_appear_in_same_query!(fm_sitrep, fm_sitrep_history); +table! { + fm_rendezvous_progress (singleton) { + singleton -> Bool, + latest_processed_sitrep_version -> Int8, + } +} + table! { disk_type_local_storage (disk_id) { disk_id -> Uuid, @@ -3343,3 +3352,22 @@ table! { allow_tables_to_appear_in_same_query!(trust_quorum_member, hw_baseboard_id); joinable!(trust_quorum_member -> hw_baseboard_id(hw_baseboard_id)); + +// Cross-table queries used by the FM resource-deletion machinery: the +// `SitrepVersionGuardedInsert` query (which reads `fm_rendezvous_progress` +// inline with an INSERT into `alert` or `support_bundle`) and the +// alert/support-bundle tombstone sweeps (which DELETE from `alert` / +// `support_bundle` filtered by a `NOT EXISTS` over a `fm_case INNER JOIN +// fm_sitrep_history` driven by the tracker in `fm_rendezvous_progress`). 
+allow_tables_to_appear_in_same_query!(alert, fm_case); +allow_tables_to_appear_in_same_query!(alert, fm_sitrep_history); +allow_tables_to_appear_in_same_query!(alert, fm_rendezvous_progress); +allow_tables_to_appear_in_same_query!(support_bundle, fm_case); +allow_tables_to_appear_in_same_query!(support_bundle, fm_sitrep_history); +allow_tables_to_appear_in_same_query!(support_bundle, fm_rendezvous_progress); +allow_tables_to_appear_in_same_query!(fm_case, fm_sitrep_history); +allow_tables_to_appear_in_same_query!(fm_case, fm_rendezvous_progress); +allow_tables_to_appear_in_same_query!( + fm_sitrep_history, + fm_rendezvous_progress +); diff --git a/nexus/src/app/alert.rs b/nexus/src/app/alert.rs index 7a4b21a03bb..7decd917d37 100644 --- a/nexus/src/app/alert.rs +++ b/nexus/src/app/alert.rs @@ -188,14 +188,14 @@ impl Nexus { id: AlertUuid, class: impl Into, event: serde_json::Value, - ) -> Result { + ) -> CreateResult { let alert = self .datastore() - .alert_create(opctx, Alert::new(id, class, event)) + .alert_create(opctx, Alert::new(id, class, event), None) .await?; // Once the alert has been inserted, activate the dispatcher task to - // ensure its propagated to receivers. + // ensure it's propagated to receivers. 
self.background_tasks.task_alert_dispatcher.activate(); Ok(alert) diff --git a/nexus/src/app/background/tasks/alert_dispatcher.rs b/nexus/src/app/background/tasks/alert_dispatcher.rs index a3d63a83a83..2686ba1517d 100644 --- a/nexus/src/app/background/tasks/alert_dispatcher.rs +++ b/nexus/src/app/background/tasks/alert_dispatcher.rs @@ -458,7 +458,7 @@ mod test { serde_json::json!({"msg": "help im trapped in a webhook event factory"}), ); datastore - .alert_create(&opctx, alert) + .alert_create(&opctx, alert, None) .await .expect("creating the event should work"); diff --git a/nexus/src/app/background/tasks/fm_rendezvous.rs b/nexus/src/app/background/tasks/fm_rendezvous.rs index 87cf70eb823..25e14785e67 100644 --- a/nexus/src/app/background/tasks/fm_rendezvous.rs +++ b/nexus/src/app/background/tasks/fm_rendezvous.rs @@ -78,6 +78,10 @@ impl FmRendezvous { return Status::default(); }; + let (ref version, ref current_sitrep) = *sitrep; + let sitrep_version = i64::from(version.version); + let sitrep_id = current_sitrep.id(); + let alerts = self.spawn_op( &sitrep, opctx, @@ -105,12 +109,99 @@ impl FmRendezvous { indicating that they were aborted. "; - Status { - sitrep_id: Some(sitrep.1.id()), - alerts: alerts.await.expect(TASKS_SHOULDNT_FAIL), - support_bundles: support_bundles.await.expect(TASKS_SHOULDNT_FAIL), - ereport_marking: marking.await.expect(TASKS_SHOULDNT_FAIL), + let alerts = alerts.await.expect(TASKS_SHOULDNT_FAIL); + let support_bundles = support_bundles.await.expect(TASKS_SHOULDNT_FAIL); + let ereport_marking = marking.await.expect(TASKS_SHOULDNT_FAIL); + + // Advance the rendezvous progress tracker to this sitrep's version, but + // only if every subtask completed without error. If there were any + // errors, we'll retry on the next activation. + // + // Note: holding the tracker back on errors is a conservative choice. 
If + // we advanced the tracker regardless, then `SitrepVersionGuardedInsert` + // would prevent future retries from executing, meaning that some + // requested alerts or support bundles would potentially be missed. The + // consequence is that, if there is a persistent failure here and the + // tracker never advances, the tombstone sweeps below may never be able + // to clean up resources. + let any_errors = !alerts.details.errors.is_empty() + || !support_bundles.details.errors.is_empty() + || !ereport_marking.details.errors.is_empty(); + let latest_processed_sitrep_version = if any_errors { + slog::warn!( + &opctx.log, + "skipping rendezvous progress tracker advance: subtasks \ + recorded per-request errors"; + "sitrep_version" => sitrep_version, + "sitrep_id" => ?sitrep_id, + "alert_errors" => alerts.details.errors.len(), + "support_bundle_errors" => support_bundles.details.errors.len(), + "ereport_marking_errors" => ereport_marking.details.errors.len(), + ); + None + } else { + match self + .datastore + .fm_rendezvous_progress_advance(opctx, sitrep_version) + .await + { + Ok(v) => Some(v), + Err(e) => { + slog::warn!( + &opctx.log, + "failed to advance rendezvous progress tracker"; + "sitrep_version" => sitrep_version, + "sitrep_id" => ?sitrep_id, + "error" => InlineErrorChain::new(&e).to_string(), + ); + None + } + } + }; + + let status = Status { + sitrep_id: Some(sitrep_id), + alerts, + support_bundles, + ereport_marking, + latest_processed_sitrep_version, + }; + + // Sweep eligible tombstoned alerts and support bundles. 
+ match self.datastore.fm_alert_tombstone_sweep(opctx).await { + Ok(n) => { + if n > 0 { + slog::info!( + &opctx.log, + "swept alert tombstones"; + "count" => n, + ); + } + } + Err(e) => slog::warn!( + &opctx.log, + "alert tombstone sweep failed"; + "error" => InlineErrorChain::new(&e).to_string(), + ), } + match self.datastore.fm_support_bundle_tombstone_sweep(opctx).await { + Ok(n) => { + if n > 0 { + slog::info!( + &opctx.log, + "swept support bundle tombstones"; + "count" => n, + ); + } + } + Err(e) => slog::warn!( + &opctx.log, + "support bundle tombstone sweep failed"; + "error" => InlineErrorChain::new(&e).to_string(), + ), + } + + status } fn spawn_op( @@ -148,7 +239,8 @@ impl FmRendezvous { sitrep: CurrentSitrep, opctx: OpContext, ) -> AlertCreationStatus { - let (_, ref sitrep) = *sitrep; + let (ref version, ref sitrep) = *sitrep; + let sitrep_version = i64::from(version.version); let mut status = AlertCreationStatus::default(); // XXX(eliza): is it better to allocate all of these into a big array @@ -157,15 +249,6 @@ impl FmRendezvous { // would need to use `ON CONFLICT DO NOTHING` rather than checking for // `Conflict` errors from individual inserts, since multiple Nexus // instances may run this task concurrently. - // - // TODO(#9592) Currently, these `alert_create` calls have no guard - // against a stale Nexus inserting alerts from an outdated sitrep. This - // is fine for now because alert requests are carried forward into newer - // sitreps, so a stale insert is redundant rather than incorrect. - // However, if alerts are ever hard-deleted (e.g. when a case is - // closed), a lagging Nexus could re-create "zombie" alert records after - // deletion. At that point, the INSERT should be guarded by a CTE that - // checks the sitrep generation matches the current one. for (case_id, req) in sitrep.alerts_requested() { let &AlertRequest { id, class, requested_sitrep_id, .. 
} = req; status.total_alerts_requested += 1; @@ -177,13 +260,32 @@ impl FmRendezvous { .alert_create( &opctx, db::model::Alert::for_fm_alert_request(req, case_id), + Some(sitrep_version), ) .await { // Alert already exists --- this is expected, since multiple // Nexus instances may run this task concurrently for the same // sitrep, or a previous activation may have partially completed. - Err(Error::Conflict { .. }) => {} + Err(Error::ObjectAlreadyExists { .. }) => {} + Err(Error::Conflict { ref message }) + if message.external_message().contains("stale sitrep") => + { + // The FM sitrep-version guard rejected the insert + // because the rendezvous progress tracker is already + // ahead of this activation's sitrep. Some other (more + // current) Nexus has moved past us; skipping the insert + // is the correct behavior, not an error. + slog::info!( + opctx.log, + "alert_create skipped: stale sitrep version"; + "sitrep_version" => sitrep_version, + "case_id" => %case_id, + "alert_id" => %id, + "alert_class" => %class, + ); + status.stale_sitrep += 1; + } Err(e) => { slog::warn!( opctx.log, @@ -197,7 +299,16 @@ impl FmRendezvous { .errors .push(format!("alert {id} (class: {class}): {e}")); } - Ok(_) => status.alerts_created += 1, + Ok(_alert) => { + slog::debug!( + opctx.log, + "published alert"; + "case_id" => %case_id, + "alert_id" => %id, + "alert_class" => %class, + ); + status.alerts_created += 1; + } } } @@ -211,6 +322,7 @@ impl FmRendezvous { "sitrep_id" => %sitrep.id(), "total_alerts_requested" => status.total_alerts_requested, "alerts_created" => status.alerts_created, + "stale_sitrep" => status.stale_sitrep, "errors" => n_errors, ); } else if status.alerts_created > 0 { @@ -221,15 +333,35 @@ impl FmRendezvous { "sitrep_id" => %sitrep.id(), "total_alerts_requested" => status.total_alerts_requested, "alerts_created" => status.alerts_created, + "stale_sitrep" => status.stale_sitrep, ); } else if status.total_alerts_requested > 0 { - slog::debug!( - opctx.log, - 
"all alerts requested by the current sitrep already exist"; - "sitrep_id" => %sitrep.id(), - "total_alerts_requested" => status.total_alerts_requested, - "alerts_created" => status.alerts_created, - ); + // No errors, nothing newly created, but we did process requests. + // Branch the human-readable message on whether any inserts were + // skipped because the sitrep was stale: those alerts may not + // exist yet (a more current Nexus might be racing with this one + // and hasn't created them either), so saying "already exist" + // would be misleading. + if status.stale_sitrep > 0 { + slog::debug!( + opctx.log, + "no alerts created from the current sitrep \ + (some skipped due to stale sitrep version)"; + "sitrep_id" => %sitrep.id(), + "total_alerts_requested" => status.total_alerts_requested, + "alerts_created" => status.alerts_created, + "stale_sitrep" => status.stale_sitrep, + ); + } else { + slog::debug!( + opctx.log, + "all alerts requested by the current sitrep already exist"; + "sitrep_id" => %sitrep.id(), + "total_alerts_requested" => status.total_alerts_requested, + "alerts_created" => status.alerts_created, + "stale_sitrep" => status.stale_sitrep, + ); + } } else { slog::debug!( opctx.log, @@ -328,18 +460,14 @@ impl FmRendezvous { sitrep: CurrentSitrep, opctx: OpContext, ) -> SupportBundleCreationStatus { - let (_, ref sitrep) = *sitrep; + let (ref version, ref sitrep) = *sitrep; + let sitrep_version = i64::from(version.version); let mut status = SupportBundleCreationStatus::default(); // Like alert creation (see `create_requested_alerts`), we iterate all // bundle requests in the sitrep, not just new ones. Multiple Nexus // instances may run this concurrently. Bundle creation is idempotent, // so racing instances won't inadvertently create duplicates. - // - // TODO(#9592) Same stale-Nexus concern as alerts: if support bundle - // requests are ever removed from sitreps (e.g. 
when a case is closed), - // a lagging Nexus could re-create "zombie" bundles from an outdated - // sitrep. Currently safe because requests are carried forward. for (case, req) in sitrep.support_bundles_requested() { let case_id = case.id; let de = case.metadata.de; @@ -380,6 +508,7 @@ impl FmRendezvous { provenance: SupportBundleProvenance::Fm { id: bundle_id, case_id, + sitrep_version, }, reason: &reason, nexus_id: self.nexus_id, @@ -398,6 +527,23 @@ impl FmRendezvous { "bundle_id" => %bundle_id, ); } + // The FM sitrep-version guard rejected the insert because the + // rendezvous progress tracker is already ahead of this + // activation's sitrep. Some other (more current) Nexus has + // moved past us; skipping the insert is the correct behavior, + // not an error. + Err(Error::Conflict { ref message }) + if message.external_message().contains("stale sitrep") => + { + slog::info!( + opctx.log, + "support_bundle_create skipped: stale sitrep version"; + "sitrep_version" => sitrep_version, + "case_id" => %case_id, + "bundle_id" => %bundle_id, + ); + status.stale_sitrep += 1; + } // Other errors, unexpected. Err(e) => { slog::warn!( @@ -425,6 +571,7 @@ impl FmRendezvous { n_errors; "total_bundles_requested" => status.total_bundles_requested, "bundles_created" => status.bundles_created, + "stale_sitrep" => status.stale_sitrep, "errors" => n_errors, ); } else if status.bundles_created > 0 { @@ -434,14 +581,31 @@ impl FmRendezvous { status.bundles_created; "total_bundles_requested" => status.total_bundles_requested, "bundles_created" => status.bundles_created, + "stale_sitrep" => status.stale_sitrep, ); } else if status.total_bundles_requested > 0 { - slog::debug!( - opctx.log, - "all support bundles requested by the current sitrep \ - already exist"; - "total_bundles_requested" => status.total_bundles_requested, - ); + // No errors, nothing newly created, but we did process requests. 
+ // Same reasoning as `create_requested_alerts`'s aggregate log: + // if any inserts were skipped because the sitrep was stale, + // those bundles may not exist yet, so "already exist" would be + // misleading. + if status.stale_sitrep > 0 { + slog::debug!( + opctx.log, + "no support bundles created from the current sitrep \ + (some skipped due to stale sitrep version)"; + "total_bundles_requested" => status.total_bundles_requested, + "stale_sitrep" => status.stale_sitrep, + ); + } else { + slog::debug!( + opctx.log, + "all support bundles requested by the current sitrep \ + already exist"; + "total_bundles_requested" => status.total_bundles_requested, + "stale_sitrep" => status.stale_sitrep, + ); + } } else { slog::debug!( opctx.log, @@ -603,6 +767,7 @@ mod tests { total_alerts_requested: 1, current_sitrep_alerts_requested: 1, alerts_created: 1, + stale_sitrep: 0, errors: Vec::new() } ); @@ -694,6 +859,7 @@ mod tests { total_alerts_requested: 3, current_sitrep_alerts_requested: 2, alerts_created: 2, + stale_sitrep: 0, errors: Vec::new() } ); @@ -1596,6 +1762,505 @@ mod tests { logctx.cleanup_successful(); } + #[tokio::test] + async fn test_rendezvous_activation_advances_tracker() { + let logctx = + dev::test_setup_log("test_rendezvous_activation_advances_tracker"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let (sitrep_tx, sitrep_rx) = watch::channel(None); + + let TestActivators { + alert_dispatcher_activator, + support_bundle_collector_activator, + } = make_activators(); + + let mut task = FmRendezvous::new( + datastore.clone(), + sitrep_rx, + alert_dispatcher_activator.clone(), + support_bundle_collector_activator.clone(), + OmicronZoneUuid::new_v4(), + ); + + // Tracker bootstraps at 0 before any activation. `advance(0)` is + // the natural read idiom: GREATEST(0, 0) = 0. 
+ assert_eq!( + datastore.fm_rendezvous_progress_advance(opctx, 0).await.unwrap(), + 0, + "tracker should bootstrap at 0", + ); + + // Publish a sitrep with version=3 and no cases. + let sitrep_id = SitrepUuid::new_v4(); + let sitrep = fm::Sitrep { + metadata: fm::SitrepMetadata { + id: sitrep_id, + inv_collection_id: CollectionUuid::new_v4(), + parent_sitrep_id: None, + creator_id: OmicronZoneUuid::new_v4(), + comment: "test sitrep for tracker advance".to_string(), + time_created: Utc::now(), + next_inv_min_time_started: Utc::now(), + }, + cases: iddqd::IdOrdMap::new(), + ereports_by_id: Default::default(), + }; + sitrep_tx + .send(Some(Arc::new(( + fm::SitrepVersion { + id: sitrep_id, + version: 3, + time_made_current: Utc::now(), + }, + sitrep, + )))) + .unwrap(); + + // One full activation against version=3. + let _ = dbg!(task.actually_activate(opctx).await); + + // The tracker should have advanced to 3. + let v = + datastore.fm_rendezvous_progress_advance(opctx, 0).await.unwrap(); + assert_eq!( + v, 3, + "tracker should advance to the activated sitrep's version", + ); + + // Cleanup + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_rendezvous_activation_runs_sweep() { + let logctx = + dev::test_setup_log("test_rendezvous_activation_runs_sweep"); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let (sitrep_tx, sitrep_rx) = watch::channel(None); + + let TestActivators { + alert_dispatcher_activator, + support_bundle_collector_activator, + } = make_activators(); + + let mut task = FmRendezvous::new( + datastore.clone(), + sitrep_rx, + alert_dispatcher_activator.clone(), + support_bundle_collector_activator.clone(), + OmicronZoneUuid::new_v4(), + ); + + // Build sitrep v=1 carrying a single case, and persist it (which + // assigns it a row in `fm_sitrep_history`). 
The sweep query joins + // `fm_case` to `fm_sitrep_history`, so the case must really exist + // in the DB at this version for the case-lifetime check to be + // meaningful. + let sitrep1_id = SitrepUuid::new_v4(); + let case_id = CaseUuid::new_v4(); + let sitrep1 = { + let case = fm::Case { + id: case_id, + metadata: fm::case::Metadata { + created_sitrep_id: sitrep1_id, + closed_sitrep_id: None, + de: fm::DiagnosisEngineKind::PowerShelf, + comment: "case in v1".to_string(), + }, + alerts_requested: iddqd::IdOrdMap::new(), + ereports: iddqd::IdOrdMap::new(), + support_bundles_requested: iddqd::IdOrdMap::new(), + }; + let mut cases = iddqd::IdOrdMap::new(); + cases.insert_unique(case).unwrap(); + fm::Sitrep { + metadata: fm::SitrepMetadata { + id: sitrep1_id, + inv_collection_id: CollectionUuid::new_v4(), + parent_sitrep_id: None, + creator_id: OmicronZoneUuid::new_v4(), + comment: "test sitrep v1".to_string(), + time_created: Utc::now(), + next_inv_min_time_started: Utc::now(), + }, + cases, + ereports_by_id: Default::default(), + } + }; + datastore + .fm_sitrep_insert(opctx, sitrep1) + .await + .expect("insert sitrep v1"); + + // Read back the assigned version so we can later send it through + // the watch channel. + use nexus_db_schema::schema::fm_sitrep_history::dsl as history_dsl; + let v1: i64 = { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + history_dsl::fm_sitrep_history + .filter( + history_dsl::sitrep_id.eq(sitrep1_id.into_untyped_uuid()), + ) + .select(history_dsl::version) + .first_async::<i64>(&*conn) + .await + .expect("read v1") + }; + + // Insert an FM-originated alert tied to the case and tombstone it. 
+ let alert_id = AlertUuid::new_v4(); + { + let mut alert = nexus_db_model::Alert::new( + alert_id, + AlertClass::TestFoo, + serde_json::json!({}), + ); + alert.case_id = Some(case_id.into()); + alert.time_deleted = None; + let conn = datastore.pool_connection_for_tests().await.unwrap(); + diesel::insert_into(nexus_db_schema::schema::alert::table) + .values(alert) + .execute_async(&*conn) + .await + .expect("inserting fm alert"); + diesel::update(alert_dsl::alert) + .filter(alert_dsl::id.eq(alert_id.into_untyped_uuid())) + .set(alert_dsl::time_deleted.eq(Utc::now())) + .execute_async(&*conn) + .await + .expect("tombstoning alert"); + } + + // Build sitrep v=2 that does NOT carry the case forward. Persist it + // so it gets a row in `fm_sitrep_history` at version 2. + let sitrep2_id = SitrepUuid::new_v4(); + let sitrep2 = fm::Sitrep { + metadata: fm::SitrepMetadata { + id: sitrep2_id, + inv_collection_id: CollectionUuid::new_v4(), + parent_sitrep_id: Some(sitrep1_id), + creator_id: OmicronZoneUuid::new_v4(), + comment: "test sitrep v2 (case dropped)".to_string(), + time_created: Utc::now(), + next_inv_min_time_started: Utc::now(), + }, + cases: iddqd::IdOrdMap::new(), + ereports_by_id: Default::default(), + }; + datastore + .fm_sitrep_insert(opctx, sitrep2.clone()) + .await + .expect("insert sitrep v2"); + + let v2: i64 = { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + history_dsl::fm_sitrep_history + .filter( + history_dsl::sitrep_id.eq(sitrep2_id.into_untyped_uuid()), + ) + .select(history_dsl::version) + .first_async::<i64>(&*conn) + .await + .expect("read v2") + }; + assert!(v2 > v1, "v2 ({v2}) must be greater than v1 ({v1})"); + + // Publish sitrep v2 through the watch channel and activate. 
+ sitrep_tx + .send(Some(Arc::new(( + fm::SitrepVersion { + id: sitrep2_id, + version: u32::try_from(v2).expect("v2 fits in u32"), + time_made_current: Utc::now(), + }, + sitrep2, + )))) + .unwrap(); + + let _ = dbg!(task.actually_activate(opctx).await); + + // Tracker advanced to v2; sweep ran; the tombstoned alert should be + // gone (no `fm_case` row references this case at version >= v2). + let count: i64 = { + let conn = datastore.pool_connection_for_tests().await.unwrap(); + alert_dsl::alert + .filter(alert_dsl::id.eq(alert_id.into_untyped_uuid())) + .count() + .get_result_async::<i64>(&*conn) + .await + .expect("counting alerts") + }; + assert_eq!( + count, 0, + "tombstoned FM alert should have been swept after activation", + ); + + // Cleanup + db.terminate().await; + logctx.cleanup_successful(); + } + + /// When the rendezvous progress tracker is ahead of the sitrep this + /// activation is processing, `alert_create` should reject the insert via + /// the FM sitrep-version guard and the per-subtask status should record + /// the rejection in `stale_sitrep` (rather than treating it as an error + /// or silently counting it as created). 
+ #[tokio::test] + async fn test_rendezvous_alert_stale_sitrep_is_recorded() { + let logctx = dev::test_setup_log( + "test_rendezvous_alert_stale_sitrep_is_recorded", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let (sitrep_tx, sitrep_rx) = watch::channel(None); + + let TestActivators { + alert_dispatcher_activator, + support_bundle_collector_activator, + } = make_activators(); + + let mut task = FmRendezvous::new( + datastore.clone(), + sitrep_rx, + alert_dispatcher_activator.clone(), + support_bundle_collector_activator.clone(), + OmicronZoneUuid::new_v4(), + ); + + // Advance the rendezvous progress tracker well past anything we'll + // publish on the watch channel, so the guarded insert that + // `alert_create` performs in the FM path will reject our v=1 sitrep. + datastore + .fm_rendezvous_progress_advance(opctx, 10) + .await + .expect("advance tracker"); + + // Build a sitrep at v=1 with a single case that requests one alert. 
+ let sitrep_id = SitrepUuid::new_v4(); + let alert_id = AlertUuid::new_v4(); + let case_id = CaseUuid::new_v4(); + let mut case = fm::Case { + id: case_id, + metadata: fm::case::Metadata { + created_sitrep_id: sitrep_id, + closed_sitrep_id: None, + de: fm::DiagnosisEngineKind::PowerShelf, + comment: "stale-sitrep test case".to_string(), + }, + alerts_requested: iddqd::IdOrdMap::new(), + ereports: iddqd::IdOrdMap::new(), + support_bundles_requested: iddqd::IdOrdMap::new(), + }; + case.alerts_requested + .insert_unique(fm::case::AlertRequest { + id: alert_id, + class: AlertClass::TestFoo, + requested_sitrep_id: sitrep_id, + payload: serde_json::json!({}), + comment: String::new(), + }) + .unwrap(); + let sitrep = { + let mut cases = iddqd::IdOrdMap::new(); + cases.insert_unique(case).unwrap(); + fm::Sitrep { + metadata: fm::SitrepMetadata { + id: sitrep_id, + inv_collection_id: CollectionUuid::new_v4(), + parent_sitrep_id: None, + creator_id: OmicronZoneUuid::new_v4(), + comment: "stale-sitrep test sitrep".to_string(), + time_created: Utc::now(), + next_inv_min_time_started: Utc::now(), + }, + cases, + ereports_by_id: Default::default(), + } + }; + + sitrep_tx + .send(Some(Arc::new(( + fm::SitrepVersion { + id: sitrep_id, + version: 1, + time_made_current: Utc::now(), + }, + sitrep, + )))) + .unwrap(); + + let status = dbg!(task.actually_activate(opctx).await); + let alerts = &status.alerts.details; + assert_eq!( + alerts.total_alerts_requested, 1, + "subtask should still observe the alert request", + ); + assert_eq!( + alerts.current_sitrep_alerts_requested, 1, + "subtask should still observe the alert as new in this sitrep", + ); + assert_eq!( + alerts.alerts_created, 0, + "no alert should be created when the guard rejects the insert", + ); + assert_eq!( + alerts.stale_sitrep, 1, + "the stale-sitrep rejection should be recorded in the status", + ); + assert!( + alerts.errors.is_empty(), + "stale-sitrep is not an error: {:?}", + alerts.errors, + ); + + // The alert 
row must NOT be present in the database. + let result = fetch_alert(&datastore, alert_id).await; + assert!( + result.is_err(), + "alert row should not exist when the guard rejected the insert", + ); + + // Cleanup + db.terminate().await; + logctx.cleanup_successful(); + } + + /// When the rendezvous progress tracker is ahead of the sitrep this + /// activation is processing, `support_bundle_create` should reject the + /// insert via the FM sitrep-version guard and the per-subtask status + /// should record the rejection in `stale_sitrep` (rather than treating + /// it as an error or silently counting it as created). + #[tokio::test] + async fn test_rendezvous_bundle_stale_sitrep_is_recorded() { + let logctx = dev::test_setup_log( + "test_rendezvous_bundle_stale_sitrep_is_recorded", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + let (sitrep_tx, sitrep_rx) = watch::channel(None); + + let TestActivators { + alert_dispatcher_activator, + support_bundle_collector_activator, + } = make_activators(); + + let mut task = FmRendezvous::new( + datastore.clone(), + sitrep_rx, + alert_dispatcher_activator.clone(), + support_bundle_collector_activator.clone(), + OmicronZoneUuid::new_v4(), + ); + + // Provide free debug datasets so allocation doesn't fail before the + // guarded insert has a chance to run. + create_sled_with_zpools(&datastore, &opctx, 3).await; + + // Advance the rendezvous progress tracker well past anything we'll + // publish on the watch channel, so the guarded insert that + // `support_bundle_create` performs in the FM path will reject our + // v=1 sitrep. + datastore + .fm_rendezvous_progress_advance(opctx, 10) + .await + .expect("advance tracker"); + + // Build a sitrep at v=1 with a single case that requests one bundle. 
+ let sitrep_id = SitrepUuid::new_v4(); + let bundle_id = SupportBundleUuid::new_v4(); + let case_id = CaseUuid::new_v4(); + let mut case = fm::Case { + id: case_id, + metadata: fm::case::Metadata { + created_sitrep_id: sitrep_id, + closed_sitrep_id: None, + de: fm::DiagnosisEngineKind::PowerShelf, + comment: "stale-sitrep test case".to_string(), + }, + alerts_requested: iddqd::IdOrdMap::new(), + ereports: iddqd::IdOrdMap::new(), + support_bundles_requested: iddqd::IdOrdMap::new(), + }; + case.support_bundles_requested + .insert_unique(fm::case::SupportBundleRequest { + id: bundle_id, + requested_sitrep_id: sitrep_id, + data_selection: BundleDataSelection::all(), + comment: "stale-sitrep test bundle".to_string(), + }) + .unwrap(); + let sitrep = { + let mut cases = iddqd::IdOrdMap::new(); + cases.insert_unique(case).unwrap(); + fm::Sitrep { + metadata: fm::SitrepMetadata { + id: sitrep_id, + inv_collection_id: CollectionUuid::new_v4(), + parent_sitrep_id: None, + creator_id: OmicronZoneUuid::new_v4(), + comment: "stale-sitrep test sitrep".to_string(), + time_created: Utc::now(), + next_inv_min_time_started: Utc::now(), + }, + cases, + ereports_by_id: Default::default(), + } + }; + + sitrep_tx + .send(Some(Arc::new(( + fm::SitrepVersion { + id: sitrep_id, + version: 1, + time_made_current: Utc::now(), + }, + sitrep, + )))) + .unwrap(); + + let status = dbg!(task.actually_activate(opctx).await); + let bundles = &status.support_bundles.details; + assert_eq!( + bundles.total_bundles_requested, 1, + "subtask should still observe the bundle request", + ); + assert_eq!( + bundles.current_sitrep_bundles_requested, 1, + "subtask should still observe the bundle as new in this sitrep", + ); + assert_eq!( + bundles.bundles_created, 0, + "no bundle should be created when the guard rejects the insert", + ); + assert_eq!( + bundles.stale_sitrep, 1, + "the stale-sitrep rejection should be recorded in the status", + ); + assert!( + bundles.errors.is_empty(), + "stale-sitrep is not 
an error: {:?}", + bundles.errors, + ); + + // The bundle row must NOT be present in the database. + let result = fetch_support_bundle(&datastore, bundle_id).await; + assert!( + result.is_err(), + "bundle row should not exist when the guard rejected the insert", + ); + + // Cleanup + db.terminate().await; + logctx.cleanup_successful(); + } + #[tokio::test] async fn test_support_bundle_requests_capacity_error() { let logctx = @@ -1682,6 +2347,20 @@ mod tests { assert_eq!(support_bundles.bundles_created, 0); assert_eq!(support_bundles.total_bundles_requested, 1); + // Per-request errors hold the tracker still: the activation has not + // earned the right to advance, so `latest_processed_sitrep_version` + // stays at the bootstrap 0 and the `Status` field is `None`. + assert_eq!( + status.latest_processed_sitrep_version, None, + "tracker advance should be skipped when a subtask records errors", + ); + assert_eq!( + datastore.fm_rendezvous_progress_advance(opctx, 0).await.unwrap(), + 0, + "tracker should remain at the bootstrap value when the activation \ + had per-request errors", + ); + // The bundle should NOT exist in the database. let result = fetch_support_bundle(&datastore, bundle_id).await; assert!( @@ -1704,4 +2383,182 @@ mod tests { db.terminate().await; logctx.cleanup_successful(); } + + /// End-to-end concurrent-Nexus zombie-prevention test for alerts. + /// + /// This composes the primitives built in earlier tasks (the + /// `fm_rendezvous_progress` tracker, the `SitrepVersionGuardedInsert` CTE, + /// and the `alert_create` FM path that wraps the insert in that guard) and + /// asserts the spec-mandated race contract: + /// + /// "Concurrent Nexuses at adjacent sitreps. Nexus C is executing sitrep N. + /// Nexus A is executing sitrep N+1 (N+1 became current after C started). A + /// completes first, advancing the tracker and running the sweep — which + /// drops tombstones for cases that dropped out between N and N+1. 
C then + /// inserts one of those resources, creating a zombie. The CTE guard closes + /// both scenarios structurally by making the tracker read atomic with each + /// individual insert." + /// + /// Both directions are checked: the older sitrep's insert must surface + /// `StaleSitrep` (and not land a row); a fresh insert at the current + /// sitrep version must succeed. + #[tokio::test] + async fn test_concurrent_nexus_cannot_zombie_alert_after_sweep() { + use nexus_types::identity::Asset; + + let logctx = dev::test_setup_log( + "test_concurrent_nexus_cannot_zombie_alert_after_sweep", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + // Simulate Nexus A's outcome: it has activated for sitrep v=N+1=2, + // advanced the tracker to 2, and run the sweep — so any case whose + // requested resources hadn't yet been inserted by N=1 has had its + // tombstones dropped. + datastore + .fm_rendezvous_progress_advance(opctx, 2) + .await + .expect("advance tracker to N+1"); + + // Nexus C is still holding sitrep v=N=1 and tries to insert an alert. + // The CTE guard must reject it as `StaleSitrep` — without this guard, + // C would create a zombie row whose case-lifetime tombstone has + // already been swept. 
+ let stale_alert = nexus_db_model::Alert::new( + AlertUuid::new_v4(), + AlertClass::TestFoo, + json!({}), + ); + let err = datastore + .alert_create(opctx, stale_alert.clone(), Some(1)) + .await + .expect_err( + "the older Nexus's insert must surface a stale-sitrep error", + ); + match err { + Error::Conflict { ref message } + if message.external_message().contains("stale sitrep") => {} + other => { + panic!("expected Conflict with stale sitrep, got {other:?}",) + } + } + let result = fetch_alert(&datastore, stale_alert.id()).await; + assert!( + result.is_err(), + "stale-sitrep insert must NOT have landed a row \ + (would have been a zombie)", + ); + + // Bonus: an insert at the current sitrep version must still succeed. + // This verifies the guard isn't accidentally blocking everything. + let fresh_alert = nexus_db_model::Alert::new( + AlertUuid::new_v4(), + AlertClass::TestFoo, + json!({}), + ); + let _ = datastore + .alert_create(opctx, fresh_alert.clone(), Some(2)) + .await + .expect("an insert at the current sitrep version must succeed"); + let row = fetch_alert(&datastore, fresh_alert.id()) + .await + .expect("fresh alert row should exist"); + assert_eq!(row.id(), fresh_alert.id()); + + // Cleanup + db.terminate().await; + logctx.cleanup_successful(); + } + + /// End-to-end concurrent-Nexus zombie-prevention test for support bundles. + /// + /// Symmetric to `test_concurrent_nexus_cannot_zombie_alert_after_sweep`, + /// but exercising the same race for support-bundle creation. See that + /// test's doc-comment for the spec quote. 
+ #[tokio::test] + async fn test_concurrent_nexus_cannot_zombie_bundle_after_sweep() { + use omicron_uuid_kinds::CaseUuid; + + let logctx = dev::test_setup_log( + "test_concurrent_nexus_cannot_zombie_bundle_after_sweep", + ); + let db = TestDatabase::new_with_datastore(&logctx.log).await; + let (opctx, datastore) = (db.opctx(), db.datastore()); + + // Provide free debug datasets so allocation does not short-circuit + // before the guarded insert has a chance to run. + create_sled_with_zpools(&datastore, &opctx, 3).await; + + // Simulate Nexus A having advanced the tracker to N+1 and swept. + datastore + .fm_rendezvous_progress_advance(opctx, 2) + .await + .expect("advance tracker to N+1"); + + // Nexus C, still holding sitrep v=N=1, tries to create a bundle. + // The guard must reject the insert as a stale-sitrep conflict. + let stale_bundle_id = SupportBundleUuid::new_v4(); + let err = datastore + .support_bundle_create( + opctx, + SupportBundleCreateParams { + provenance: SupportBundleProvenance::Fm { + id: stale_bundle_id, + case_id: CaseUuid::new_v4(), + sitrep_version: 1, + }, + reason: "concurrent-nexus zombie test (stale)", + nexus_id: OmicronZoneUuid::new_v4(), + user_comment: None, + data_selection: BundleDataSelection::all(), + }, + ) + .await + .expect_err("support_bundle_create at stale sitrep must fail"); + assert!( + matches!( + err, + Error::Conflict { ref message } + if message.external_message().contains("stale sitrep") + ), + "expected stale-sitrep conflict, got {err:?}", + ); + let result = fetch_support_bundle(&datastore, stale_bundle_id).await; + assert!( + result.is_err(), + "stale-sitrep insert must NOT have landed a row \ + (would have been a zombie)", + ); + + // Bonus: a bundle insert at the current sitrep version must still + // succeed end-to-end (allocation + guarded insert). 
+ let fresh_bundle_id = SupportBundleUuid::new_v4(); + let bundle = datastore + .support_bundle_create( + opctx, + SupportBundleCreateParams { + provenance: SupportBundleProvenance::Fm { + id: fresh_bundle_id, + case_id: CaseUuid::new_v4(), + sitrep_version: 2, + }, + reason: "concurrent-nexus zombie test (fresh)", + nexus_id: OmicronZoneUuid::new_v4(), + user_comment: None, + data_selection: BundleDataSelection::all(), + }, + ) + .await + .expect("support_bundle_create at current sitrep should succeed"); + assert_eq!(bundle.id, fresh_bundle_id.into()); + let row = fetch_support_bundle(&datastore, fresh_bundle_id) + .await + .expect("fresh bundle row should exist"); + assert_eq!(row.id, fresh_bundle_id.into()); + + // Cleanup + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/tests/integration_tests/support_bundles.rs b/nexus/tests/integration_tests/support_bundles.rs index 4ddd5365e16..63b7501a6cd 100644 --- a/nexus/tests/integration_tests/support_bundles.rs +++ b/nexus/tests/integration_tests/support_bundles.rs @@ -974,6 +974,7 @@ async fn test_support_bundle_fm_case_id(cptestctx: &ControlPlaneTestContext) { provenance: SupportBundleProvenance::Fm { id: SupportBundleUuid::new_v4(), case_id, + sitrep_version: 0, }, reason: "FM test bundle", nexus_id: nexus.id(), diff --git a/nexus/types/src/internal_api/background.rs b/nexus/types/src/internal_api/background.rs index cbf09135bde..69a07319ab7 100644 --- a/nexus/types/src/internal_api/background.rs +++ b/nexus/types/src/internal_api/background.rs @@ -995,6 +995,11 @@ pub struct FmRendezvousStatus { fm_rendezvous::OpStatus, pub ereport_marking: fm_rendezvous::OpStatus, + /// Post-advance value of the `fm_rendezvous_progress` singleton tracker, + /// captured at the tail of this activation. `None` if the advance was + /// skipped (because some subtask recorded a per-request error) or the + /// advance call itself failed; both cases are logged separately. 
+ pub latest_processed_sitrep_version: Option, } pub mod fm_rendezvous { @@ -1024,6 +1029,12 @@ pub mod fm_rendezvous { pub current_sitrep_alerts_requested: usize, /// The number of alerts created by this activation. pub alerts_created: usize, + /// The number of alert-creation attempts that the FM sitrep-version + /// guard rejected because this Nexus is processing a sitrep older than + /// the rendezvous progress tracker. These attempts are not errors: + /// they indicate that another (more current) Nexus has already moved + /// past this sitrep, so the insert is intentionally skipped. + pub stale_sitrep: usize, /// Errors that occurred during this activation. pub errors: Vec, } @@ -1037,6 +1048,13 @@ pub mod fm_rendezvous { pub current_sitrep_bundles_requested: usize, /// The number of support bundles created by this activation. pub bundles_created: usize, + /// The number of support-bundle-creation attempts that the FM + /// sitrep-version guard rejected because this Nexus is processing a + /// sitrep older than the rendezvous progress tracker. These attempts + /// are not errors: they indicate that another (more current) Nexus + /// has already moved past this sitrep, so the insert is intentionally + /// skipped. + pub stale_sitrep: usize, /// Errors that occurred during this activation. pub errors: Vec, } diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index caa7558304a..be3fb9dc298 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -3156,16 +3156,25 @@ CREATE TABLE IF NOT EXISTS omicron.public.support_bundle ( user_comment TEXT, -- If this bundle was requested by an FM case, the case UUID. - fm_case_id UUID + fm_case_id UUID, + + -- Tombstone timestamp: set when a bundle is soft-deleted. We currently + -- soft-delete only support bundles that were requested by fault management + -- (that is, when fm_case_id is set) so that they aren't accidentally + -- resurrected after deletion. 
Bundles created by operators (without + -- fm_case_id) can be hard-deleted. + time_deleted TIMESTAMPTZ ); --- The "UNIQUE" part of this index helps enforce that we allow one support bundle --- per debug dataset. This constraint can be removed, if the query responsible --- for allocation changes to allocate more intelligently. +-- The "UNIQUE" part of this index helps enforce that we allow one support +-- bundle per debug dataset; tombstoned rows are excluded from this index so a +-- dataset may be reused once its previous bundle is soft-deleted. This +-- constraint can be removed, if the query responsible for allocation changes to +-- allocate more intelligently. CREATE UNIQUE INDEX IF NOT EXISTS one_bundle_per_dataset ON omicron.public.support_bundle ( dataset_id -); +) WHERE time_deleted IS NULL; CREATE INDEX IF NOT EXISTS lookup_bundle_by_nexus ON omicron.public.support_bundle ( assigned_nexus @@ -7026,6 +7035,12 @@ CREATE TABLE IF NOT EXISTS omicron.public.alert ( -- The ID of the fault management case that created this alert, if any. case_id UUID, + -- Tombstone timestamp: set when an alert is soft-deleted. We currently + -- soft-delete only alerts that were requested by fault management (that is, + -- when case_id is set) so that they aren't accidentally resurrected after + -- deletion. Other alerts (without case_id) can be hard-deleted. + time_deleted TIMESTAMPTZ, + CONSTRAINT time_dispatched_set_if_dispatched CHECK ( (num_dispatched = 0) OR (time_dispatched IS NOT NULL) ), @@ -7727,6 +7742,39 @@ CREATE TABLE IF NOT EXISTS omicron.public.fm_support_bundle_request_data_selecti CHECK (start_time IS NULL OR end_time IS NULL OR start_time <= end_time) ); +-- Singleton tracker recording the highest sitrep version that the FM rendezvous +-- background task has fully processed on any Nexus. This has two related uses: +-- +-- * First, we use this to ensure forward progress. 
When executing a sitrep, its +-- requested alerts and support bundles (we'll collectively refer to these +-- here as "resources") are only created if the sitrep's version is at least +-- as new as the tracker (see `SitrepVersionGuardedInsert`). +-- +-- * Second, we use this to determine when it's safe to hard-delete tombstoned +-- resources that had previously been created by FM. We know (from the above) +-- that a stale sitrep can't inadvertently resurrect a deleted resource, so +-- any resource referencing a case that is no longer present in any sitrep at +-- least as new as this tracker can be safely garbage-collected. +CREATE TABLE IF NOT EXISTS omicron.public.fm_rendezvous_progress ( + -- There should only be one row of this table for the whole DB. + -- Filter on "singleton = true" before querying or applying updates. + singleton BOOL NOT NULL PRIMARY KEY, + -- Effectively a foreign key into fm_sitrep_history (version). Not + -- enforced as such: the referenced row may be GC'd out of history + -- (#9384) while this tracker is still meaningful, and the bootstrap + -- value 0 precedes any real sitrep version. + latest_processed_sitrep_version INT8 NOT NULL, + + CHECK (singleton = true) +); + +INSERT INTO omicron.public.fm_rendezvous_progress ( + singleton, + latest_processed_sitrep_version +) VALUES + (TRUE, 0) +ON CONFLICT DO NOTHING; + /* * List of datasets available to be sliced up and passed to VMMs for encrypted * instance local storage. 
@@ -8557,7 +8605,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - (TRUE, NOW(), NOW(), '256.0.0', NULL) + (TRUE, NOW(), NOW(), '257.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/crdb/fm-resource-deletion/up01.sql b/schema/crdb/fm-resource-deletion/up01.sql new file mode 100644 index 00000000000..03bd514591b --- /dev/null +++ b/schema/crdb/fm-resource-deletion/up01.sql @@ -0,0 +1,2 @@ +ALTER TABLE omicron.public.alert + ADD COLUMN IF NOT EXISTS time_deleted TIMESTAMPTZ; diff --git a/schema/crdb/fm-resource-deletion/up02.sql b/schema/crdb/fm-resource-deletion/up02.sql new file mode 100644 index 00000000000..6244e942636 --- /dev/null +++ b/schema/crdb/fm-resource-deletion/up02.sql @@ -0,0 +1,2 @@ +ALTER TABLE omicron.public.support_bundle + ADD COLUMN IF NOT EXISTS time_deleted TIMESTAMPTZ; diff --git a/schema/crdb/fm-resource-deletion/up03.sql b/schema/crdb/fm-resource-deletion/up03.sql new file mode 100644 index 00000000000..a4325a04fd8 --- /dev/null +++ b/schema/crdb/fm-resource-deletion/up03.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS omicron.public.support_bundle@one_bundle_per_dataset; diff --git a/schema/crdb/fm-resource-deletion/up04.sql b/schema/crdb/fm-resource-deletion/up04.sql new file mode 100644 index 00000000000..17a78519028 --- /dev/null +++ b/schema/crdb/fm-resource-deletion/up04.sql @@ -0,0 +1,3 @@ +CREATE UNIQUE INDEX IF NOT EXISTS one_bundle_per_dataset + ON omicron.public.support_bundle (dataset_id) + WHERE time_deleted IS NULL; diff --git a/schema/crdb/fm-resource-deletion/up04.verify.sql b/schema/crdb/fm-resource-deletion/up04.verify.sql new file mode 100644 index 00000000000..0804dfa6fd3 --- /dev/null +++ b/schema/crdb/fm-resource-deletion/up04.verify.sql @@ -0,0 +1,2 @@ +-- DO NOT EDIT. Generated by test_migration_verification_files. 
+SELECT CAST(IF((SELECT true WHERE EXISTS (SELECT index_name FROM omicron.crdb_internal.table_indexes WHERE descriptor_name = 'support_bundle' AND index_name = 'one_bundle_per_dataset')),'true','Schema change verification failed: index one_bundle_per_dataset on table support_bundle does not exist') AS BOOL); diff --git a/schema/crdb/fm-resource-deletion/up05.sql b/schema/crdb/fm-resource-deletion/up05.sql new file mode 100644 index 00000000000..89d473e237e --- /dev/null +++ b/schema/crdb/fm-resource-deletion/up05.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS omicron.public.fm_rendezvous_progress ( + singleton BOOL NOT NULL PRIMARY KEY, + latest_processed_sitrep_version INT8 NOT NULL, + + CHECK (singleton = true) +); diff --git a/schema/crdb/fm-resource-deletion/up06.sql b/schema/crdb/fm-resource-deletion/up06.sql new file mode 100644 index 00000000000..568c042f487 --- /dev/null +++ b/schema/crdb/fm-resource-deletion/up06.sql @@ -0,0 +1,6 @@ +INSERT INTO omicron.public.fm_rendezvous_progress ( + singleton, + latest_processed_sitrep_version +) VALUES + (TRUE, 0) +ON CONFLICT DO NOTHING; From 4113a002eae5e42a6c73f19a27a58b57517ab0a3 Mon Sep 17 00:00:00 2001 From: Dan Rosen Date: Mon, 4 May 2026 12:55:29 -0400 Subject: [PATCH 2/2] fix comment --- dev-tools/omdb/src/bin/omdb/nexus.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/nexus.rs b/dev-tools/omdb/src/bin/omdb/nexus.rs index 8390d6c60b2..b984cfa93e5 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus.rs @@ -3741,11 +3741,11 @@ fn print_task_fm_rendezvous(details: &serde_json::Value) { match sitrep_id { Some(id) => { println!(" current sitrep: {id}"); - // The tracker line only appears when a sitrep was processed - // — without one, the activation never reaches the advance - // call, and there is nothing meaningful to print. 
match latest_processed_sitrep_version { Some(v) => { + // We show this line when `fm_rendezvous_progress_advance` + // succeeded, meaning that either the sitrep successfully + // executed, or it was stale and the work was skipped. println!(" latest fully-processed sitrep version: {v}",) } None => {