From ca18f60e461ebb682d68c545956ced8adfa6c0ef Mon Sep 17 00:00:00 2001 From: GatewayJ <18332154+GatewayJ@users.noreply.github.com> Date: Wed, 24 Jun 2026 00:17:48 +0800 Subject: [PATCH] fix(logging): improve operator diagnostics --- src/console/error.rs | 54 ++++++++++++++ src/console/handlers/auth.rs | 9 ++- src/console/handlers/events.rs | 44 ++++++++--- src/console/server.rs | 29 +++++--- src/console/state.rs | 2 +- src/context.rs | 14 +++- src/lib.rs | 16 ++-- src/reconcile.rs | 83 +++++++++++++++------ src/reconcile/phases.rs | 92 ++++++++++++++++++++--- src/reconcile/pool_lifecycle.rs | 125 +++++++++++++++++++++++++++----- src/reconcile/provisioning.rs | 63 ++++++++++++++++ src/sts/server.rs | 8 ++ 12 files changed, 455 insertions(+), 84 deletions(-) diff --git a/src/console/error.rs b/src/console/error.rs index 748d87d..49d9d45 100755 --- a/src/console/error.rs +++ b/src/console/error.rs @@ -86,6 +86,59 @@ pub fn map_kube_error(e: kube::Error, not_found_resource: impl Into) -> } impl Error { + fn log_if_server_error(&self) { + match self { + Error::InternalServer { message } => { + tracing::warn!( + status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + code = "InternalServerError", + message = %message, + "Console request failed" + ); + } + Error::KubeApi { source } => { + tracing::warn!( + status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + code = "KubeApiError", + error = %source, + "Console request failed" + ); + } + Error::Session { source } => { + tracing::warn!( + status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + code = "SessionError", + error = %source, + "Console request failed" + ); + } + Error::Json { source } => { + tracing::warn!( + status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(), + code = "JsonError", + error = %source, + "Console request failed" + ); + } + Error::ActionRequired { + status, + code, + reason, + message, + .. + } if status.is_server_error() => { + tracing::warn!( + status = status.as_u16(), + code = %code, + reason = %reason, + message = %message, + "Console request failed" + ); + } + _ => {} + } + } + fn into_response_parts(self) -> (StatusCode, ConsoleErrorResponse) { let (status, code, reason, message, next_actions, details) = match self { Error::Unauthorized { message } => ( @@ -192,6 +245,7 @@ impl Error { impl IntoResponse for Error { fn into_response(self) -> Response { + self.log_if_server_error(); let (status, body) = self.into_response_parts(); (status, Json(body)).into_response() diff --git a/src/console/handlers/auth.rs b/src/console/handlers/auth.rs index b3b43b3..a51a14d 100755 --- a/src/console/handlers/auth.rs +++ b/src/console/handlers/auth.rs @@ -32,7 +32,7 @@ pub async fn login( State(state): State, Json(req): Json, ) -> Result { - tracing::info!("Login attempt"); + tracing::info!("Console login attempt"); // Validate the bearer token by building a client let client = create_k8s_client(&req.token).await?; @@ -41,8 +41,11 @@ pub async fn login( let api: kube::Api = kube::Api::all(client); api.list(&kube::api::ListParams::default().limit(1)) .await - .map_err(|e| { - tracing::warn!("K8s API test failed: {}", e); + .map_err(|error| { + tracing::warn!( + %error, + "Console login Kubernetes API permission check failed" + ); Error::Unauthorized { message: "Invalid or insufficient permissions".to_string(), } diff --git a/src/console/handlers/events.rs b/src/console/handlers/events.rs index 2aa9024..1776c54 100755 --- a/src/console/handlers/events.rs +++ b/src/console/handlers/events.rs @@ -52,8 +52,15 @@ pub async fn stream_tenant_events( let tenant_name = tenant.clone(); tokio::spawn(async move { - if let Err(e) = run_event_sse_loop(client, ns, tenant_name, tx, first_json).await { - tracing::warn!("Tenant events SSE ended with error: {}", e); + let log_namespace = ns.clone(); + let log_tenant = tenant_name.clone(); + if let Err(error) = run_event_sse_loop(client, ns, tenant_name, tx, first_json).await { + tracing::warn!( + namespace = %log_namespace, + tenant = %log_tenant, + %error, + "Tenant events SSE ended with error" + ); } }); @@ -101,9 +108,14 @@ async fn run_event_sse_loop( return Ok(()); } } - Err(e) => { - tracing::warn!("tenant events snapshot failed: {}", e); - let msg = e.to_string(); + Err(error) => { + tracing::warn!( + namespace = %namespace, + tenant = %tenant, + %error, + "tenant events snapshot failed" + ); + let msg = error.to_string(); if tx.send(Ok(stream_error_sse_event(&msg))).await.is_err() { return Ok(()); } @@ -119,18 +131,28 @@ async fn run_event_sse_loop( return Ok(()); } } - Err(e) => { - tracing::warn!("tenant events snapshot failed: {}", e); - let msg = e.to_string(); + Err(error) => { + tracing::warn!( + namespace = %namespace, + tenant = %tenant, + %error, + "tenant events snapshot failed" + ); + let msg = error.to_string(); if tx.send(Ok(stream_error_sse_event(&msg))).await.is_err() { return Ok(()); } } } } - Some(Err(e)) => { - tracing::warn!("Kubernetes Event watch error: {}", e); - let msg = format!("Kubernetes Event watch error: {}", e); + Some(Err(error)) => { + tracing::warn!( + namespace = %namespace, + tenant = %tenant, + %error, + "Kubernetes Event watch error" + ); + let msg = format!("Kubernetes Event watch error: {}", error); if tx.send(Ok(stream_error_sse_event(&msg))).await.is_err() { return Ok(()); } diff --git a/src/console/server.rs b/src/console/server.rs index 7ad0b13..1262192 100755 --- a/src/console/server.rs +++ b/src/console/server.rs @@ -60,7 +60,7 @@ pub async fn run(port: u16) -> Result<(), Box> { crate::install_rustls_crypto_provider(); crate::init_tracing(); - tracing::info!("Starting RustFS Operator Console on port {}", port); + tracing::info!(port, "Starting RustFS Operator Console"); let jwt_secret = load_jwt_secret(); @@ -71,8 +71,8 @@ pub async fn run(port: u16) -> Result<(), Box> { } Err(error) => { tracing::warn!( - "Kubernetes client unavailable; STS authorization paths fall back to compatibility mode: {}", - error + %error, + "Kubernetes client unavailable; STS authorization paths fall back to compatibility mode" ); AppState::new(jwt_secret) } @@ -119,7 +119,7 @@ pub async fn run(port: u16) -> Result<(), Box> { let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr).await?; - tracing::info!("Console server listening on http://{}", addr); + tracing::info!(%addr, "Console server listening"); tracing::info!("API endpoints:"); tracing::info!(" - POST /api/v1/login"); tracing::info!(" - GET /api/v1/tenants"); @@ -145,13 +145,13 @@ fn api_routes() -> Router { fn with_static_frontend(app: Router) -> Router { let Some(static_dir) = static_frontend_dir() else { tracing::warn!( - "Console frontend static files not found; serving API only. Set {} to enable static UI serving.", - CONSOLE_STATIC_DIR_ENV + env = CONSOLE_STATIC_DIR_ENV, + "Console frontend static files not found; serving API only" ); return app; }; - tracing::info!("Serving Console frontend from {}", static_dir.display()); + tracing::info!(static_dir = %static_dir.display(), "Serving Console frontend"); app.fallback_service(static_frontend_service(static_dir)) } @@ -225,9 +225,12 @@ async fn health_check() -> impl IntoResponse { async fn ready_check() -> impl IntoResponse { match check_k8s_connectivity().await { Ok(()) => (StatusCode::OK, "Ready".to_string()), - Err(e) => { - tracing::warn!("Readiness check failed: {}", e); - (StatusCode::SERVICE_UNAVAILABLE, format!("Not ready: {}", e)) + Err(error) => { + tracing::warn!(%error, "Readiness check failed"); + ( + StatusCode::SERVICE_UNAVAILABLE, + format!("Not ready: {}", error), + ) } } } @@ -290,15 +293,19 @@ fn read_urandom(bytes: &mut [u8]) -> std::io::Result<()> { #[cfg(test)] mod tests { use super::*; + use std::sync::atomic::{AtomicU64, Ordering}; use tower::ServiceExt; + static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0); + fn temp_static_dir() -> std::io::Result { + let id = NEXT_TEMP_DIR_ID.fetch_add(1, Ordering::Relaxed); let nanos = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_nanos(); let dir = std::env::temp_dir().join(format!( - "rustfs-console-static-{}-{nanos}", + "rustfs-console-static-{}-{id}-{nanos}", std::process::id() )); std::fs::create_dir_all(&dir)?; diff --git a/src/console/state.rs b/src/console/state.rs index 980cf9a..b77474e 100755 --- a/src/console/state.rs +++ b/src/console/state.rs @@ -72,7 +72,7 @@ impl AppState { let session_claims = match open_session_token(&self.jwt_secret, token) { Ok(claims) => claims, Err(error) => { - tracing::warn!("Console session token validation failed: {}", error); + tracing::warn!(%error, "Console session token validation failed"); return None; } }; diff --git a/src/context.rs b/src/context.rs index a3ccff2..de39bc0 100755 --- a/src/context.rs +++ b/src/context.rs @@ -198,7 +198,8 @@ impl Context { ) -> Result { use kube::api::{Patch, PatchParams}; - let api: Api = Api::namespaced(self.client.clone(), &resource.namespace()?); + let namespace = resource.namespace()?; + let api: Api = Api::namespaced(self.client.clone(), &namespace); let name = resource.name(); // Create a JSON merge patch for the status @@ -217,11 +218,16 @@ impl Context { .await { Ok(t) => return Ok(t), - _ => {} + Err(error) => { + info!( + tenant = %name, + namespace = %namespace, + %error, + "status update failed; retrying status patch" + ); + } } - info!("status update failed due to conflict, retrieve the latest resource and retry."); - // Retry with the same patch api.patch_status(&name, &PatchParams::default(), &Patch::Merge(status_patch)) .context(KubeSnafu) diff --git a/src/lib.rs b/src/lib.rs index 253a811..fa402de 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,7 +129,7 @@ pub async fn run(options: ServerOptions) -> Result<(), Box info!("reconciled successful, object{:?}", tenant.name), - Err(e) => warn!("reconcile failed: {}", e), + Ok((tenant, _)) => { + info!( + tenant = %tenant.name, + namespace = %tenant.namespace.as_deref().unwrap_or(""), + "reconcile completed successfully" + ); + } + Err(error) => warn!(%error, "controller reconcile stream item failed"), } } } => {} @@ -358,7 +364,7 @@ async fn run_operator_observability_server( let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr).await?; - info!("operator observability server listening on http://{}", addr); + info!(%addr, "operator observability server listening"); axum::serve(listener, app).await?; Ok(()) } @@ -471,7 +477,7 @@ async fn bind_sts_listener( let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); let listener = tokio::net::TcpListener::bind(addr).await?; let scheme = if tls_enabled { "https" } else { "http" }; - tracing::info!("Operator STS server listening on {}://{}", scheme, addr); + tracing::info!(%scheme, %addr, "Operator STS server listening"); Ok(listener) } diff --git a/src/reconcile.rs b/src/reconcile.rs index ed69075..228804e 100755 --- a/src/reconcile.rs +++ b/src/reconcile.rs @@ -25,7 +25,7 @@ use kube::runtime::events::EventType; use snafu::Snafu; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; mod phases; mod pool_lifecycle; @@ -60,9 +60,10 @@ pub async fn reconcile_rustfs(tenant: Arc, ctx: Arc) -> Result< if latest_tenant.metadata.deletion_timestamp.is_some() { debug!( - "tenant {} is deleted, deletion_timestamp is {:?}", - tenant.name(), - latest_tenant.metadata.deletion_timestamp + tenant = %tenant.name(), + namespace = %ns, + deletion_timestamp = ?latest_tenant.metadata.deletion_timestamp, + "tenant is deleting; skipping reconcile" ); return Ok(Action::await_change()); } @@ -401,8 +402,12 @@ async fn cleanup_stuck_terminating_pods_on_down_nodes( let pod_name = pod.name_any(); warn!( - "Node {} is detected down. Pod {} is terminating on it.", - node_name, pod_name + tenant = %tenant.name(), + namespace = %namespace, + node = %node_name, + pod = %pod_name, + policy = ?policy, + "terminating pod is scheduled on a down node" ); let delete_params = match policy { crate::types::v1alpha1::k8s::PodDeletionPolicyWhenNodeIsDown::DoNothing => continue, @@ -516,9 +521,7 @@ fn requeue_after(duration: Duration) -> Action { Action::requeue(duration) } -pub fn error_policy(_object: Arc, error: &Error, _ctx: Arc) -> Action { - error!("error_policy: {:?}", error); - +pub fn error_policy(object: Arc, error: &Error, _ctx: Arc) -> Action { // Status updates happen during reconciliation before errors are returned. // The reconcile function sets appropriate conditions (Ready=False, Degraded=True) // and records events for failures before propagating errors. @@ -527,7 +530,7 @@ pub fn error_policy(_object: Arc, error: &Error, _ctx: Arc) -> // Use different requeue strategies based on error type: // - User-fixable errors (credentials, validation): Longer intervals to reduce spam // - Transient errors (API, network): Shorter intervals for quick recovery - match error { + let requeue = match error { Error::Context { source } => match source { // Credential / KMS validation errors - require user intervention // Use 60-second requeue to reduce event/log spam while user fixes the issue @@ -537,16 +540,14 @@ pub fn error_policy(_object: Arc, error: &Error, _ctx: Arc) -> | context::Error::CredentialSecretTooShort { .. } | context::Error::KmsSecretNotFound { .. } | context::Error::KmsSecretMissingKey { .. } - | context::Error::KmsConfigInvalid { .. } => requeue_after(Duration::from_secs(60)), + | context::Error::KmsConfigInvalid { .. } => Duration::from_secs(60), // Kubernetes API errors - might be transient (network, API server issues) // Use shorter requeue for faster recovery - context::Error::Kube { .. } | context::Error::Record { .. } => { - requeue_after(Duration::from_secs(5)) - } + context::Error::Kube { .. } | context::Error::Record { .. } => Duration::from_secs(5), // Other context errors - use moderate requeue - _ => requeue_after(Duration::from_secs(15)), + _ => Duration::from_secs(15), }, // Type errors - validation issues, use moderate requeue @@ -555,16 +556,56 @@ pub fn error_policy(_object: Arc, error: &Error, _ctx: Arc) -> // Use 60-second requeue to reduce event/log spam while user fixes the issue types::error::Error::ImmutableFieldModified { .. } | types::error::Error::InvalidTenantName { .. } - | types::error::Error::PoolDeleteBlocked { .. } => { - requeue_after(Duration::from_secs(60)) - } + | types::error::Error::PoolDeleteBlocked { .. } => Duration::from_secs(60), // Other type errors - use moderate requeue - _ => requeue_after(Duration::from_secs(15)), + _ => Duration::from_secs(15), }, - Error::TlsBlocked { .. } => requeue_after(Duration::from_secs(60)), - Error::TlsPending { .. } => requeue_after(Duration::from_secs(20)), + Error::TlsBlocked { .. } => Duration::from_secs(60), + Error::TlsPending { .. } => Duration::from_secs(20), + }; + + warn!( + tenant = %object.name(), + namespace = ?object.namespace(), + reason = reconcile_error_reason(error), + requeue_seconds = requeue.as_secs(), + %error, + "reconcile failed; scheduling retry" + ); + + requeue_after(requeue) +} + +fn reconcile_error_reason(error: &Error) -> &'static str { + match error { + Error::Context { source } => match source { + context::Error::CredentialSecretNotFound { .. } => "CredentialSecretNotFound", + context::Error::CredentialSecretMissingKey { .. } => "CredentialSecretMissingKey", + context::Error::CredentialSecretInvalidEncoding { .. } => { + "CredentialSecretInvalidEncoding" + } + context::Error::CredentialSecretTooShort { .. } => "CredentialSecretTooShort", + context::Error::KmsSecretNotFound { .. } => "KmsSecretNotFound", + context::Error::KmsSecretMissingKey { .. } => "KmsSecretMissingKey", + context::Error::KmsConfigInvalid { .. } => "KmsConfigInvalid", + context::Error::Kube { .. } => "KubernetesApiError", + context::Error::Record { .. } => "KubernetesEventRecordError", + context::Error::Types { .. } => "TypeError", + context::Error::Serde { .. } => "SerdeError", + }, + Error::Types { source } => match source { + types::error::Error::InvalidTenantName { .. } => "InvalidTenantName", + types::error::Error::InvalidPoolSpec { .. } => "InvalidPoolSpec", + types::error::Error::ImmutableFieldModified { .. } => "ImmutableFieldModified", + types::error::Error::PoolDeleteBlocked { .. } => "PoolDeleteBlocked", + types::error::Error::NoNamespace => "NoNamespace", + types::error::Error::InternalError { .. } => "InternalError", + types::error::Error::SerdeJson { .. } => "SerdeJsonError", + }, + Error::TlsBlocked { .. } => "TlsBlocked", + Error::TlsPending { .. } => "TlsPending", } } diff --git a/src/reconcile/phases.rs b/src/reconcile/phases.rs index bb89a30..faef084 100644 --- a/src/reconcile/phases.rs +++ b/src/reconcile/phases.rs @@ -31,7 +31,7 @@ use kube::runtime::controller::Action; use kube::runtime::events::EventType; use std::collections::HashSet; use std::time::Duration; -use tracing::{debug, error, warn}; +use tracing::{debug, info, warn}; #[derive(Default)] pub(super) struct PoolReconcileSummary { @@ -470,7 +470,14 @@ pub(super) async fn reconcile_pool_statefulsets( created_missing_pool = true; } Err(e) => { - error!("Failed to get StatefulSet {}: {}", ss_name, e); + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + error = %e, + "failed to get pool StatefulSet" + ); let status_error = StatusError::from_context_error(&e); patch_status_error(ctx, tenant, &status_error).await; return Err(e.into()); @@ -600,10 +607,23 @@ async fn reconcile_existing_pool_statefulset( summary: &mut PoolReconcileSummary, ) -> Result<(), Error> { let ss_name = existing_ss.name_any(); - debug!("StatefulSet {} exists, checking if update needed", ss_name); + debug!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + "checking existing pool StatefulSet" + ); if let Err(e) = tenant.validate_statefulset_update_with_tls_plan(&existing_ss, pool, tls_plan) { - error!("StatefulSet {} update validation failed: {}", ss_name, e); + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + error = %e, + "StatefulSet update validation failed" + ); let status_error = StatusError::statefulset_update_validation_failed(&ss_name); patch_status_error(ctx, tenant, &status_error).await; @@ -617,7 +637,13 @@ async fn reconcile_existing_pool_statefulset( ) .await? { - debug!("StatefulSet {} needs update, applying changes", ss_name); + info!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + "applying StatefulSet update" + ); let _ = ctx .record( @@ -640,9 +666,21 @@ async fn reconcile_existing_pool_statefulset( return Err(e.into()); } - debug!("StatefulSet {} updated successfully", ss_name); + info!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + "StatefulSet updated successfully" + ); } else { - debug!("StatefulSet {} is up to date, no changes needed", ss_name); + debug!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + "StatefulSet is up to date" + ); } let ss = context_result( @@ -667,7 +705,13 @@ async fn reconcile_missing_pool_statefulset( tls_plan: &TlsPlan, summary: &mut PoolReconcileSummary, ) -> Result<(), Error> { - debug!("StatefulSet {} not found, creating", ss_name); + info!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + "creating missing StatefulSet" + ); let _ = ctx .record( @@ -690,7 +734,13 @@ async fn reconcile_missing_pool_statefulset( return Err(e.into()); } - debug!("StatefulSet {} created successfully", ss_name); + info!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + statefulset = %ss_name, + "StatefulSet created successfully" + ); let ss = context_result( ctx.get::(ss_name, namespace) @@ -751,6 +801,7 @@ pub(super) async fn finalize_tenant_status( tls_plan: TlsPlan, ) -> Result { let mut builder = StatusBuilder::from_tenant(tenant); + let pool_count = summary.pool_statuses.len(); builder.set_pool_statuses(summary.pool_statuses); if let Some(tls_status) = tls_plan.status { builder.set_tls_status(tls_status); @@ -893,7 +944,19 @@ pub(super) async fn finalize_tenant_status( }; let status = builder.build(); - debug!("Patching tenant status if changed: {:?}", status); + debug!( + tenant = %tenant.name(), + namespace = ?tenant.namespace(), + current_state = %status.current_state, + observed_generation = ?status.observed_generation, + pool_count, + condition_count = status.conditions.len(), + reason = event_reason.as_str(), + condition = event_condition.as_str(), + ready_replicas = summary.ready_replicas, + total_replicas = summary.total_replicas, + "patching Tenant status if changed" + ); patch_status_and_record( ctx, tenant, @@ -907,12 +970,19 @@ pub(super) async fn finalize_tenant_status( if let Some(requeue_after) = summary.lifecycle_requeue_after { debug!( + tenant = %tenant.name(), + namespace = ?tenant.namespace(), seconds = requeue_after.as_secs(), "Pool lifecycle is active, requeuing" ); Ok(Action::requeue(requeue_after)) } else if summary.any_updating { - debug!("Pools are updating, requeuing in 10 seconds"); + debug!( + tenant = %tenant.name(), + namespace = ?tenant.namespace(), + seconds = 10, + "Pools are updating, requeuing" + ); Ok(Action::requeue(Duration::from_secs(10))) } else { Ok(Action::await_change()) diff --git a/src/reconcile/pool_lifecycle.rs b/src/reconcile/pool_lifecycle.rs index fe79602..b97ec63 100644 --- a/src/reconcile/pool_lifecycle.rs +++ b/src/reconcile/pool_lifecycle.rs @@ -19,7 +19,7 @@ use k8s_openapi::api::apps::v1::StatefulSet; use kube::api::{DeleteParams, PropagationPolicy}; use kube::runtime::events::EventType; use sha2::{Digest, Sha256}; -use tracing::warn; +use tracing::{info, warn}; use super::{Error, context}; use crate::context::Context; @@ -188,6 +188,14 @@ async fn reconcile_single_pool_lifecycle( let client = match rustfs_admin_client(ctx, tenant).await { Ok(client) => client, Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + reason = "RustfsAdminClientError", + %error, + "RustFS admin client unavailable for decommissioned pool cleanup" + ); return cleanup_retriable_decision( status, "RustfsAdminClientError", @@ -230,6 +238,15 @@ async fn reconcile_single_pool_lifecycle( let client = match rustfs_admin_client(ctx, tenant).await { Ok(client) => client, Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + request_id = %request.request_id, + reason = "RustfsAdminClientError", + %error, + "RustFS admin client unavailable for pool lifecycle request" + ); return retriable_decision( Some(request.request_id.clone()), "RustfsAdminClientError", @@ -241,6 +258,15 @@ async fn reconcile_single_pool_lifecycle( let matched_pool = match find_rustfs_pool(&client, tenant, namespace, pool).await { Ok(pool_item) => pool_item, Err(error) if error.is_retriable() => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + request_id = %request.request_id, + reason = error.reason(), + message = error.message(), + "RustFS pool mapping is not ready" + ); return retriable_decision( Some(request.request_id.clone()), error.reason(), @@ -248,6 +274,15 @@ async fn reconcile_single_pool_lifecycle( ); } Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + request_id = %request.request_id, + reason = error.reason(), + message = error.message(), + "RustFS pool mapping failed" + ); return failed_decision( Some(request.request_id.clone()), error.reason(), @@ -261,7 +296,7 @@ async fn reconcile_single_pool_lifecycle( return active_lifecycle_decision(); } - if request.action == DecommissionAction::Start + let should_start = request.action == DecommissionAction::Start && match should_start_decommission( existing_state.as_ref(), existing.as_ref(), @@ -276,33 +311,89 @@ async fn reconcile_single_pool_lifecycle( &message, ); } + }; + if should_start { + match client.start_pool_decommission_by_id(&pool_id).await { + Ok(()) => { + info!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + rustfs_pool_id = %pool_id, + request_id = %request.request_id, + "started RustFS pool decommission" + ); + } + Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + rustfs_pool_id = %pool_id, + request_id = %request.request_id, + reason = "RustfsDecommissionStartFailed", + %error, + "failed to start RustFS pool decommission" + ); + return retriable_decision( + Some(request.request_id.clone()), + "RustfsDecommissionStartFailed", + &error.to_string(), + ); + } } - && let Err(error) = client.start_pool_decommission_by_id(&pool_id).await - { - return retriable_decision( - Some(request.request_id.clone()), - "RustfsDecommissionStartFailed", - &error.to_string(), - ); } - if request.action == DecommissionAction::Cancel + let should_cancel = request.action == DecommissionAction::Cancel && !matches!( existing_state, Some(PoolLifecycleState::DecommissionCanceled) - ) - && let Err(error) = client.cancel_pool_decommission_by_id(&pool_id).await - { - return retriable_decision( - Some(request.request_id.clone()), - "RustfsDecommissionCancelFailed", - &error.to_string(), ); + if should_cancel { + match client.cancel_pool_decommission_by_id(&pool_id).await { + Ok(()) => { + info!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + rustfs_pool_id = %pool_id, + request_id = %request.request_id, + "canceled RustFS pool decommission" + ); + } + Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + rustfs_pool_id = %pool_id, + request_id = %request.request_id, + reason = "RustfsDecommissionCancelFailed", + %error, + "failed to cancel RustFS pool decommission" + ); + return retriable_decision( + Some(request.request_id.clone()), + "RustfsDecommissionCancelFailed", + &error.to_string(), + ); + } + } } let rustfs_status = match client.pool_status_by_id(&pool_id).await { Ok(status) => status, Err(error) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + pool = %pool.name, + rustfs_pool_id = %pool_id, + request_id = %request.request_id, + reason = "RustfsDecommissionStatusFailed", + %error, + "failed to query RustFS pool decommission status" + ); return retriable_decision( Some(request.request_id.clone()), "RustfsDecommissionStatusFailed", diff --git a/src/reconcile/provisioning.rs b/src/reconcile/provisioning.rs index 91fde2d..6fa3a92 100644 --- a/src/reconcile/provisioning.rs +++ b/src/reconcile/provisioning.rs @@ -27,6 +27,7 @@ use k8s_openapi::api::core::v1::{ConfigMap, Secret}; use serde_json::Value; use sha2::{Digest, Sha256}; use std::collections::{BTreeMap, BTreeSet}; +use tracing::{info, warn}; pub(super) struct ProvisioningReconcileResult { pub status: ProvisioningStatus, @@ -69,6 +70,7 @@ impl ProvisioningRun<'_> { } fn push_policy(&mut self, item: ProvisioningItemStatus) { + self.log_item_transition("policy", self.previous_policy(&item.name), &item); if item.state == ProvisioningItemState::Failed.as_str() { self.failures .push((reason_from_str(&item.reason), item_message(&item))); @@ -77,6 +79,7 @@ impl ProvisioningRun<'_> { } fn push_user(&mut self, item: ProvisioningItemStatus) { + self.log_item_transition("user", self.previous_user(&item.name), &item); if item.state == ProvisioningItemState::Failed.as_str() { self.failures .push((reason_from_str(&item.reason), item_message(&item))); @@ -85,6 +88,7 @@ impl ProvisioningRun<'_> { } fn push_bucket(&mut self, item: ProvisioningItemStatus) { + self.log_item_transition("bucket", self.previous_bucket(&item.name), &item); if item.state == ProvisioningItemState::Failed.as_str() { self.failures .push((reason_from_str(&item.reason), item_message(&item))); @@ -92,6 +96,50 @@ impl ProvisioningRun<'_> { self.status.buckets.push(item); } + fn log_item_transition( + &self, + item_type: &'static str, + previous: Option<&ProvisioningItemStatus>, + item: &ProvisioningItemStatus, + ) { + let changed = match previous { + Some(previous) => { + previous.state != item.state + || previous.reason != item.reason + || previous.message != item.message + } + None => true, + }; + if !changed { + return; + } + + let message = item.message.as_deref().unwrap_or(""); + if item.state == ProvisioningItemState::Failed.as_str() { + warn!( + tenant = %self.tenant.name(), + namespace = %self.namespace, + item_type = %item_type, + item = %item.name, + state = %item.state, + reason = %item.reason, + message = %message, + "RustFS provisioning item failed" + ); + } else { + info!( + tenant = %self.tenant.name(), + namespace = %self.namespace, + item_type = %item_type, + item = %item.name, + state = %item.state, + reason = %item.reason, + message = %message, + "RustFS provisioning item state changed" + ); + } + } + fn item( &self, previous: Option<&ProvisioningItemStatus>, @@ -271,6 +319,14 @@ pub(super) async fn reconcile_provisioning( Ok(client) => client, Err(error) => { let (reason, message, pending) = client_error_outcome(error); + warn!( + tenant = %tenant.name(), + namespace = %namespace, + reason = reason.as_str(), + pending, + message = %message, + "RustFS provisioning admin client unavailable" + ); if pending { run.mark_all_active(ProvisioningItemState::Pending, reason, &message); } else { @@ -296,6 +352,13 @@ pub(super) async fn reconcile_provisioning( let mut live_policies = match load_live_policies(&client, tenant).await { Ok(policies) => policies, Err(message) => { + warn!( + tenant = %tenant.name(), + namespace = %namespace, + reason = Reason::PolicyApplyFailed.as_str(), + message = %message, + "RustFS provisioning failed to load live policies" + ); run.fail_all_active(Reason::PolicyApplyFailed, &message); run.prepare_status(ProvisioningPhase::Failed); return ProvisioningReconcileResult { diff --git a/src/sts/server.rs b/src/sts/server.rs index b463f2f..2399d52 100644 --- a/src/sts/server.rs +++ b/src/sts/server.rs @@ -198,6 +198,7 @@ async fn process_assume_role_request( Err(error) => { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, error = %error.code(), "TokenReview denied STS request" ); @@ -213,6 +214,7 @@ async fn process_assume_role_request( Err(error) => { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, error = %error.code(), "Failed listing PolicyBindings for STS authorization" ); @@ -229,6 +231,7 @@ async fn process_assume_role_request( if matching_bindings.is_empty() { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, service_account_namespace = %identity.namespace, service_account = %identity.service_account, "No PolicyBinding matched service account for this STS request" @@ -247,6 +250,7 @@ async fn process_assume_role_request( Err(error) => { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, error = %error.code(), "Failed selecting tenant for STS request" ); @@ -259,6 +263,7 @@ async fn process_assume_role_request( Err(error) => { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, error = %error.code(), "Failed creating RustFS admin client" ); @@ -272,6 +277,7 @@ async fn process_assume_role_request( Err(error) => { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, error = %error.code(), "Failed resolving PolicyBinding policy documents" ); @@ -292,6 +298,7 @@ async fn process_assume_role_request( Err(error) => { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, error = %error.code(), "Failed to build merged STS session policy" ); @@ -311,6 +318,7 @@ async fn process_assume_role_request( Err(error) => { tracing::warn!( tenant_namespace = %parsed_request.tenant_namespace, + tenant = %parsed_request.tenant_name, error = %error, "Failed calling RustFS AssumeRole" );