Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ num-traits = "0.*"
object_store = { version = "0.*", features = ["aws", "azure", "gcp" ] }
opentelemetry = { version = "0.31.0" }
opentelemetry-stdout = "0.31.0"
opentelemetry_sdk = { version = "0.31.0", features = ["trace"] }
opentelemetry_sdk = { version = "0.31.0", features = ["trace", "experimental_trace_batch_span_processor_with_async_runtime"] }
opentelemetry-proto = { version = "0.31.0" , features = ["trace"] }
opentelemetry-otlp = { version = "0.31.0", features = ["grpc-tonic", "trace"] }
owo-colors = "4.*"
Expand Down Expand Up @@ -121,6 +121,7 @@ utoipa-axum = "0"
utoipa-swagger-ui = { version = "9", features = ["axum"] }
tokio-util = "0.*"
tracing = "0.*"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = {version = "0.*", features = ["json", "time", "env-filter"] }
uuid = { version = "1.*", features = ["v4", "v7"] }
url = "2.*"
Expand Down
2 changes: 2 additions & 0 deletions crates/scouter_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ scouter-drift = { workspace = true, features = ["sql"] }
scouter-observability = { workspace = true }
scouter-sql = { workspace = true }
scouter-tonic = { workspace = true, features = ["server"] }
scouter-tracing = { workspace = true }


arrow = { workspace = true }
Expand Down Expand Up @@ -55,6 +56,7 @@ thiserror = { workspace = true }
tokio = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
utoipa = { workspace = true }
utoipa-axum = { workspace = true }
Expand Down
70 changes: 55 additions & 15 deletions crates/scouter_server/src/api/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use scouter_settings::{
use scouter_sql::PostgresClient;
use scouter_sql::sql::schema::User;
use scouter_sql::sql::traits::UserSqlLogic;
use scouter_tracing::tracer::{OtlpTracingHandle, init_server_otlp_tracing};
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tracing::{debug, info, instrument};
use tracing::{debug, info, instrument, warn};
use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{EnvFilter, fmt};
Expand Down Expand Up @@ -69,6 +70,7 @@ pub struct ScouterSetupComponents {
pub genai_service: Arc<GenAiSpanService>,
pub dataset_manager: Arc<DatasetEngineManager>,
pub eval_scenario_service: Arc<EvalScenarioService>,
pub otlp_tracing: Option<OtlpTracingHandle>,
}

fn build_filter(log_level: &str) -> EnvFilter {
Expand All @@ -82,13 +84,20 @@ fn build_filter(log_level: &str) -> EnvFilter {
})
}

