diff --git a/Cargo.lock b/Cargo.lock index b7b33672..8ec0c0af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4780,6 +4780,7 @@ dependencies = [ "percent-encoding", "rand 0.9.4", "thiserror 2.0.18", + "tokio", ] [[package]] @@ -6924,6 +6925,7 @@ dependencies = [ "scouter-settings", "scouter-sql", "scouter-tonic", + "scouter-tracing", "scouter-types", "serde", "serde_json", @@ -6941,6 +6943,7 @@ dependencies = [ "tower", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "url", "utoipa", @@ -8389,6 +8392,22 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc" +dependencies = [ + "js-sys", + "opentelemetry", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 23f6e43e..f99191b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ num-traits = "0.*" object_store = { version = "0.*", features = ["aws", "azure", "gcp" ] } opentelemetry = { version = "0.31.0" } opentelemetry-stdout = "0.31.0" -opentelemetry_sdk = { version = "0.31.0", features = ["trace"] } +opentelemetry_sdk = { version = "0.31.0", features = ["trace", "experimental_trace_batch_span_processor_with_async_runtime"] } opentelemetry-proto = { version = "0.31.0" , features = ["trace"] } opentelemetry-otlp = { version = "0.31.0", features = ["grpc-tonic", "trace"] } owo-colors = "4.*" @@ -121,6 +121,7 @@ utoipa-axum = "0" utoipa-swagger-ui = { version = "9", features = ["axum"] } tokio-util = "0.*" tracing = "0.*" +tracing-opentelemetry = "0.32.1" tracing-subscriber = {version = "0.*", features = ["json", "time", "env-filter"] } uuid = { version = "1.*", features = ["v4", "v7"] } url = "2.*" diff --git a/crates/scouter_server/Cargo.toml b/crates/scouter_server/Cargo.toml index 49f2c29b..66cce692 100644 --- a/crates/scouter_server/Cargo.toml +++ b/crates/scouter_server/Cargo.toml @@ -22,6 +22,7 @@ scouter-drift = { workspace = true, features = ["sql"] } scouter-observability = { workspace = true } scouter-sql = { workspace = true } scouter-tonic = { workspace = true, features = ["server"] } +scouter-tracing = { workspace = true } arrow = { workspace = true } @@ -55,6 +56,7 @@ thiserror = { workspace = true } tokio = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } utoipa = { workspace = true } utoipa-axum = { workspace = true } diff --git a/crates/scouter_server/src/api/setup.rs b/crates/scouter_server/src/api/setup.rs index f46cdd1f..1b806952 100644 --- a/crates/scouter_server/src/api/setup.rs +++ b/crates/scouter_server/src/api/setup.rs @@ -17,9 +17,10 @@ use scouter_settings::{ use scouter_sql::PostgresClient; use scouter_sql::sql::schema::User; use scouter_sql::sql::traits::UserSqlLogic; +use scouter_tracing::tracer::{OtlpTracingHandle, init_server_otlp_tracing}; use sqlx::{Pool, Postgres}; use std::sync::Arc; -use tracing::{debug, info, instrument}; +use tracing::{debug, info, instrument, warn}; use tracing_subscriber::fmt::time::UtcTime; use tracing_subscriber::prelude::*; use tracing_subscriber::{EnvFilter, fmt}; @@ -69,6 +70,7 @@ pub struct ScouterSetupComponents { pub genai_service: Arc, pub dataset_manager: Arc, pub eval_scenario_service: Arc, + pub otlp_tracing: Option, } fn build_filter(log_level: &str) -> EnvFilter { @@ -82,13 +84,20 @@ fn build_filter(log_level: &str) -> EnvFilter { }) } -fn build_tracer(filter: EnvFilter) -> AnyhowResult<()> { +fn build_tracer( + filter: EnvFilter, + config: &ScouterServerConfig, +) -> AnyhowResult> { let timer = UtcTime::new( time::format_description::parse( "[year]-[month]-[day]T[hour repr:24]:[minute]:[second]::[subsecond digits:4]", ) .context("Failed to parse time format")?, ); + let (otlp_tracing, otlp_error) = match init_server_otlp_tracing(&config.otel_settings) { + Ok(handle) => (handle, None), + Err(err) => (None, Some(err)), + }; tracing_subscriber::registry() .with(filter) @@ -98,19 +107,35 @@ fn build_tracer(filter: EnvFilter) -> AnyhowResult<()> { .with_thread_ids(true) .with_timer(timer), ) + .with( + otlp_tracing + .as_ref() + .map(|handle| tracing_opentelemetry::layer().with_tracer(handle.tracer())), + ) .try_init() .ok(); - Ok(()) + if let Some(err) = otlp_error { + warn!("Failed to initialize OTLP tracing: {}", err); + } + + Ok(otlp_tracing) } -fn build_json_tracer(filter: EnvFilter) -> AnyhowResult<()> { +fn build_json_tracer( + filter: EnvFilter, + config: &ScouterServerConfig, +) -> AnyhowResult> { let timer = UtcTime::new( time::format_description::parse( "[year]-[month]-[day]T[hour repr:24]:[minute]:[second]::[subsecond digits:4]", ) .context("Failed to parse time format")?, ); + let (otlp_tracing, otlp_error) = match init_server_otlp_tracing(&config.otel_settings) { + Ok(handle) => (handle, None), + Err(err) => (None, Some(err)), + }; tracing_subscriber::registry() .with(filter) @@ -122,10 +147,19 @@ fn build_json_tracer(filter: EnvFilter) -> AnyhowResult<()> { .with_thread_ids(true) .with_timer(timer), ) + .with( + otlp_tracing + .as_ref() + .map(|handle| tracing_opentelemetry::layer().with_tracer(handle.tracer())), + ) .try_init() .ok(); - Ok(()) + if let Some(err) = otlp_error { + warn!("Failed to initialize OTLP tracing: {}", err); + } + + Ok(otlp_tracing) } impl ScouterSetupComponents { @@ -133,10 +167,13 @@ impl ScouterSetupComponents { let config = Arc::new(ScouterServerConfig::new().await); // start logging - let logging = Self::setup_logging().await; - if logging.is_err() { - debug!("Failed to setup logging. {:?}", logging.err()); - } + let otlp_tracing = match Self::setup_logging(&config).await { + Ok(handle) => handle, + Err(err) => { + debug!("Failed to setup logging. {:?}", err); + None + } + }; let db_pool = Self::setup_database(&config.database_settings).await?; @@ -238,6 +275,7 @@ impl ScouterSetupComponents { genai_service, dataset_manager, eval_scenario_service, + otlp_tracing, }) } @@ -319,7 +357,9 @@ impl ScouterSetupComponents { Ok(Arc::new(service)) } - async fn setup_logging() -> AnyhowResult<()> { + async fn setup_logging( + config: &ScouterServerConfig, + ) -> AnyhowResult> { let log_level = std::env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()); let use_json = std::env::var("LOG_JSON") .unwrap_or_else(|_| "false".to_string()) @@ -327,14 +367,14 @@ impl ScouterSetupComponents { let filter = build_filter(&log_level); - if use_json { - build_json_tracer(filter)?; + let otlp_tracing = if use_json { + build_json_tracer(filter, config)? } else { - build_tracer(filter)?; - } + build_tracer(filter, config)? + }; info!("Logging setup successfully"); - Ok(()) + Ok(otlp_tracing) } // setup default users diff --git a/crates/scouter_server/src/api/state.rs b/crates/scouter_server/src/api/state.rs index ee7f77ed..6af647a6 100644 --- a/crates/scouter_server/src/api/state.rs +++ b/crates/scouter_server/src/api/state.rs @@ -13,6 +13,7 @@ use scouter_dataframe::parquet::tracing::summary::TraceSummaryService; use scouter_settings::ScouterServerConfig; use scouter_sql::sql::aggregator::shutdown_trace_cache; use scouter_sql::sql::cache::entity_cache; +use scouter_tracing::tracer::OtlpTracingHandle; use scouter_types::{ DriftType, ServerRecords, TagRecord, TraceServerRecord, contracts::ScouterServerError, }; @@ -34,6 +35,7 @@ pub struct AppState { pub genai_service: Arc, pub dataset_manager: Arc, pub eval_scenario_service: Arc, + pub otlp_tracing: Option, } impl AppState { @@ -52,6 +54,11 @@ impl AppState { self.genai_service.signal_shutdown().await; self.dataset_manager.shutdown().await; self.eval_scenario_service.signal_shutdown().await; + if let Some(otlp_tracing) = &self.otlp_tracing { + otlp_tracing.shutdown().unwrap_or_else(|e| { + error!("Failed to shutdown OTLP tracing: {:?}", e); + }); + } self.db_pool.close().await; } diff --git a/crates/scouter_server/src/lib.rs b/crates/scouter_server/src/lib.rs index 96a377a8..b1e017ae 100644 --- a/crates/scouter_server/src/lib.rs +++ b/crates/scouter_server/src/lib.rs @@ -52,6 +52,7 @@ pub async fn create_app_state() -> Result, anyhow::Error> { genai_service: scouter_components.genai_service, dataset_manager: scouter_components.dataset_manager, eval_scenario_service: scouter_components.eval_scenario_service, + otlp_tracing: scouter_components.otlp_tracing, }); Ok(app_state) @@ -97,7 +98,9 @@ pub async fn start_server_with_mode(mode: ServeMode) -> Result<(), anyhow::Error info!("HTTP server shut down gracefully"); } ServeMode::Grpc => { - start_grpc_server(app_state).await?; + let result = start_grpc_server(Arc::clone(&app_state)).await; + app_state.shutdown().await; + result?; info!("gRPC server shut down gracefully"); } ServeMode::Both => { diff --git a/crates/scouter_settings/src/lib.rs b/crates/scouter_settings/src/lib.rs index dabdf6be..789b7c95 100644 --- a/crates/scouter_settings/src/lib.rs +++ b/crates/scouter_settings/src/lib.rs @@ -9,6 +9,7 @@ pub mod events; pub mod genai; pub mod grpc; pub mod http; +pub mod otel; pub mod polling; pub mod storage; @@ -18,6 +19,7 @@ pub use database::DatabaseSettings; pub use events::{KafkaSettings, RabbitMQSettings, RedisSettings}; pub use genai::GenAISettings; pub use http::HttpConfig; +pub use otel::OtelSettings; pub use polling::PollingSettings; pub use storage::ObjectStorageSettings; @@ -46,6 +48,7 @@ pub struct ScouterServerConfig { pub auth_settings: AuthSettings, pub bootstrap_key: String, pub storage_settings: ObjectStorageSettings, + pub otel_settings: OtelSettings, } impl ScouterServerConfig { @@ -122,6 +125,7 @@ impl ScouterServerConfig { genai_settings, http_consumer_settings, storage_settings: ObjectStorageSettings::default(), + otel_settings: OtelSettings::default(), } } } diff --git a/crates/scouter_settings/src/otel.rs b/crates/scouter_settings/src/otel.rs new file mode 100644 index 00000000..4d28df50 --- /dev/null +++ b/crates/scouter_settings/src/otel.rs @@ -0,0 +1,159 @@ +use serde::Serialize; +use std::str::FromStr; +use tracing::warn; + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize)] +pub enum OtelProtocol { + #[default] + Grpc, +} + +impl FromStr for OtelProtocol { + type Err = String; + + fn from_str(value: &str) -> Result { + match value.to_ascii_lowercase().as_str() { + "grpc" => Ok(Self::Grpc), + other => Err(format!("unsupported OTLP protocol: {other}")), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct OtelSettings { + pub enabled: bool, + pub endpoint: String, + pub protocol: OtelProtocol, + pub service_name: String, + pub sample_ratio: f64, + pub export_timeout_secs: u64, +} + +impl Default for OtelSettings { + fn default() -> Self { + let enabled = std::env::var("SCOUTER_OTEL_ENABLED") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(false); + + let endpoint = std::env::var("SCOUTER_OTEL_ENDPOINT") + .unwrap_or_else(|_| "http://localhost:4317".to_string()); + + let protocol = std::env::var("SCOUTER_OTEL_PROTOCOL") + .ok() + .and_then(|v| match v.parse::() { + Ok(protocol) => Some(protocol), + Err(err) => { + warn!("{err}; falling back to grpc"); + None + } + }) + .unwrap_or_default(); + + let service_name = std::env::var("SCOUTER_OTEL_SERVICE_NAME") + .unwrap_or_else(|_| "scouter-server".to_string()); + + let sample_ratio = std::env::var("SCOUTER_OTEL_SAMPLE_RATIO") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(1.0) + .clamp(0.0, 1.0); + + let export_timeout_secs = std::env::var("SCOUTER_OTEL_EXPORT_TIMEOUT_SECS") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(10); + + Self { + enabled, + endpoint, + protocol, + service_name, + sample_ratio, + export_timeout_secs, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const OTEL_ENV: &[&str] = &[ + "SCOUTER_OTEL_ENABLED", + "SCOUTER_OTEL_ENDPOINT", + "SCOUTER_OTEL_PROTOCOL", + "SCOUTER_OTEL_SERVICE_NAME", + "SCOUTER_OTEL_SAMPLE_RATIO", + "SCOUTER_OTEL_EXPORT_TIMEOUT_SECS", + ]; + + fn with_clean_env(f: F) { + let prev: Vec<_> = OTEL_ENV + .iter() + .map(|key| (*key, std::env::var(key).ok())) + .collect(); + for key in OTEL_ENV { + unsafe { + std::env::remove_var(key); + } + } + + f(); + + for (key, value) in prev { + unsafe { + match value { + Some(value) => std::env::set_var(key, value), + None => std::env::remove_var(key), + } + } + } + } + + #[test] + fn otel_settings_defaults() { + with_clean_env(|| { + let settings = OtelSettings::default(); + assert!(!settings.enabled); + assert_eq!(settings.endpoint, "http://localhost:4317"); + assert_eq!(settings.protocol, OtelProtocol::Grpc); + assert_eq!(settings.service_name, "scouter-server"); + assert_eq!(settings.sample_ratio, 1.0); + assert_eq!(settings.export_timeout_secs, 10); + }); + } + + #[test] + fn otel_settings_read_env_overrides() { + with_clean_env(|| { + unsafe { + std::env::set_var("SCOUTER_OTEL_ENABLED", "true"); + std::env::set_var("SCOUTER_OTEL_ENDPOINT", "http://collector:4317"); + std::env::set_var("SCOUTER_OTEL_PROTOCOL", "grpc"); + std::env::set_var("SCOUTER_OTEL_SERVICE_NAME", "scouter-test"); + std::env::set_var("SCOUTER_OTEL_SAMPLE_RATIO", "0.25"); + std::env::set_var("SCOUTER_OTEL_EXPORT_TIMEOUT_SECS", "3"); + } + + let settings = OtelSettings::default(); + assert!(settings.enabled); + assert_eq!(settings.endpoint, "http://collector:4317"); + assert_eq!(settings.protocol, OtelProtocol::Grpc); + assert_eq!(settings.service_name, "scouter-test"); + assert_eq!(settings.sample_ratio, 0.25); + assert_eq!(settings.export_timeout_secs, 3); + }); + } + + #[test] + fn sample_ratio_is_clamped_to_valid_range() { + with_clean_env(|| { + unsafe { + std::env::set_var("SCOUTER_OTEL_SAMPLE_RATIO", "2.0"); + } + + assert_eq!(OtelSettings::default().sample_ratio, 1.0); + }); + } +} diff --git a/crates/scouter_tracing/src/tracer.rs b/crates/scouter_tracing/src/tracer.rs index 4fdb212f..8abd5d8d 100644 --- a/crates/scouter_tracing/src/tracer.rs +++ b/crates/scouter_tracing/src/tracer.rs @@ -9,6 +9,7 @@ use crate::error::TraceError; use crate::exporter::SpanExporterNum; use crate::exporter::processor::BatchConfig; use crate::exporter::scouter::ScouterSpanExporter; +use crate::resource::ScouterResourceConfig; use crate::utils::BoxedSpan; use crate::utils::py_obj_to_otel_keyvalue; use crate::utils::{ @@ -29,9 +30,14 @@ use opentelemetry::{ trace::{Span, SpanContext, Status, TraceContextExt, TraceState}, }; use opentelemetry::{SpanId, TraceFlags, TraceId}; +use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::trace::BatchConfigBuilder as OTelBatchConfigBuilder; +use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::Sampler; use opentelemetry_sdk::trace::SdkTracer; use opentelemetry_sdk::trace::SdkTracerProvider; +use opentelemetry_sdk::trace::SpanExporter; use potato_head::create_uuid7; use pyo3::IntoPyObjectExt; use pyo3::prelude::*; @@ -39,6 +45,7 @@ use pyo3::types::{PyDict, PyList, PyTuple}; use scouter_events::queue::ScouterQueue; use scouter_events::queue::types::TransportConfig; use scouter_settings::grpc::GrpcConfig; +use scouter_settings::otel::{OtelProtocol as ScouterOtelProtocol, OtelSettings}; use scouter_types::{ BAGGAGE_PREFIX, EXCEPTION_TRACEBACK, EvalRecord, SCOUTER_TAG_PREFIX, SCOUTER_TRACING_INPUT, @@ -50,7 +57,7 @@ use scouter_types::{SCOUTER_EVAL_PROFILE_UID, SCOUTER_EVAL_RECORD_UID}; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, OnceLock, RwLock}; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use tracing::{debug, info, instrument, warn}; /// Global static instance of the tracer provider. @@ -64,6 +71,89 @@ static SCOUTER_QUEUE_STORE: RwLock>> = RwLock::new(None) // Re-export span capture state from scouter-types for use within this crate. pub use scouter_types::span_capture::{CAPTURE_BUFFER_MAX, CAPTURE_BUFFERS, CAPTURING}; +#[derive(Clone, Debug)] +pub struct OtlpTracingHandle { + provider: SdkTracerProvider, + export_timeout: Duration, + service_name: String, +} + +impl OtlpTracingHandle { + fn new(provider: SdkTracerProvider, export_timeout: Duration, service_name: String) -> Self { + Self { + provider, + export_timeout, + service_name, + } + } + + pub fn tracer(&self) -> SdkTracer { + self.provider.tracer(self.service_name.clone()) + } + + pub fn force_flush(&self) -> Result<(), TraceError> { + self.provider.force_flush()?; + Ok(()) + } + + pub fn shutdown(&self) -> Result<(), TraceError> { + self.force_flush()?; + self.provider.shutdown_with_timeout(self.export_timeout)?; + Ok(()) + } +} + +pub fn init_server_otlp_tracing( + settings: &OtelSettings, +) -> Result, TraceError> { + if !settings.enabled { + return Ok(None); + } + + match settings.protocol { + ScouterOtelProtocol::Grpc => build_server_grpc_otlp_tracing(settings).map(Some), + } +} + +fn build_server_grpc_otlp_tracing( + settings: &OtelSettings, +) -> Result { + let export_timeout = Duration::from_secs(settings.export_timeout_secs); + let resource = ScouterResourceConfig { + service_name: Some(settings.service_name.clone()), + ..Default::default() + } + .build_resource(); + + let mut exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_export_config(opentelemetry_otlp::ExportConfig { + endpoint: Some(settings.endpoint.clone()), + protocol: opentelemetry_otlp::Protocol::Grpc, + timeout: Some(export_timeout), + }) + .build()?; + exporter.set_resource(&resource); + + let batch_config = OTelBatchConfigBuilder::default() + .with_max_export_timeout(export_timeout) + .build(); + let span_processor = BatchSpanProcessor::builder(exporter) + .with_batch_config(batch_config) + .build(); + let provider = SdkTracerProvider::builder() + .with_span_processor(span_processor) + .with_sampler(Sampler::TraceIdRatioBased(settings.sample_ratio)) + .with_resource(resource) + .build(); + + Ok(OtlpTracingHandle::new( + provider, + export_timeout, + settings.service_name.clone(), + )) +} + /// Stable Phase 0 OLAP observability contract. /// /// These names are intentionally centralized before full instrumentation lands. @@ -2142,6 +2232,26 @@ pub fn try_set_span_attribute(py: Python<'_>, key: &str, value: &str) -> Result< Ok(true) } +#[cfg(test)] +mod server_otlp_tests { + use super::*; + + #[test] + fn disabled_server_otlp_returns_no_handle() { + let settings = OtelSettings { + enabled: false, + endpoint: "http://localhost:4317".to_string(), + protocol: ScouterOtelProtocol::Grpc, + service_name: "scouter-server".to_string(), + sample_ratio: 1.0, + export_timeout_secs: 10, + }; + + let handle = init_server_otlp_tracing(&settings).unwrap(); + assert!(handle.is_none()); + } +} + #[cfg(test)] mod capture_tests { use super::*;