Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/console/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,59 @@ pub fn map_kube_error(e: kube::Error, not_found_resource: impl Into<String>) ->
}

impl Error {
fn log_if_server_error(&self) {
match self {
Error::InternalServer { message } => {
tracing::warn!(
status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
code = "InternalServerError",
message = %message,
"Console request failed"
);
}
Error::KubeApi { source } => {
tracing::warn!(
status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
code = "KubeApiError",
error = %source,
"Console request failed"
);
}
Error::Session { source } => {
tracing::warn!(
status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
code = "SessionError",
error = %source,
"Console request failed"
);
}
Error::Json { source } => {
tracing::warn!(
status = StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
code = "JsonError",
error = %source,
"Console request failed"
);
}
Error::ActionRequired {
status,
code,
reason,
message,
..
} if status.is_server_error() => {
tracing::warn!(
status = status.as_u16(),
code = %code,
reason = %reason,
message = %message,
"Console request failed"
);
}
_ => {}
}
}

fn into_response_parts(self) -> (StatusCode, ConsoleErrorResponse) {
let (status, code, reason, message, next_actions, details) = match self {
Error::Unauthorized { message } => (
Expand Down Expand Up @@ -192,6 +245,7 @@ impl Error {

impl IntoResponse for Error {
fn into_response(self) -> Response {
self.log_if_server_error();
let (status, body) = self.into_response_parts();

(status, Json(body)).into_response()
Expand Down
9 changes: 6 additions & 3 deletions src/console/handlers/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn login(
State(state): State<AppState>,
Json(req): Json<LoginRequest>,
) -> Result<impl IntoResponse> {
tracing::info!("Login attempt");
tracing::info!("Console login attempt");

// Validate the bearer token by building a client
let client = create_k8s_client(&req.token).await?;
Expand All @@ -41,8 +41,11 @@ pub async fn login(
let api: kube::Api<Tenant> = kube::Api::all(client);
api.list(&kube::api::ListParams::default().limit(1))
.await
.map_err(|e| {
tracing::warn!("K8s API test failed: {}", e);
.map_err(|error| {
tracing::warn!(
%error,
"Console login Kubernetes API permission check failed"
);
Error::Unauthorized {
message: "Invalid or insufficient permissions".to_string(),
}
Expand Down
44 changes: 33 additions & 11 deletions src/console/handlers/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,15 @@ pub async fn stream_tenant_events(
let tenant_name = tenant.clone();

tokio::spawn(async move {
if let Err(e) = run_event_sse_loop(client, ns, tenant_name, tx, first_json).await {
tracing::warn!("Tenant events SSE ended with error: {}", e);
let log_namespace = ns.clone();
let log_tenant = tenant_name.clone();
if let Err(error) = run_event_sse_loop(client, ns, tenant_name, tx, first_json).await {
tracing::warn!(
namespace = %log_namespace,
tenant = %log_tenant,
%error,
"Tenant events SSE ended with error"
);
}
});

Expand Down Expand Up @@ -101,9 +108,14 @@ async fn run_event_sse_loop(
return Ok(());
}
}
Err(e) => {
tracing::warn!("tenant events snapshot failed: {}", e);
let msg = e.to_string();
Err(error) => {
tracing::warn!(
namespace = %namespace,
tenant = %tenant,
%error,
"tenant events snapshot failed"
);
let msg = error.to_string();
if tx.send(Ok(stream_error_sse_event(&msg))).await.is_err() {
return Ok(());
}
Expand All @@ -119,18 +131,28 @@ async fn run_event_sse_loop(
return Ok(());
}
}
Err(e) => {
tracing::warn!("tenant events snapshot failed: {}", e);
let msg = e.to_string();
Err(error) => {
tracing::warn!(
namespace = %namespace,
tenant = %tenant,
%error,
"tenant events snapshot failed"
);
let msg = error.to_string();
if tx.send(Ok(stream_error_sse_event(&msg))).await.is_err() {
return Ok(());
}
}
}
}
Some(Err(e)) => {
tracing::warn!("Kubernetes Event watch error: {}", e);
let msg = format!("Kubernetes Event watch error: {}", e);
Some(Err(error)) => {
tracing::warn!(
namespace = %namespace,
tenant = %tenant,
%error,
"Kubernetes Event watch error"
);
let msg = format!("Kubernetes Event watch error: {}", error);
if tx.send(Ok(stream_error_sse_event(&msg))).await.is_err() {
return Ok(());
}
Expand Down
29 changes: 18 additions & 11 deletions src/console/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn run(port: u16) -> Result<(), Box<dyn std::error::Error>> {
crate::install_rustls_crypto_provider();
crate::init_tracing();

tracing::info!("Starting RustFS Operator Console on port {}", port);
tracing::info!(port, "Starting RustFS Operator Console");

let jwt_secret = load_jwt_secret();

Expand All @@ -71,8 +71,8 @@ pub async fn run(port: u16) -> Result<(), Box<dyn std::error::Error>> {
}
Err(error) => {
tracing::warn!(
"Kubernetes client unavailable; STS authorization paths fall back to compatibility mode: {}",
error
%error,
"Kubernetes client unavailable; STS authorization paths fall back to compatibility mode"
);
AppState::new(jwt_secret)
}
Expand Down Expand Up @@ -119,7 +119,7 @@ pub async fn run(port: u16) -> Result<(), Box<dyn std::error::Error>> {
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr).await?;

tracing::info!("Console server listening on http://{}", addr);
tracing::info!(%addr, "Console server listening");
tracing::info!("API endpoints:");
tracing::info!(" - POST /api/v1/login");
tracing::info!(" - GET /api/v1/tenants");
Expand All @@ -145,13 +145,13 @@ fn api_routes() -> Router<AppState> {
fn with_static_frontend(app: Router) -> Router {
let Some(static_dir) = static_frontend_dir() else {
tracing::warn!(
"Console frontend static files not found; serving API only. Set {} to enable static UI serving.",
CONSOLE_STATIC_DIR_ENV
env = CONSOLE_STATIC_DIR_ENV,
"Console frontend static files not found; serving API only"
);
return app;
};

tracing::info!("Serving Console frontend from {}", static_dir.display());
tracing::info!(static_dir = %static_dir.display(), "Serving Console frontend");
app.fallback_service(static_frontend_service(static_dir))
}

Expand Down Expand Up @@ -225,9 +225,12 @@ async fn health_check() -> impl IntoResponse {
async fn ready_check() -> impl IntoResponse {
match check_k8s_connectivity().await {
Ok(()) => (StatusCode::OK, "Ready".to_string()),
Err(e) => {
tracing::warn!("Readiness check failed: {}", e);
(StatusCode::SERVICE_UNAVAILABLE, format!("Not ready: {}", e))
Err(error) => {
tracing::warn!(%error, "Readiness check failed");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("Not ready: {}", error),
)
}
}
}
Expand Down Expand Up @@ -290,15 +293,19 @@ fn read_urandom(bytes: &mut [u8]) -> std::io::Result<()> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
use tower::ServiceExt;

static NEXT_TEMP_DIR_ID: AtomicU64 = AtomicU64::new(0);

fn temp_static_dir() -> std::io::Result<PathBuf> {
let id = NEXT_TEMP_DIR_ID.fetch_add(1, Ordering::Relaxed);
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let dir = std::env::temp_dir().join(format!(
"rustfs-console-static-{}-{nanos}",
"rustfs-console-static-{}-{id}-{nanos}",
std::process::id()
));
std::fs::create_dir_all(&dir)?;
Expand Down
2 changes: 1 addition & 1 deletion src/console/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl AppState {
let session_claims = match open_session_token(&self.jwt_secret, token) {
Ok(claims) => claims,
Err(error) => {
tracing::warn!("Console session token validation failed: {}", error);
tracing::warn!(%error, "Console session token validation failed");
return None;
}
};
Expand Down
14 changes: 10 additions & 4 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ impl Context {
) -> Result<Tenant, Error> {
use kube::api::{Patch, PatchParams};

let api: Api<Tenant> = Api::namespaced(self.client.clone(), &resource.namespace()?);
let namespace = resource.namespace()?;
let api: Api<Tenant> = Api::namespaced(self.client.clone(), &namespace);
let name = resource.name();

// Create a JSON merge patch for the status
Expand All @@ -217,11 +218,16 @@ impl Context {
.await
{
Ok(t) => return Ok(t),
_ => {}
Err(error) => {
info!(
tenant = %name,
namespace = %namespace,
%error,
"status update failed; retrying status patch"
);
}
}

info!("status update failed due to conflict, retrieve the latest resource and retry.");

// Retry with the same patch
api.patch_status(&name, &PatchParams::default(), &Patch::Merge(status_patch))
.context(KubeSnafu)
Expand Down
16 changes: 11 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub async fn run(options: ServerOptions) -> Result<(), Box<dyn std::error::Error
&material,
)?))
} else {
warn!("Operator STS TLS disabled by OPERATOR_STS_ENABLED=false");
warn!("Operator STS TLS disabled by OPERATOR_STS_TLS_ENABLED=false");
None
};
let sts_listener = bind_sts_listener(sts_port, tls_server_config.is_some()).await?;
Expand Down Expand Up @@ -244,8 +244,14 @@ async fn run_controller(client: Client, cancel: CancellationToken) {
_ = async {
while let Some(res) = reconcile_stream.next().await {
match res {
Ok((tenant, _)) => info!("reconciled successful, object{:?}", tenant.name),
Err(e) => warn!("reconcile failed: {}", e),
Ok((tenant, _)) => {
info!(
tenant = %tenant.name,
namespace = %tenant.namespace.as_deref().unwrap_or("<unknown>"),
"reconcile completed successfully"
);
}
Err(error) => warn!(%error, "controller reconcile stream item failed"),
}
}
} => {}
Expand Down Expand Up @@ -358,7 +364,7 @@ async fn run_operator_observability_server(

let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr).await?;
info!("operator observability server listening on http://{}", addr);
info!(%addr, "operator observability server listening");
axum::serve(listener, app).await?;
Ok(())
}
Expand Down Expand Up @@ -471,7 +477,7 @@ async fn bind_sts_listener(
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr).await?;
let scheme = if tls_enabled { "https" } else { "http" };
tracing::info!("Operator STS server listening on {}://{}", scheme, addr);
tracing::info!(%scheme, %addr, "Operator STS server listening");
Ok(listener)
}

Expand Down
Loading
Loading