fn build_tracer(filter: EnvFilter) -> AnyhowResult<()> {
fn build_tracer(
filter: EnvFilter,
config: &ScouterServerConfig,
) -> AnyhowResult<Option<OtlpTracingHandle>> {
let timer = UtcTime::new(
time::format_description::parse(
"[year]-[month]-[day]T[hour repr:24]:[minute]:[second]::[subsecond digits:4]",
)
.context("Failed to parse time format")?,
);
let (otlp_tracing, otlp_error) = match init_server_otlp_tracing(&config.otel_settings) {
Ok(handle) => (handle, None),
Err(err) => (None, Some(err)),
};

tracing_subscriber::registry()
.with(filter)
Expand All @@ -98,19 +107,35 @@ fn build_tracer(filter: EnvFilter) -> AnyhowResult<()> {
.with_thread_ids(true)
.with_timer(timer),
)
.with(
otlp_tracing
.as_ref()
.map(|handle| tracing_opentelemetry::layer().with_tracer(handle.tracer())),
)
.try_init()
.ok();

Ok(())
if let Some(err) = otlp_error {
warn!("Failed to initialize OTLP tracing: {}", err);
}

Ok(otlp_tracing)
}

fn build_json_tracer(filter: EnvFilter) -> AnyhowResult<()> {
fn build_json_tracer(
filter: EnvFilter,
config: &ScouterServerConfig,
) -> AnyhowResult<Option<OtlpTracingHandle>> {
let timer = UtcTime::new(
time::format_description::parse(
"[year]-[month]-[day]T[hour repr:24]:[minute]:[second]::[subsecond digits:4]",
)
.context("Failed to parse time format")?,
);
let (otlp_tracing, otlp_error) = match init_server_otlp_tracing(&config.otel_settings) {
Ok(handle) => (handle, None),
Err(err) => (None, Some(err)),
};

tracing_subscriber::registry()
.with(filter)
Expand All @@ -122,21 +147,33 @@ fn build_json_tracer(filter: EnvFilter) -> AnyhowResult<()> {
.with_thread_ids(true)
.with_timer(timer),
)
.with(
otlp_tracing
.as_ref()
.map(|handle| tracing_opentelemetry::layer().with_tracer(handle.tracer())),
)
.try_init()
.ok();

Ok(())
if let Some(err) = otlp_error {
warn!("Failed to initialize OTLP tracing: {}", err);
}

Ok(otlp_tracing)
}

impl ScouterSetupComponents {
pub async fn new() -> AnyhowResult<Self> {
let config = Arc::new(ScouterServerConfig::new().await);

// start logging
let logging = Self::setup_logging().await;
if logging.is_err() {
debug!("Failed to setup logging. {:?}", logging.err());
}
let otlp_tracing = match Self::setup_logging(&config).await {
Ok(handle) => handle,
Err(err) => {
debug!("Failed to setup logging. {:?}", err);
None
}
};

let db_pool = Self::setup_database(&config.database_settings).await?;

Expand Down Expand Up @@ -238,6 +275,7 @@ impl ScouterSetupComponents {
genai_service,
dataset_manager,
eval_scenario_service,
otlp_tracing,
})
}

Expand Down Expand Up @@ -319,22 +357,24 @@ impl ScouterSetupComponents {
Ok(Arc::new(service))
}

async fn setup_logging() -> AnyhowResult<()> {
async fn setup_logging(
config: &ScouterServerConfig,
) -> AnyhowResult<Option<OtlpTracingHandle>> {
let log_level = std::env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
let use_json = std::env::var("LOG_JSON")
.unwrap_or_else(|_| "false".to_string())
.parse::<bool>()?;

let filter = build_filter(&log_level);

if use_json {
build_json_tracer(filter)?;
let otlp_tracing = if use_json {
build_json_tracer(filter, config)?
} else {
build_tracer(filter)?;
}
build_tracer(filter, config)?
};

info!("Logging setup successfully");
Ok(())
Ok(otlp_tracing)
}

// setup default users
Expand Down
7 changes: 7 additions & 0 deletions crates/scouter_server/src/api/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use scouter_dataframe::parquet::tracing::summary::TraceSummaryService;
use scouter_settings::ScouterServerConfig;
use scouter_sql::sql::aggregator::shutdown_trace_cache;
use scouter_sql::sql::cache::entity_cache;
use scouter_tracing::tracer::OtlpTracingHandle;
use scouter_types::{
DriftType, ServerRecords, TagRecord, TraceServerRecord, contracts::ScouterServerError,
};
Expand All @@ -34,6 +35,7 @@ pub struct AppState {
pub genai_service: Arc<GenAiSpanService>,
pub dataset_manager: Arc<DatasetEngineManager>,
pub eval_scenario_service: Arc<EvalScenarioService>,
pub otlp_tracing: Option<OtlpTracingHandle>,
}

impl AppState {
Expand All @@ -52,6 +54,11 @@ impl AppState {
self.genai_service.signal_shutdown().await;
self.dataset_manager.shutdown().await;
self.eval_scenario_service.signal_shutdown().await;
if let Some(otlp_tracing) = &self.otlp_tracing {
otlp_tracing.shutdown().unwrap_or_else(|e| {
error!("Failed to shutdown OTLP tracing: {:?}", e);
});
}
self.db_pool.close().await;
}

Expand Down
5 changes: 4 additions & 1 deletion crates/scouter_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub async fn create_app_state() -> Result<Arc<AppState>, anyhow::Error> {
genai_service: scouter_components.genai_service,
dataset_manager: scouter_components.dataset_manager,
eval_scenario_service: scouter_components.eval_scenario_service,
otlp_tracing: scouter_components.otlp_tracing,
});

Ok(app_state)
Expand Down Expand Up @@ -97,7 +98,9 @@ pub async fn start_server_with_mode(mode: ServeMode) -> Result<(), anyhow::Error
info!("HTTP server shut down gracefully");
}
ServeMode::Grpc => {
start_grpc_server(app_state).await?;
let result = start_grpc_server(Arc::clone(&app_state)).await;
app_state.shutdown().await;
result?;
info!("gRPC server shut down gracefully");
}
ServeMode::Both => {
Expand Down
4 changes: 4 additions & 0 deletions crates/scouter_settings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod events;
pub mod genai;
pub mod grpc;
pub mod http;
pub mod otel;
pub mod polling;
pub mod storage;

Expand All @@ -18,6 +19,7 @@ pub use database::DatabaseSettings;
pub use events::{KafkaSettings, RabbitMQSettings, RedisSettings};
pub use genai::GenAISettings;
pub use http::HttpConfig;
pub use otel::OtelSettings;
pub use polling::PollingSettings;
pub use storage::ObjectStorageSettings;

Expand Down Expand Up @@ -46,6 +48,7 @@ pub struct ScouterServerConfig {
pub auth_settings: AuthSettings,
pub bootstrap_key: String,
pub storage_settings: ObjectStorageSettings,
pub otel_settings: OtelSettings,
}

impl ScouterServerConfig {
Expand Down Expand Up @@ -122,6 +125,7 @@ impl ScouterServerConfig {
genai_settings,
http_consumer_settings,
storage_settings: ObjectStorageSettings::default(),
otel_settings: OtelSettings::default(),
}
}
}
Expand Down
Loading
Loading