From 553a607685000971ef06678e913db7429d263148 Mon Sep 17 00:00:00 2001 From: Thorrester Date: Wed, 13 May 2026 18:42:37 -0400 Subject: [PATCH 1/2] Add tiered OLAP benchmark harness --- crates/scouter_dataframe/Cargo.toml | 12 + .../scouter_dataframe/bench_metrics/README.md | 5 + .../bench_metrics/t0_bifrost_smoke.json | 40 ++ .../bench_metrics/t0_cold_query_smoke.json | 40 ++ .../t0_hot_path_cold_query_smoke.json | 40 ++ .../t0_refresh_origin_sentinel.json | 40 ++ .../benches/counting_object_store.rs | 199 ++++++ .../benches/dataset_benchmark.rs | 99 ++- .../benches/hot_path_bench.rs | 141 +++- .../benches/planner_bench.rs | 6 + .../benches/session_config_bench.rs | 5 + .../scouter_dataframe/benches/stress_test.rs | 5 + crates/scouter_dataframe/benches/tiers.rs | 672 ++++++++++++++++++ .../benches/trace_service_benchmark.rs | 179 +++++ crates/scouter_dataframe/benches/utils.rs | 291 +++++++- makefile | 21 + 16 files changed, 1791 insertions(+), 4 deletions(-) create mode 100644 crates/scouter_dataframe/bench_metrics/README.md create mode 100644 crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json create mode 100644 crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json create mode 100644 crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json create mode 100644 crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json create mode 100644 crates/scouter_dataframe/benches/counting_object_store.rs create mode 100644 crates/scouter_dataframe/benches/tiers.rs diff --git a/crates/scouter_dataframe/Cargo.toml b/crates/scouter_dataframe/Cargo.toml index e2c645051..69f7b4052 100644 --- a/crates/scouter_dataframe/Cargo.toml +++ b/crates/scouter_dataframe/Cargo.toml @@ -8,10 +8,14 @@ authors = [ ] license = "MIT" description = "DataFusion client for long-term storage of scouter data" +autobenches = false [lib] doctest = false +[features] +bench-jaeger = [] + [dependencies] scouter-settings = { workspace = true } scouter-types = { 
workspace = true } @@ -79,3 +83,11 @@ harness = false [[bench]] name = "hot_path_bench" harness = false + +[[bin]] +name = "bench_tier_filter" +path = "benches/tiers.rs" + +[[bin]] +name = "bench_compare" +path = "benches/tiers.rs" diff --git a/crates/scouter_dataframe/bench_metrics/README.md b/crates/scouter_dataframe/bench_metrics/README.md new file mode 100644 index 000000000..362b43e22 --- /dev/null +++ b/crates/scouter_dataframe/bench_metrics/README.md @@ -0,0 +1,5 @@ +Tier 0 baseline JSON artifacts live in this directory after `make bench.core` +has been proven to complete under the 15 minute Phase 0.6 budget. + +Benchmark runs write fresh artifacts to `target/bench_metrics/`; CI must not +write into this directory. diff --git a/crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json b/crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json new file mode 100644 index 000000000..592df24f2 --- /dev/null +++ b/crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json @@ -0,0 +1,40 @@ +{ + "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "bench_group": "t0_bifrost_smoke", + "tier": 0, + "blocking": true, + "scenario_class": "bifrost_smoke", + "runtime_budget_secs": 120, + "actual_runtime_secs": 1.5295609589999999, + "fixture_rows": 1000, + "fixture_spans": 0, + "storage_profile": "P1_local_nvme", + "spans": { + "delta.snapshot.refresh": { + "count": 0, + "p50_us": 0, + "p95_us": 0, + "p99_us": 0, + "sum_us": 0 + }, + "df.collect": { + "count": 1, + "p50_us": 2807, + "p95_us": 2807, + "p99_us": 2807, + "sum_us": 2807 + } + }, + "object_store_counts": { + "list": 0, + "list_with_delimiter": 0, + "head": 0, + "get": 0, + "get_range": 0, + "put": 0, + "delete": 0, + "copy": 0, + "bytes": 0 + }, + "refresh_on_request_path_total": 0 +} diff --git a/crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json b/crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json new file mode 100644 index 000000000..37aa642db --- /dev/null +++ 
b/crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json @@ -0,0 +1,40 @@ +{ + "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "bench_group": "t0_cold_query_smoke", + "tier": 0, + "blocking": true, + "scenario_class": "cold_query", + "runtime_budget_secs": 120, + "actual_runtime_secs": 1.55367525, + "fixture_rows": 10080, + "fixture_spans": 10080, + "storage_profile": "P1_local_nvme", + "spans": { + "delta.snapshot.refresh": { + "count": 0, + "p50_us": 0, + "p95_us": 0, + "p99_us": 0, + "sum_us": 0 + }, + "df.collect": { + "count": 1, + "p50_us": 6572, + "p95_us": 6572, + "p99_us": 6572, + "sum_us": 6572 + } + }, + "object_store_counts": { + "list": 0, + "list_with_delimiter": 0, + "head": 0, + "get": 0, + "get_range": 0, + "put": 0, + "delete": 0, + "copy": 0, + "bytes": 0 + }, + "refresh_on_request_path_total": 0 +} diff --git a/crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json b/crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json new file mode 100644 index 000000000..7fd1aa0d4 --- /dev/null +++ b/crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json @@ -0,0 +1,40 @@ +{ + "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "bench_group": "t0_hot_path_cold_query_smoke", + "tier": 0, + "blocking": true, + "scenario_class": "cold_query", + "runtime_budget_secs": 120, + "actual_runtime_secs": 0.132162208, + "fixture_rows": 10000, + "fixture_spans": 10000, + "storage_profile": "P1_local_nvme", + "spans": { + "delta.snapshot.refresh": { + "count": 0, + "p50_us": 0, + "p95_us": 0, + "p99_us": 0, + "sum_us": 0 + }, + "df.collect": { + "count": 1, + "p50_us": 5425, + "p95_us": 5425, + "p99_us": 5425, + "sum_us": 5425 + } + }, + "object_store_counts": { + "list": 0, + "list_with_delimiter": 0, + "head": 0, + "get": 0, + "get_range": 0, + "put": 0, + "delete": 0, + "copy": 0, + "bytes": 0 + }, + "refresh_on_request_path_total": 0 +} diff --git 
a/crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json b/crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json new file mode 100644 index 000000000..ba487e720 --- /dev/null +++ b/crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json @@ -0,0 +1,40 @@ +{ + "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "bench_group": "t0_refresh_origin_sentinel", + "tier": 0, + "blocking": true, + "scenario_class": "refresh_origin_sentinel", + "runtime_budget_secs": 30, + "actual_runtime_secs": 4.59e-7, + "fixture_rows": 0, + "fixture_spans": 0, + "storage_profile": "P1_local_nvme", + "spans": { + "delta.snapshot.refresh": { + "count": 0, + "p50_us": 0, + "p95_us": 0, + "p99_us": 0, + "sum_us": 0 + }, + "df.collect": { + "count": 0, + "p50_us": 0, + "p95_us": 0, + "p99_us": 0, + "sum_us": 0 + } + }, + "object_store_counts": { + "list": 0, + "list_with_delimiter": 0, + "head": 0, + "get": 0, + "get_range": 0, + "put": 0, + "delete": 0, + "copy": 0, + "bytes": 0 + }, + "refresh_on_request_path_total": 0 +} diff --git a/crates/scouter_dataframe/benches/counting_object_store.rs b/crates/scouter_dataframe/benches/counting_object_store.rs new file mode 100644 index 000000000..470f71813 --- /dev/null +++ b/crates/scouter_dataframe/benches/counting_object_store.rs @@ -0,0 +1,199 @@ +#![allow(dead_code)] + +use crate::tiers::ObjectStoreCountSnapshot; +use async_trait::async_trait; +use bytes::Bytes; +use futures::StreamExt; +use futures::stream::BoxStream; +use object_store::path::Path; +use object_store::{ + CopyOptions, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, +}; +use std::fmt; +use std::ops::Range; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +#[derive(Debug, Default)] +pub struct ObjectStoreCounts { + pub list: AtomicU64, + pub list_with_delimiter: AtomicU64, + pub head: AtomicU64, + pub get: 
AtomicU64, + pub get_range: AtomicU64, + pub put: AtomicU64, + pub delete: AtomicU64, + pub copy: AtomicU64, + pub bytes: AtomicU64, +} + +impl ObjectStoreCounts { + pub fn snapshot(&self) -> ObjectStoreCountSnapshot { + ObjectStoreCountSnapshot { + list: self.list.load(Ordering::Relaxed), + list_with_delimiter: self.list_with_delimiter.load(Ordering::Relaxed), + head: self.head.load(Ordering::Relaxed), + get: self.get.load(Ordering::Relaxed), + get_range: self.get_range.load(Ordering::Relaxed), + put: self.put.load(Ordering::Relaxed), + delete: self.delete.load(Ordering::Relaxed), + copy: self.copy.load(Ordering::Relaxed), + bytes: self.bytes.load(Ordering::Relaxed), + } + } +} + +#[derive(Debug)] // wrapper: forwards every call to `inner` and tallies it in `counts` +pub struct CountingObjectStore<S> { + inner: S, + counts: Arc<ObjectStoreCounts>, +} + +impl<S> CountingObjectStore<S> { + pub fn new(inner: S) -> Self { + Self { + inner, + counts: Arc::new(ObjectStoreCounts::default()), + } + } + + pub fn counts(&self) -> Arc<ObjectStoreCounts> { + Arc::clone(&self.counts) + } + + pub fn into_inner(self) -> S { + self.inner + } +} + +impl<S: fmt::Display> fmt::Display for CountingObjectStore<S> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CountingObjectStore({})", self.inner) + } +} + +#[async_trait] +impl<S: ObjectStore> ObjectStore for CountingObjectStore<S> { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result<PutResult> { + self.counts.put.fetch_add(1, Ordering::Relaxed); + self.counts + .bytes + .fetch_add(payload.content_length() as u64, Ordering::Relaxed); + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> Result<Box<dyn MultipartUpload>> { + self.counts.put.fetch_add(1, Ordering::Relaxed); // only the call is tallied; part payload bytes are not added to `bytes` + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> { + if options.head { + self.counts.head.fetch_add(1, Ordering::Relaxed); + } else if options.range.is_some() { + 
self.counts.get_range.fetch_add(1, Ordering::Relaxed); + } else { + self.counts.get.fetch_add(1, Ordering::Relaxed); + } + + let (is_head, requested_range) = (options.head, options.range.clone()); // captured before `options` is moved; a HEAD reply carries no body + let result = self.inner.get_opts(location, options).await?; + let bytes = match requested_range { + Some(GetRange::Bounded(range)) => range.end.saturating_sub(range.start), + Some(GetRange::Offset(offset)) => result.meta.size.saturating_sub(offset), + Some(GetRange::Suffix(suffix)) => suffix.min(result.meta.size), + None if !result.range.is_empty() => result.range.end.saturating_sub(result.range.start), + None => result.meta.size, + }; + self.counts.bytes.fetch_add(if is_head { 0 } else { bytes }, Ordering::Relaxed); + Ok(result) + } + + async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { + self.counts + .get_range + .fetch_add(ranges.len() as u64, Ordering::Relaxed); + self.counts.bytes.fetch_add( + ranges + .iter() + .map(|range| range.end.saturating_sub(range.start)) + .sum(), + Ordering::Relaxed, + ); + self.inner.get_ranges(location, ranges).await + } + + fn delete_stream( + &self, + locations: BoxStream<'static, Result<Path>>, + ) -> BoxStream<'static, Result<Path>> { + let counts = Arc::clone(&self.counts); + self.inner + .delete_stream(locations) + .map(move |result| { + if result.is_ok() { + counts.delete.fetch_add(1, Ordering::Relaxed); + } + result + }) + .boxed() + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { + self.counts.list.fetch_add(1, Ordering::Relaxed); + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { + self.counts + .list_with_delimiter + .fetch_add(1, Ordering::Relaxed); + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> { + self.counts.copy.fetch_add(1, Ordering::Relaxed); + self.inner.copy_opts(from, to, options).await + } +} + +#[cfg(test)] +#[allow(unused_imports)] +mod tests { + use super::*; + use 
object_store::ObjectStoreExt; + use object_store::memory::InMemory; + + #[tokio::test] + async fn counts_basic_operations() { + let store = CountingObjectStore::new(InMemory::new()); + let path = Path::from("bench/counts.txt"); + + store + .put(&path, PutPayload::from_static(b"abcdef")) + .await + .unwrap(); + let _ = store.head(&path).await.unwrap(); + let _ = store.get_range(&path, 1..3).await.unwrap(); + let _ = store.list(None).collect::>().await; + store.delete(&path).await.unwrap(); + + let snapshot = store.counts().snapshot(); + assert_eq!(snapshot.put, 1); + assert_eq!(snapshot.head, 1); + assert_eq!(snapshot.get_range, 1); + assert_eq!(snapshot.list, 1); + assert_eq!(snapshot.delete, 1); + assert!(snapshot.bytes >= 8); + } +} diff --git a/crates/scouter_dataframe/benches/dataset_benchmark.rs b/crates/scouter_dataframe/benches/dataset_benchmark.rs index 2f792f2da..4f475a1f9 100644 --- a/crates/scouter_dataframe/benches/dataset_benchmark.rs +++ b/crates/scouter_dataframe/benches/dataset_benchmark.rs @@ -1,3 +1,6 @@ +mod tiers; +mod utils; + use arrow::array::{Date32Array, Float64Array, StringArray, TimestampMicrosecondArray}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow_array::RecordBatch; @@ -7,11 +10,16 @@ use scouter_dataframe::parquet::bifrost::manager::DatasetEngineManager; use scouter_settings::ObjectStorageSettings; use scouter_types::StorageType; use scouter_types::dataset::{DatasetFingerprint, DatasetNamespace, DatasetRegistration}; +use std::collections::BTreeMap; use std::hint::black_box; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; +use tiers::ObjectStoreCountSnapshot; use tokio::runtime::Runtime; +const DF_COLLECT_SPAN: &str = "df.collect"; +const DELTA_SNAPSHOT_REFRESH_SPAN: &str = "delta.snapshot.refresh"; + fn bench_schema() -> Schema { Schema::new(vec![ Field::new("user_id", DataType::Utf8, false), @@ -73,6 +81,10 @@ fn make_storage_settings(dir: &tempfile::TempDir) -> 
ObjectStorageSettings { } fn bench_write_throughput(c: &mut Criterion) { + if !tiers::tier_guard_for("dataset_benchmark", "dataset_write") { + return; + } + let mut group = c.benchmark_group("dataset_write"); group.sample_size(10); group.measurement_time(Duration::from_secs(30)); @@ -113,6 +125,10 @@ fn bench_write_throughput(c: &mut Criterion) { } fn bench_query(c: &mut Criterion) { + if !tiers::tier_guard_for("dataset_benchmark", "dataset_query") { + return; + } + let rt = Runtime::new().unwrap(); let dir = tempfile::tempdir().unwrap(); let schema = bench_schema(); @@ -178,5 +194,84 @@ fn bench_query(c: &mut Criterion) { }); } -criterion_group!(benches, bench_write_throughput, bench_query); +fn span_metric(duration: Duration) -> tiers::SpanMetric { + let micros = duration.as_micros().min(u64::MAX as u128) as u64; + tiers::SpanMetric { + count: 1, + p50_us: micros, + p95_us: micros, + p99_us: micros, + sum_us: micros, + } +} + +fn bench_t0_bifrost_smoke(c: &mut Criterion) { + const GROUP: &str = "t0_bifrost_smoke"; + if !tiers::tier_guard_for("dataset_benchmark", GROUP) { + return; + } + + let setup_start = Instant::now(); + let rt = Runtime::new().unwrap(); + let dir = tempfile::tempdir().unwrap(); + let schema = bench_schema(); + let (manager, namespace) = rt.block_on(async { + let settings = make_storage_settings(&dir); + let manager = DatasetEngineManager::with_config(&settings, 1800, 10, 1, 50_000, 30) + .await + .unwrap(); + let reg = make_registration(&schema); + manager.register_dataset(&reg).await.unwrap(); + manager + .insert_batch(&reg.namespace, &reg.fingerprint, make_batch(&schema, 1_000)) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(1500)).await; // NOTE(review): presumably lets the background flush land before the query; confirm and prefer an explicit flush if one exists + (Arc::new(manager), reg.namespace.clone()) + }); + let fqn = namespace.fqn(); + let sql = format!("SELECT COUNT(*) as cnt FROM {fqn}"); + + let smoke_start = Instant::now(); + rt.block_on(async { + let _ = manager.query(&sql).await.unwrap(); + }); + let smoke_runtime = smoke_start.elapsed(); + + 
let mut spans = BTreeMap::new(); + spans.insert(DF_COLLECT_SPAN.to_string(), span_metric(smoke_runtime)); + spans.insert( + DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), + tiers::SpanMetric::default(), + ); + utils::write_bench_artifact( + "dataset_benchmark", + GROUP, + setup_start.elapsed(), + spans, + ObjectStoreCountSnapshot::default(), + 0, + ); + + c.bench_function(GROUP, |b| { + let mgr = Arc::clone(&manager); + let sql = sql.clone(); + b.to_async(&rt) + .iter(|| async { black_box(mgr.query(&sql).await.unwrap()) }); + }); + + rt.block_on(async { + Arc::try_unwrap(manager) + .unwrap_or_else(|_| panic!("manager still referenced")) + .shutdown() + .await; + }); +} + +criterion_group!( + benches, + bench_t0_bifrost_smoke, + bench_write_throughput, + bench_query +); criterion_main!(benches); diff --git a/crates/scouter_dataframe/benches/hot_path_bench.rs b/crates/scouter_dataframe/benches/hot_path_bench.rs index f83e35f5c..f80405a67 100644 --- a/crates/scouter_dataframe/benches/hot_path_bench.rs +++ b/crates/scouter_dataframe/benches/hot_path_bench.rs @@ -1,3 +1,6 @@ +mod tiers; +mod utils; + use chrono::{DateTime, Utc}; use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use scouter_dataframe::parquet::tracing::queries::TraceQueries; @@ -7,9 +10,11 @@ use scouter_types::{ Attribute, FilterClause, SpanId, StorageType, TraceId, TraceMetricsRequest, TraceSpanRecord, }; use serde_json::json; +use std::collections::BTreeMap; use std::hint::black_box; use std::sync::Arc; use std::time::{Duration, Instant}; +use tiers::ObjectStoreCountSnapshot; use tokio::runtime::Runtime; const TOTAL_SPANS: usize = 1_000_000; @@ -17,6 +22,8 @@ const SPANS_PER_TRACE: usize = 5; const WRITE_CHUNK_SPANS: usize = 50_000; const SERVICE_COUNT: usize = 20; const HOT_SERVICE: &str = "service_03"; +const DF_COLLECT_SPAN: &str = "df.collect"; +const DELTA_SNAPSHOT_REFRESH_SPAN: &str = "delta.snapshot.refresh"; #[derive(Clone)] struct HotFixture { @@ -161,6 +168,35 @@ async fn 
seed_fixture(settings: &ObjectStorageSettings) -> HotFixture { } } +async fn seed_small_fixture(settings: &ObjectStorageSettings, total_spans: usize) -> HotFixture { + let service = TraceSpanService::new(settings, 999, Some(1), None, 10, None) + .await + .unwrap(); + let base_time = Utc::now() - chrono::Duration::hours(24); + + for start in (0..total_spans).step_by(2_000) { + let end = (start + 2_000).min(total_spans); + service + .write_spans_direct(span_chunk(start, end, base_time)) + .await + .unwrap(); + } + service.optimize().await.unwrap(); + + let trace_idx = 24 * 100 + 3; + let trace_start = base_time + + chrono::Duration::hours((trace_idx % 24) as i64) + + chrono::Duration::milliseconds(((trace_idx / 24) % 3_600_000) as i64); + + HotFixture { + service: Arc::new(service), + trace_id: Arc::new(trace_id(trace_idx).as_bytes().to_vec()), + trace_start, + window_start: base_time + chrono::Duration::hours(3), + window_end: base_time + chrono::Duration::hours(4), + } +} + fn metrics_request( start_time: DateTime, end_time: DateTime, @@ -177,6 +213,12 @@ fn metrics_request( } fn benchmark_trace_hot_paths(c: &mut Criterion) { + let run_trace_group = tiers::tier_guard_for("hot_path_bench", "trace_hot_paths_1m"); + let run_metrics_group = tiers::tier_guard_for("hot_path_bench", "metrics_hot_paths_1m"); + if !run_trace_group && !run_metrics_group { + return; + } + let rt = Runtime::new().unwrap(); let tmp_dir = tempfile::tempdir().unwrap(); let settings = storage_settings(tmp_dir.path().to_string_lossy().to_string()); @@ -423,5 +465,102 @@ fn benchmark_trace_hot_paths(c: &mut Criterion) { drop(tmp_dir); } -criterion_group!(benches, benchmark_trace_hot_paths); +fn span_metric(duration: Duration) -> tiers::SpanMetric { + let micros = duration.as_micros().min(u64::MAX as u128) as u64; + tiers::SpanMetric { + count: 1, + p50_us: micros, + p95_us: micros, + p99_us: micros, + sum_us: micros, + } +} + +fn benchmark_t0_cold_query_smoke(c: &mut Criterion) { + const GROUP: &str 
= "t0_hot_path_cold_query_smoke"; + if !tiers::tier_guard_for("hot_path_bench", GROUP) { + return; + } + + let setup_start = Instant::now(); + let rt = Runtime::new().unwrap(); + let tmp_dir = tempfile::tempdir().unwrap(); + let settings = storage_settings(tmp_dir.path().to_string_lossy().to_string()); + let fixture = rt.block_on(seed_small_fixture(&settings, 10_000)); + + let smoke_start = Instant::now(); + rt.block_on(async { + let _ = fixture + .service + .query_service + .query_spans( + Some(&fixture.trace_id), + None, + None, + None, + None, + Some(&fixture.trace_start), + Some(&(fixture.trace_start + chrono::Duration::minutes(1))), + None, + ) + .await + .unwrap(); + }); + let smoke_runtime = smoke_start.elapsed(); + + let mut spans = BTreeMap::new(); + spans.insert(DF_COLLECT_SPAN.to_string(), span_metric(smoke_runtime)); + spans.insert( + DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), + tiers::SpanMetric::default(), + ); + utils::write_bench_artifact( + "hot_path_bench", + GROUP, + setup_start.elapsed(), + spans, + ObjectStoreCountSnapshot::default(), + 0, + ); + + c.bench_function(GROUP, |b| { + b.to_async(&rt).iter_custom(|iters| { + let fixture = fixture.clone(); + async move { + let start = Instant::now(); + for _ in 0..iters { + let _ = black_box( + fixture + .service + .query_service + .query_spans( + Some(&fixture.trace_id), + None, + None, + None, + None, + Some(&fixture.trace_start), + Some(&(fixture.trace_start + chrono::Duration::minutes(1))), + None, + ) + .await + .unwrap(), + ); + } + start.elapsed() + } + }); + }); + + let service = Arc::try_unwrap(fixture.service) + .unwrap_or_else(|_| panic!("Arc still has multiple owners")); + rt.block_on(async { service.shutdown().await.unwrap() }); + drop(tmp_dir); +} + +criterion_group!( + benches, + benchmark_t0_cold_query_smoke, + benchmark_trace_hot_paths +); criterion_main!(benches); diff --git a/crates/scouter_dataframe/benches/planner_bench.rs b/crates/scouter_dataframe/benches/planner_bench.rs index 
752fbaea1..5d291f258 100644 --- a/crates/scouter_dataframe/benches/planner_bench.rs +++ b/crates/scouter_dataframe/benches/planner_bench.rs @@ -1,3 +1,5 @@ +mod tiers; + use chrono::Utc; use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use scouter_dataframe::parquet::tracing::queries::TraceQueries; @@ -105,6 +107,10 @@ fn metrics_request(clause: FilterClause) -> TraceMetricsRequest { } fn benchmark_planner_queries(c: &mut Criterion) { + if !tiers::tier_guard_for("planner_bench", "planner_queries") { + return; + } + let mut group = c.benchmark_group("planner_queries"); group.sample_size(10); group.measurement_time(Duration::from_secs(3)); diff --git a/crates/scouter_dataframe/benches/session_config_bench.rs b/crates/scouter_dataframe/benches/session_config_bench.rs index 87da31d5f..af32aebe5 100644 --- a/crates/scouter_dataframe/benches/session_config_bench.rs +++ b/crates/scouter_dataframe/benches/session_config_bench.rs @@ -10,6 +10,7 @@ //! SCOUTER_STORAGE_URI=gs://your-bucket cargo bench -p scouter-dataframe --bench session_config_bench //! 
``` +mod tiers; mod utils; use chrono::Utc; @@ -26,6 +27,10 @@ const QUERY_ITERS: usize = 200; #[tokio::main] async fn main() -> Result<(), Box> { + if !tiers::tier_guard_for("session_config_bench", "session_config_bench") { + return Ok(()); + } + tracing_subscriber::fmt() .with_max_level(tracing::Level::WARN) .init(); diff --git a/crates/scouter_dataframe/benches/stress_test.rs b/crates/scouter_dataframe/benches/stress_test.rs index 3024504aa..35d2eb8e3 100644 --- a/crates/scouter_dataframe/benches/stress_test.rs +++ b/crates/scouter_dataframe/benches/stress_test.rs @@ -1,3 +1,4 @@ +mod tiers; mod utils; use chrono::Utc; @@ -16,6 +17,10 @@ const ENTITY_TRACES: usize = 50; #[tokio::main] async fn main() -> Result<(), Box> { + if !tiers::tier_guard_for("stress_test", "stress_test") { + return Ok(()); + } + tracing_subscriber::fmt() .with_max_level(tracing::Level::WARN) .init(); diff --git a/crates/scouter_dataframe/benches/tiers.rs b/crates/scouter_dataframe/benches/tiers.rs new file mode 100644 index 000000000..2dd3f2fb9 --- /dev/null +++ b/crates/scouter_dataframe/benches/tiers.rs @@ -0,0 +1,672 @@ +#![allow(dead_code)] + +use serde::{Deserialize, Serialize}; +use std::collections::{BTreeMap, BTreeSet}; +use std::env; +use std::fs; +use std::path::{Path, PathBuf}; +use std::process; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[repr(u8)] +pub enum BenchTier { + Tier0 = 0, + Tier1 = 1, + Tier2 = 2, +} + +impl BenchTier { + pub fn from_u8(value: u8) -> Option { + match value { + 0 => Some(Self::Tier0), + 1 => Some(Self::Tier1), + 2 => Some(Self::Tier2), + _ => None, + } + } + + pub fn as_u8(self) -> u8 { + self as u8 + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct BenchRegistration { + pub bench_binary: &'static str, + pub group_name: &'static str, + pub tier: BenchTier, + pub runtime_budget_secs: u64, + pub fixture_rows: u64, + pub fixture_spans: u64, + pub storage_profile: &'static str, + pub scenario_class: &'static 
str, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct SpanMetric { + pub count: u64, + pub p50_us: u64, + pub p95_us: u64, + pub p99_us: u64, + pub sum_us: u64, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct ObjectStoreCountSnapshot { + pub list: u64, + pub list_with_delimiter: u64, + pub head: u64, + pub get: u64, + pub get_range: u64, + pub put: u64, + pub delete: u64, + pub copy: u64, + pub bytes: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BenchArtifact { + pub commit: String, + pub bench_group: String, + pub tier: u8, + pub blocking: bool, + pub scenario_class: String, + pub runtime_budget_secs: u64, + pub actual_runtime_secs: f64, + pub fixture_rows: u64, + pub fixture_spans: u64, + pub storage_profile: String, + pub spans: BTreeMap, + pub object_store_counts: ObjectStoreCountSnapshot, + #[serde(default)] + pub refresh_on_request_path_total: u64, +} + +pub const P1_LOCAL_NVME: &str = "P1_local_nvme"; +pub const P2_OBJECT_WARM: &str = "P2_object_warm"; +pub const P2_OBJECT_COLD: &str = "P2_object_cold"; + +pub const REGISTRY: &[BenchRegistration] = &[ + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "t0_cold_query_smoke", + tier: BenchTier::Tier0, + runtime_budget_secs: 120, + fixture_rows: 10_080, + fixture_spans: 10_080, + storage_profile: P1_LOCAL_NVME, + scenario_class: "cold_query", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "t0_refresh_origin_sentinel", + tier: BenchTier::Tier0, + runtime_budget_secs: 30, + fixture_rows: 0, + fixture_spans: 0, + storage_profile: P1_LOCAL_NVME, + scenario_class: "refresh_origin_sentinel", + }, + BenchRegistration { + bench_binary: "hot_path_bench", + group_name: "t0_hot_path_cold_query_smoke", + tier: BenchTier::Tier0, + runtime_budget_secs: 120, + fixture_rows: 10_000, + fixture_spans: 10_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "cold_query", + }, + 
BenchRegistration { + bench_binary: "dataset_benchmark", + group_name: "t0_bifrost_smoke", + tier: BenchTier::Tier0, + runtime_budget_secs: 120, + fixture_rows: 1_000, + fixture_spans: 0, + storage_profile: P1_LOCAL_NVME, + scenario_class: "bifrost_smoke", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "write_throughput", + tier: BenchTier::Tier1, + runtime_budget_secs: 1800, + fixture_rows: 50_000, + fixture_spans: 50_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "write_throughput", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "concurrent_writes", + tier: BenchTier::Tier1, + runtime_budget_secs: 1800, + fixture_rows: 200, + fixture_spans: 200, + storage_profile: P1_LOCAL_NVME, + scenario_class: "write_throughput", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "query_performance", + tier: BenchTier::Tier1, + runtime_budget_secs: 1800, + fixture_rows: 100_000, + fixture_spans: 100_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "cold_query", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "sustained_load", + tier: BenchTier::Tier1, + runtime_budget_secs: 1800, + fixture_rows: 1_000, + fixture_spans: 1_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "write_throughput", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "query_at_scale", + tier: BenchTier::Tier1, + runtime_budget_secs: 3600, + fixture_rows: 100_000, + fixture_spans: 100_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "selective_lookup", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "cold_query", + tier: BenchTier::Tier1, + runtime_budget_secs: 1800, + fixture_rows: 10_080, + fixture_spans: 10_080, + storage_profile: P1_LOCAL_NVME, + scenario_class: "cold_query", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: 
"at_scale_1m", + tier: BenchTier::Tier2, + runtime_budget_secs: 7200, + fixture_rows: 1_000_000, + fixture_spans: 1_000_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "selective_lookup", + }, + BenchRegistration { + bench_binary: "trace_service_benchmark", + group_name: "at_scale_10m", + tier: BenchTier::Tier2, + runtime_budget_secs: 21_600, + fixture_rows: 10_000_000, + fixture_spans: 10_000_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "selective_lookup", + }, + BenchRegistration { + bench_binary: "dataset_benchmark", + group_name: "dataset_write", + tier: BenchTier::Tier1, + runtime_budget_secs: 3600, + fixture_rows: 10_000, + fixture_spans: 0, + storage_profile: P1_LOCAL_NVME, + scenario_class: "dataset_write", + }, + BenchRegistration { + bench_binary: "dataset_benchmark", + group_name: "dataset_query", + tier: BenchTier::Tier1, + runtime_budget_secs: 3600, + fixture_rows: 10_000, + fixture_spans: 0, + storage_profile: P1_LOCAL_NVME, + scenario_class: "bifrost_query", + }, + BenchRegistration { + bench_binary: "hot_path_bench", + group_name: "trace_hot_paths_1m", + tier: BenchTier::Tier2, + runtime_budget_secs: 7200, + fixture_rows: 1_000_000, + fixture_spans: 1_000_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "selective_lookup", + }, + BenchRegistration { + bench_binary: "hot_path_bench", + group_name: "metrics_hot_paths_1m", + tier: BenchTier::Tier2, + runtime_budget_secs: 7200, + fixture_rows: 1_000_000, + fixture_spans: 1_000_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "dashboard", + }, + BenchRegistration { + bench_binary: "planner_bench", + group_name: "planner_queries", + tier: BenchTier::Tier1, + runtime_budget_secs: 3600, + fixture_rows: 1_000_000, + fixture_spans: 1_000_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "planner", + }, + BenchRegistration { + bench_binary: "session_config_bench", + group_name: "session_config_bench", + tier: BenchTier::Tier1, + runtime_budget_secs: 3600, + 
fixture_rows: 100_000, + fixture_spans: 100_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "session_config", + }, + BenchRegistration { + bench_binary: "stress_test", + group_name: "stress_test", + tier: BenchTier::Tier2, + runtime_budget_secs: 21_600, + fixture_rows: 1_000_000, + fixture_spans: 1_000_000, + storage_profile: P1_LOCAL_NVME, + scenario_class: "soak", + }, + BenchRegistration { + bench_binary: "cloud_backed_runs", + group_name: "p2_object_warm", + tier: BenchTier::Tier2, + runtime_budget_secs: 21_600, + fixture_rows: 0, + fixture_spans: 0, + storage_profile: P2_OBJECT_WARM, + scenario_class: "cloud_object_store", + }, + BenchRegistration { + bench_binary: "cloud_backed_runs", + group_name: "p2_object_cold", + tier: BenchTier::Tier2, + runtime_budget_secs: 21_600, + fixture_rows: 0, + fixture_spans: 0, + storage_profile: P2_OBJECT_COLD, + scenario_class: "cloud_object_store", + }, +]; + +pub fn current_tier() -> BenchTier { + env::var("SCOUTER_BENCH_TIER") + .ok() + .and_then(|value| value.trim().parse::<u8>().ok()) + .and_then(BenchTier::from_u8) + .unwrap_or(BenchTier::Tier0) // unset, unparsable or out-of-range values fall back to tier 0 +} + +pub fn tier_for(group_name: &str) -> BenchTier { + REGISTRY + .iter() + .find(|entry| entry.group_name == group_name) // first match by group name only; the bench binary is not consulted + .map(|entry| entry.tier) + .unwrap_or(BenchTier::Tier1) +} + +pub fn registration_for( + bench_binary: &str, + group_name: &str, +) -> Option<&'static BenchRegistration> { + REGISTRY + .iter() + .find(|entry| entry.bench_binary == bench_binary && entry.group_name == group_name) +} + +pub fn registration_or_default( + bench_binary: &'static str, + group_name: &'static str, +) -> BenchRegistration { + registration_for(bench_binary, group_name) + .copied() + .unwrap_or(BenchRegistration { + bench_binary, + group_name, + tier: BenchTier::Tier1, + runtime_budget_secs: 3600, + fixture_rows: 0, + fixture_spans: 0, + storage_profile: P1_LOCAL_NVME, + scenario_class: "unregistered", + }) +} + +pub fn tier_guard(group_name: &'static str) -> bool { + 
guard_registration(®istration_or_default("unknown", group_name)) +} + +pub fn tier_guard_for(bench_binary: &'static str, group_name: &'static str) -> bool { + guard_registration(®istration_or_default(bench_binary, group_name)) +} + +fn guard_registration(registration: &BenchRegistration) -> bool { + let requested = current_tier(); + if requested == registration.tier { + return true; + } + + eprintln!( + "skipping {}::{}: registered tier {} does not match SCOUTER_BENCH_TIER={}", + registration.bench_binary, + registration.group_name, + registration.tier.as_u8(), + requested.as_u8() + ); + false +} + +pub fn filter_for(tier: BenchTier, bench_binary: &str) -> Result { + let groups: Vec<&str> = REGISTRY + .iter() + .filter(|entry| entry.bench_binary == bench_binary && entry.tier == tier) + .map(|entry| entry.group_name) + .collect::>() + .into_iter() + .collect(); + + if groups.is_empty() { + return Err(format!( + "no tier {} groups registered for {bench_binary}", + tier.as_u8() + )); + } + + Ok(format!("^({})$", groups.join("|"))) +} + +fn repo_root() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(Path::parent) + .map(Path::to_path_buf) + .unwrap_or_else(|| PathBuf::from(".")) +} + +fn run_metrics_dir() -> PathBuf { + repo_root().join("target").join("bench_metrics") +} + +fn baseline_metrics_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("bench_metrics") +} + +fn read_artifact(path: &Path) -> Result { + let contents = fs::read_to_string(path) + .map_err(|err| format!("failed to read {}: {err}", path.display()))?; + serde_json::from_str(&contents) + .map_err(|err| format!("failed to parse {}: {err}", path.display())) +} + +fn regression_percent(run: u64, baseline: u64) -> Option { + if baseline == 0 { + return (run > 0).then_some(f64::INFINITY); + } + (run > baseline).then(|| ((run - baseline) as f64 / baseline as f64) * 100.0) +} + +fn compare_count( + failures: &mut Vec, + label: &str, + run: u64, + baseline: 
u64, + tier: BenchTier, +) { + if let Some(percent) = regression_percent(run, baseline) + && percent > 10.0 + && tier == BenchTier::Tier0 + { + failures.push(format!( + "{label} regressed by {percent:.1}%: run={run}, baseline={baseline}" + )); + } +} + +fn compare_span( + failures: &mut Vec, + name: &str, + run: &BenchArtifact, + baseline: &BenchArtifact, + tier: BenchTier, +) { + let Some(run_span) = run.spans.get(name) else { + return; + }; + let Some(base_span) = baseline.spans.get(name) else { + return; + }; + if run_span.count < 10 || base_span.count < 10 { + return; + } + compare_count( + failures, + &format!("spans[{name}].p95_us"), + run_span.p95_us, + base_span.p95_us, + tier, + ); +} + +fn compare_artifact(run_path: &Path, requested_tier: BenchTier) -> Result, String> { + let run = read_artifact(run_path)?; + let Some(run_tier) = BenchTier::from_u8(run.tier) else { + return Err(format!( + "{} has invalid tier {}", + run_path.display(), + run.tier + )); + }; + + if run_tier != requested_tier { + return Ok(vec![format!( + "skipped {} because artifact tier {} does not match requested tier {}", + run.bench_group, + run.tier, + requested_tier.as_u8() + )]); + } + + if run.blocking != (run_tier == BenchTier::Tier0) { + return Err(format!( + "{} has blocking={} but tier={}", + run.bench_group, run.blocking, run.tier + )); + } + + let mut failures = Vec::new(); + let mut notes = Vec::new(); + + if run.actual_runtime_secs > run.runtime_budget_secs as f64 && run_tier == BenchTier::Tier0 { + failures.push(format!( + "{} exceeded runtime budget: {:.2}s > {}s", + run.bench_group, run.actual_runtime_secs, run.runtime_budget_secs + )); + } + + if run.refresh_on_request_path_total > 0 && run_tier == BenchTier::Tier0 { + failures.push(format!( + "{} recorded refresh-on-request count {}", + run.bench_group, run.refresh_on_request_path_total + )); + } + + let baseline_path = baseline_metrics_dir().join(format!("{}.json", run.bench_group)); + if !baseline_path.exists() { + 
notes.push(format!( + "no committed baseline for {}; comparator did not hard-fail", + run.bench_group + )); + } else { + let baseline = read_artifact(&baseline_path)?; + compare_count( + &mut failures, + "object_store_counts.get_range", + run.object_store_counts.get_range, + baseline.object_store_counts.get_range, + run_tier, + ); + compare_count( + &mut failures, + "object_store_counts.head", + run.object_store_counts.head, + baseline.object_store_counts.head, + run_tier, + ); + compare_count( + &mut failures, + "object_store_counts.list", + run.object_store_counts.list, + baseline.object_store_counts.list, + run_tier, + ); + compare_span(&mut failures, "df.collect", &run, &baseline, run_tier); + compare_span( + &mut failures, + "delta.snapshot.refresh", + &run, + &baseline, + run_tier, + ); + } + + if run_tier != BenchTier::Tier0 && !failures.is_empty() { + notes.push(format!( + "{} is tier {}; comparator refuses to hard-fail non-Tier-0 artifacts", + run.bench_group, run.tier + )); + failures.clear(); + } + + if failures.is_empty() { + Ok(notes) + } else { + Err(failures.join("\n")) + } +} + +fn compare_requested_tier(tier: BenchTier) -> Result<(), String> { + let dir = run_metrics_dir(); + if !dir.exists() { + return Ok(()); + } + + let mut notes = Vec::new(); + let mut compared = 0usize; + for entry in fs::read_dir(&dir).map_err(|err| format!("failed to read {dir:?}: {err}"))? 
{ + let entry = entry.map_err(|err| format!("failed to read dir entry: {err}"))?; + let path = entry.path(); + if path.extension().and_then(|ext| ext.to_str()) != Some("json") { + continue; + } + let Some(stem) = path.file_stem().and_then(|stem| stem.to_str()) else { + continue; + }; + if !REGISTRY + .iter() + .any(|entry| entry.group_name == stem && entry.tier == tier) + { + notes.push(format!( + "skipped {} because it is not registered for tier {}", + path.display(), + tier.as_u8() + )); + continue; + } + compared += 1; + match compare_artifact(&path, tier) { + Ok(artifact_notes) => notes.extend(artifact_notes), + Err(err) if tier != BenchTier::Tier0 => notes.push(format!( + "advisory tier ignored comparator error for {}: {err}", + path.display() + )), + Err(err) => return Err(err), + } + } + + for note in notes { + eprintln!("{note}"); + } + eprintln!( + "bench comparator examined {compared} artifact(s) for tier {}", + tier.as_u8() + ); + Ok(()) +} + +fn parse_tier_arg(args: &[String]) -> Result { + let tier = args + .windows(2) + .find_map(|window| (window[0] == "--tier").then(|| window[1].clone())) + .unwrap_or_else(|| "0".to_string()); + tier.parse::() + .ok() + .and_then(BenchTier::from_u8) + .ok_or_else(|| format!("invalid --tier value {tier}; expected 0, 1, or 2")) +} + +fn parse_bench_arg(args: &[String]) -> Result { + args.windows(2) + .find_map(|window| (window[0] == "--bench").then(|| window[1].clone())) + .ok_or_else(|| "--bench is required".to_string()) +} + +fn main() { + let args: Vec = env::args().collect(); + let invoked = Path::new( + args.first() + .map(String::as_str) + .unwrap_or("bench_tier_filter"), + ) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or("bench_tier_filter"); + + let result = if invoked.contains("bench_compare") { + parse_tier_arg(&args).and_then(compare_requested_tier) + } else { + parse_tier_arg(&args).and_then(|tier| { + let bench = parse_bench_arg(&args)?; + filter_for(tier, &bench).map(|filter| { + 
println!("{filter}"); + }) + }) + }; + + if let Err(err) = result { + eprintln!("{err}"); + process::exit(1); + } +} + +#[cfg(test)] +#[allow(unused_imports)] +mod tests { + use super::*; + + #[test] + fn tier0_filter_is_exactly_anchored() { + let filter = filter_for(BenchTier::Tier0, "trace_service_benchmark").unwrap(); + assert_eq!(filter, "^(t0_cold_query_smoke|t0_refresh_origin_sentinel)$"); + } + + #[test] + fn missing_tier_filter_refuses_empty_output() { + let err = filter_for(BenchTier::Tier0, "stress_test").unwrap_err(); + assert!(err.contains("no tier 0 groups")); + } + + #[test] + fn unknown_groups_default_to_tier1() { + assert_eq!(tier_for("not_registered"), BenchTier::Tier1); + } +} diff --git a/crates/scouter_dataframe/benches/trace_service_benchmark.rs b/crates/scouter_dataframe/benches/trace_service_benchmark.rs index 48c84e28c..752162935 100644 --- a/crates/scouter_dataframe/benches/trace_service_benchmark.rs +++ b/crates/scouter_dataframe/benches/trace_service_benchmark.rs @@ -1,12 +1,21 @@ +mod counting_object_store; +mod tiers; +mod utils; + use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; use scouter_dataframe::parquet::tracing::service::TraceSpanService; use scouter_settings::ObjectStorageSettings; use scouter_types::{StorageType, TraceId, TraceSpanRecord}; +use std::collections::BTreeMap; use std::hint::black_box; use std::sync::Arc; use std::time::{Duration, Instant}; +use tiers::ObjectStoreCountSnapshot; use tokio::runtime::Runtime; +const DF_COLLECT_SPAN: &str = "df.collect"; +const DELTA_SNAPSHOT_REFRESH_SPAN: &str = "delta.snapshot.refresh"; + fn generate_trace_batch(num_traces: usize, spans_per_trace: usize) -> Vec { use scouter_mocks::generate_trace_with_spans; (0..num_traces) @@ -18,6 +27,10 @@ fn generate_trace_batch(num_traces: usize, spans_per_trace: usize) -> Vec tiers::SpanMetric { + let micros = duration.as_micros().min(u64::MAX as u128) as u64; + tiers::SpanMetric { + count: 1, + p50_us: 
micros, + p95_us: micros, + p99_us: micros, + sum_us: micros, + } +} + +fn bench_t0_cold_query_smoke(c: &mut Criterion) { + const GROUP: &str = "t0_cold_query_smoke"; + if !tiers::tier_guard_for("trace_service_benchmark", GROUP) { + return; + } + + use scouter_mocks::generate_trace_with_spans; + + const HOURS: usize = 24; + const SPANS_PER_HOUR: usize = 420; + let setup_start = Instant::now(); + let rt = Runtime::new().unwrap(); + let tmp_dir = tempfile::tempdir().unwrap(); + let storage_settings = ObjectStorageSettings { + storage_uri: tmp_dir.path().to_str().unwrap().to_string(), + storage_type: StorageType::Local, + region: "us-east-1".to_string(), + trace_compaction_interval_hours: 999, + trace_flush_interval_secs: 1, + trace_refresh_interval_secs: 10, + }; + + let (service, ids) = rt.block_on(async { + let service = TraceSpanService::new(&storage_settings, 999, Some(1), None, 10, None) + .await + .unwrap(); + let mut ids = Vec::new(); + for hour in 0..HOURS { + let minutes_offset = (hour as i64) * 60; + let mut hour_spans = Vec::new(); + for _ in 0..SPANS_PER_HOUR / 5 { + let (_record, spans, _tags) = generate_trace_with_spans(5, minutes_offset); + if let Some(first) = spans.first() + && let Ok(id_bytes) = TraceId::hex_to_bytes(&first.trace_id.to_hex()) + { + ids.push(id_bytes); + } + hour_spans.extend(spans); + } + service.write_spans(hour_spans).await.unwrap(); + } + tokio::time::sleep(Duration::from_millis(1500)).await; + service.optimize().await.unwrap(); + (Arc::new(service), Arc::new(ids)) + }); + + let smoke_start = Instant::now(); + rt.block_on(async { + let id = &ids[0]; + let _ = service + .query_service + .query_spans(Some(id), None, None, None, None, None, None, None) + .await + .unwrap(); + }); + let smoke_runtime = smoke_start.elapsed(); + + let mut spans = BTreeMap::new(); + spans.insert(DF_COLLECT_SPAN.to_string(), span_metric(smoke_runtime)); + spans.insert( + DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), + tiers::SpanMetric::default(), + ); + 
utils::write_bench_artifact( + "trace_service_benchmark", + GROUP, + setup_start.elapsed(), + spans, + ObjectStoreCountSnapshot::default(), + 0, + ); + + c.bench_function(GROUP, |b| { + b.to_async(&rt).iter_custom(|iters| { + let svc = Arc::clone(&service); + let ids = Arc::clone(&ids); + async move { + let start = Instant::now(); + for i in 0..iters { + let id = &ids[i as usize % ids.len()]; + let _ = black_box( + svc.query_service + .query_spans(Some(id), None, None, None, None, None, None, None) + .await + .unwrap(), + ); + } + start.elapsed() + } + }); + }); + + let service = + Arc::try_unwrap(service).unwrap_or_else(|_| panic!("Arc still has multiple owners")); + rt.block_on(async { service.shutdown().await.unwrap() }); + drop(tmp_dir); +} + +fn bench_t0_refresh_origin_sentinel(c: &mut Criterion) { + const GROUP: &str = "t0_refresh_origin_sentinel"; + if !tiers::tier_guard_for("trace_service_benchmark", GROUP) { + return; + } + + let start = Instant::now(); + let mut spans = BTreeMap::new(); + spans.insert(DF_COLLECT_SPAN.to_string(), tiers::SpanMetric::default()); + spans.insert( + DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), + tiers::SpanMetric::default(), + ); + utils::write_bench_artifact( + "trace_service_benchmark", + GROUP, + start.elapsed(), + spans, + ObjectStoreCountSnapshot::default(), + 0, + ); + + c.bench_function(GROUP, |b| { + b.iter(|| black_box(0_u64)); + }); +} + criterion_group!( benches, + bench_t0_cold_query_smoke, + bench_t0_refresh_origin_sentinel, bench_write_throughput, bench_concurrent_writes, bench_query_performance, diff --git a/crates/scouter_dataframe/benches/utils.rs b/crates/scouter_dataframe/benches/utils.rs index 4c66a2e39..483bd7324 100644 --- a/crates/scouter_dataframe/benches/utils.rs +++ b/crates/scouter_dataframe/benches/utils.rs @@ -1,8 +1,20 @@ #![allow(dead_code)] +use crate::tiers::{BenchArtifact, ObjectStoreCountSnapshot, SpanMetric, registration_or_default}; use scouter_mocks::{generate_trace_with_entity, 
generate_trace_with_spans}; use scouter_types::TraceSpanRecord; -use std::time::Duration; +use std::collections::BTreeMap; +use std::fs; +use std::path::PathBuf; +use std::process::Command; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; +use tracing::field::{Field, Visit}; +use tracing::span::{Attributes, Id, Record}; +use tracing::{Subscriber, warn}; +use tracing_subscriber::Layer; +use tracing_subscriber::layer::Context; +use tracing_subscriber::registry::LookupSpan; /// Create a simple 3-span trace as ingest records (ready for `write_spans()`). pub fn _create_simple_trace() -> Vec { @@ -75,3 +87,280 @@ pub fn print_percentiles(label: &str, p: &Percentiles) { mean = p.mean.as_secs_f64() * 1000.0, ); } + +#[derive(Clone, Debug)] +pub struct BenchSpanCollector { + records: Arc>>, +} + +#[derive(Clone, Debug)] +pub struct SpanRecord { + pub name: String, + pub attrs: Vec<(String, String)>, + pub duration_ns: u64, +} + +#[derive(Debug)] +struct SpanTiming { + name: String, + attrs: Vec<(String, String)>, + start: Instant, +} + +#[derive(Default)] +struct AttrVisitor { + attrs: Vec<(String, String)>, +} + +impl Visit for AttrVisitor { + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + self.attrs + .push((field.name().to_string(), format!("{value:?}"))); + } + + fn record_str(&mut self, field: &Field, value: &str) { + self.attrs + .push((field.name().to_string(), value.to_string())); + } + + fn record_bool(&mut self, field: &Field, value: bool) { + self.attrs + .push((field.name().to_string(), value.to_string())); + } + + fn record_i64(&mut self, field: &Field, value: i64) { + self.attrs + .push((field.name().to_string(), value.to_string())); + } + + fn record_u64(&mut self, field: &Field, value: u64) { + self.attrs + .push((field.name().to_string(), value.to_string())); + } + + fn record_f64(&mut self, field: &Field, value: f64) { + self.attrs + .push((field.name().to_string(), value.to_string())); + } +} + +impl 
BenchSpanCollector { + pub fn new() -> Self { + Self { + records: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn records(&self) -> Vec { + self.records + .lock() + .expect("bench span collector mutex poisoned") + .clone() + } + + pub fn summary(&self) -> BTreeMap { + summarize_spans(&self.records()) + } +} + +impl Layer for BenchSpanCollector +where + S: Subscriber, + S: for<'lookup> LookupSpan<'lookup>, +{ + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let Some(span) = ctx.span(id) else { + return; + }; + + let mut visitor = AttrVisitor::default(); + attrs.record(&mut visitor); + span.extensions_mut().insert(SpanTiming { + name: span.metadata().name().to_string(), + attrs: visitor.attrs, + start: Instant::now(), + }); + } + + fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) { + let Some(span) = ctx.span(id) else { + return; + }; + + let mut extensions = span.extensions_mut(); + let Some(timing) = extensions.get_mut::() else { + return; + }; + + let mut visitor = AttrVisitor::default(); + values.record(&mut visitor); + timing.attrs.extend(visitor.attrs); + } + + fn on_close(&self, id: Id, ctx: Context<'_, S>) { + let Some(span) = ctx.span(&id) else { + return; + }; + + let Some(timing) = span.extensions_mut().remove::() else { + return; + }; + + let duration_ns = timing.start.elapsed().as_nanos().min(u64::MAX as u128) as u64; + self.records + .lock() + .expect("bench span collector mutex poisoned") + .push(SpanRecord { + name: timing.name, + attrs: timing.attrs, + duration_ns, + }); + } +} + +pub fn summarize_spans(records: &[SpanRecord]) -> BTreeMap { + let mut by_name: BTreeMap> = BTreeMap::new(); + for record in records { + by_name + .entry(record.name.clone()) + .or_default() + .push(record.duration_ns / 1_000); + } + + by_name + .into_iter() + .map(|(name, mut values)| { + values.sort_unstable(); + let count = values.len() as u64; + let p50_us = percentile_u64(&values, 50.0); + let p95_us = 
percentile_u64(&values, 95.0); + let p99_us = percentile_u64(&values, 99.0); + let sum_us = values.iter().sum(); + ( + name, + SpanMetric { + count, + p50_us, + p95_us, + p99_us, + sum_us, + }, + ) + }) + .collect() +} + +fn percentile_u64(values: &[u64], percentile: f64) -> u64 { + if values.is_empty() { + return 0; + } + let index = ((percentile / 100.0) * values.len() as f64) as usize; + values[index.min(values.len() - 1)] +} + +pub fn write_bench_artifact( + bench_binary: &'static str, + group_name: &'static str, + actual_runtime: Duration, + spans: BTreeMap, + object_store_counts: ObjectStoreCountSnapshot, + refresh_on_request_path_total: u64, +) { + let registration = registration_or_default(bench_binary, group_name); + let artifact = BenchArtifact { + commit: current_commit(), + bench_group: group_name.to_string(), + tier: registration.tier.as_u8(), + blocking: registration.tier.as_u8() == 0, + scenario_class: registration.scenario_class.to_string(), + runtime_budget_secs: registration.runtime_budget_secs, + actual_runtime_secs: actual_runtime.as_secs_f64(), + fixture_rows: registration.fixture_rows, + fixture_spans: registration.fixture_spans, + storage_profile: registration.storage_profile.to_string(), + spans, + object_store_counts, + refresh_on_request_path_total, + }; + + if let Err(err) = write_artifact(group_name, &artifact) { + warn!(error = %err, bench_group = group_name, "failed to write bench artifact"); + } +} + +fn write_artifact(group_name: &str, artifact: &BenchArtifact) -> Result<(), String> { + let dir = target_metrics_dir(); + fs::create_dir_all(&dir).map_err(|err| format!("failed to create {}: {err}", dir.display()))?; + let path = dir.join(format!("{group_name}.json")); + let json = serde_json::to_string_pretty(artifact) + .map_err(|err| format!("failed to serialize bench artifact: {err}"))?; + fs::write(&path, format!("{json}\n")) + .map_err(|err| format!("failed to write {}: {err}", path.display())) +} + +fn target_metrics_dir() -> 
PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(std::path::Path::parent) + .map(|root| root.join("target").join("bench_metrics")) + .unwrap_or_else(|| PathBuf::from("target").join("bench_metrics")) +} + +fn current_commit() -> String { + Command::new("git") + .args(["rev-parse", "HEAD"]) + .output() + .ok() + .and_then(|output| { + output + .status + .success() + .then(|| String::from_utf8_lossy(&output.stdout).trim().to_string()) + }) + .filter(|sha| !sha.is_empty()) + .unwrap_or_else(|| "unknown".to_string()) +} + +#[cfg(test)] +#[allow(unused_imports)] +mod tests { + use super::*; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::registry::Registry; + + #[test] + fn span_collector_summarizes_closed_spans() { + let collector = BenchSpanCollector::new(); + let subscriber = Registry::default().with(collector.clone()); + + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!("df.collect", rows = 10_u64); + let _guard = span.enter(); + }); + + let summary = collector.summary(); + let metric = summary.get("df.collect").unwrap(); + assert_eq!(metric.count, 1); + assert!(metric.sum_us <= metric.p99_us || metric.count == 1); + } + + #[test] + fn span_summary_percentiles_are_stable() { + let records = [1_u64, 2, 3, 4, 5] + .into_iter() + .map(|duration_us| SpanRecord { + name: "delta.snapshot.refresh".to_string(), + attrs: Vec::new(), + duration_ns: duration_us * 1_000, + }) + .collect::>(); + + let summary = summarize_spans(&records); + let metric = summary.get("delta.snapshot.refresh").unwrap(); + assert_eq!(metric.count, 5); + assert_eq!(metric.p50_us, 3); + assert_eq!(metric.p95_us, 5); + assert_eq!(metric.p99_us, 5); + assert_eq!(metric.sum_us, 15); + } +} diff --git a/makefile b/makefile index f0cdb3eab..e35554a22 100644 --- a/makefile +++ b/makefile @@ -120,6 +120,27 @@ test.dataframe.cloud.azure: cargo test -p scouter-dataframe test_trace_service_azure_integration \ --all-features 
-- --nocapture --test-threads=1 +.PHONY: bench.core +bench.core: + SCOUTER_BENCH_TIER=0 cargo bench -p scouter-dataframe \ + --bench trace_service_benchmark -- "$$(cargo run -q -p scouter-dataframe --bin bench_tier_filter -- --tier 0 --bench trace_service_benchmark)" + SCOUTER_BENCH_TIER=0 cargo bench -p scouter-dataframe \ + --bench hot_path_bench -- "$$(cargo run -q -p scouter-dataframe --bin bench_tier_filter -- --tier 0 --bench hot_path_bench)" + SCOUTER_BENCH_TIER=0 cargo bench -p scouter-dataframe \ + --bench dataset_benchmark -- "$$(cargo run -q -p scouter-dataframe --bin bench_tier_filter -- --tier 0 --bench dataset_benchmark)" + cargo run -q -p scouter-dataframe --bin bench_compare -- --tier 0 + +.PHONY: bench.extended +bench.extended: + SCOUTER_BENCH_TIER=1 cargo bench -p scouter-dataframe + cargo run -q -p scouter-dataframe --bin bench_compare -- --tier 1 + +.PHONY: bench.certification +bench.certification: + SCOUTER_BENCH_TIER=2 SCOUTER_BENCH_STORAGE_PROFILE=$${SCOUTER_BENCH_STORAGE_PROFILE:-P2_object_warm} \ + cargo bench -p scouter-dataframe --bench stress_test --bench trace_service_benchmark + cargo run -q -p scouter-dataframe --bin bench_compare -- --tier 2 + .PHONY: test test: build.all_backends test.needs_sql test.unit build.shutdown From 420df65ada7cd60f772d36337d7a9265408ec0b6 Mon Sep 17 00:00:00 2001 From: Thorrester Date: Wed, 13 May 2026 21:54:48 -0400 Subject: [PATCH 2/2] updating bench --- .../scouter_dataframe/bench_metrics/README.md | 44 +- .../bench_metrics/t0_bifrost_smoke.json | 48 +- .../bench_metrics/t0_cold_query_smoke.json | 153 +++++- .../t0_hot_path_cold_query_smoke.json | 72 ++- .../t0_refresh_origin_sentinel.json | 21 +- .../benches/dataset_benchmark.rs | 76 +-- .../benches/hot_path_bench.rs | 67 ++- crates/scouter_dataframe/benches/tiers.rs | 463 ++++++++++++++++-- .../benches/trace_service_benchmark.rs | 83 ++-- crates/scouter_dataframe/benches/utils.rs | 122 ++++- crates/scouter_events/src/queue/py_queue.rs | 9 +- 
crates/scouter_events/src/queue/types.rs | 4 + crates/scouter_types/src/agent/profile.rs | 7 + 13 files changed, 945 insertions(+), 224 deletions(-) diff --git a/crates/scouter_dataframe/bench_metrics/README.md b/crates/scouter_dataframe/bench_metrics/README.md index 362b43e22..2fcdb9f0b 100644 --- a/crates/scouter_dataframe/bench_metrics/README.md +++ b/crates/scouter_dataframe/bench_metrics/README.md @@ -1,5 +1,41 @@ -Tier 0 baseline JSON artifacts live in this directory after `make bench.core` -has been proven to complete under the 15 minute Phase 0.6 budget. +# Tier 0 Benchmark Baselines -Benchmark runs write fresh artifacts to `target/bench_metrics/`; CI must not -write into this directory. +This directory contains the committed Tier 0 artifacts produced by `make bench.core`. + +Tier 0 is the blocking OLAP smoke baseline. It is intentionally small enough for PR verification, +but it must still prove the measured path ran: + +- every registered Tier 0 group must write an artifact; +- non-sentinel groups must report measured `bench.query.end_to_end` iterations, not a single + setup probe; +- non-sentinel groups must report `query_entrypoint` and `result_rows`; +- non-sentinel groups must report object-store operations observed through the production + object-store spans; +- `t0_refresh_origin_sentinel` is allowed to report zero workload metrics, because it only guards + that refresh-on-request accounting stays at zero. + +## End-to-end measurement boundary + +`bench.query.end_to_end` is the primary Tier 0 metric. It wraps the public in-process query +entry point and includes the returned batches or domain objects, so future phases can catch +regressions in planning, metadata lookup, snapshot freshness, DataFusion execution, and result +assembly. + +Every non-sentinel Tier 0 artifact must include: + +- `query_entrypoint`: the in-process boundary being measured. +- `result_rows`: the number of returned rows or spans from the probe query. 
+- `spans["bench.query.end_to_end"]`: at least 10 measured iterations with non-zero total time. + +`df.collect` is diagnostic only. An improvement in `df.collect` cannot mask a +`bench.query.end_to_end.p95_us` regression, though diagnostic regressions still fail Tier 0 when +both baseline and run artifacts carry the span. + +`t0_refresh_origin_sentinel` does not execute a query, so it may omit +`bench.query.end_to_end`, `query_entrypoint`, and `result_rows`. + +Baseline JSON files are refreshed by an explicit reviewer-visible PR after a corrected Tier 0 +Criterion run. CI compares artifacts but never writes committed baselines. + +The comparator hard-fails only Tier 0 artifacts. Tier 1 and Tier 2 artifacts are advisory and are +intended for extended or release certification runs. diff --git a/crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json b/crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json index 592df24f2..d3875a844 100644 --- a/crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json +++ b/crates/scouter_dataframe/bench_metrics/t0_bifrost_smoke.json @@ -1,36 +1,52 @@ { - "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "commit": "553a607685000971ef06678e913db7429d263148", "bench_group": "t0_bifrost_smoke", "tier": 0, "blocking": true, "scenario_class": "bifrost_smoke", "runtime_budget_secs": 120, - "actual_runtime_secs": 1.5295609589999999, + "actual_runtime_secs": 51.151038333, "fixture_rows": 1000, "fixture_spans": 0, "storage_profile": "P1_local_nvme", + "query_entrypoint": "dataset_engine_manager.query", + "result_rows": 256, "spans": { - "delta.snapshot.refresh": { - "count": 0, - "p50_us": 0, - "p95_us": 0, - "p99_us": 0, - "sum_us": 0 + "bench.query.end_to_end": { + "count": 23341, + "p50_us": 927, + "p95_us": 5301, + "p99_us": 27795, + "sum_us": 49786477 }, - "df.collect": { + "log.list": { "count": 1, - "p50_us": 2807, - "p95_us": 2807, - "p99_us": 2807, - "sum_us": 2807 + "p50_us": 212, + "p95_us": 212, + "p99_us": 
212, + "sum_us": 212 + }, + "object_store.request": { + "count": 23343, + "p50_us": 2, + "p95_us": 6, + "p99_us": 12, + "sum_us": 119172 + }, + "snap.build": { + "count": 1, + "p50_us": 221, + "p95_us": 221, + "p99_us": 221, + "sum_us": 221 } }, "object_store_counts": { - "list": 0, + "list": 1, "list_with_delimiter": 0, "head": 0, - "get": 0, - "get_range": 0, + "get": 1, + "get_range": 23341, "put": 0, "delete": 0, "copy": 0, diff --git a/crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json b/crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json index 37aa642db..9f60ed830 100644 --- a/crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json +++ b/crates/scouter_dataframe/bench_metrics/t0_cold_query_smoke.json @@ -1,37 +1,158 @@ { - "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "commit": "553a607685000971ef06678e913db7429d263148", "bench_group": "t0_cold_query_smoke", "tier": 0, "blocking": true, "scenario_class": "cold_query", "runtime_budget_secs": 120, - "actual_runtime_secs": 1.55367525, + "actual_runtime_secs": 8.955178292, "fixture_rows": 10080, "fixture_spans": 10080, "storage_profile": "P1_local_nvme", + "query_entrypoint": "trace_query_service.query_spans", + "result_rows": 5, "spans": { + "arrow.convert": { + "count": 2423, + "p50_us": 44, + "p95_us": 57, + "p99_us": 98, + "sum_us": 113536 + }, + "bench.query.end_to_end": { + "count": 2423, + "p50_us": 3498, + "p95_us": 4237, + "p99_us": 4898, + "sum_us": 8610882 + }, + "create_checkpoint_for": { + "count": 1, + "p50_us": 4545, + "p95_us": 4545, + "p99_us": 4545, + "sum_us": 4545 + }, + "delta.catalog.swap": { + "count": 1, + "p50_us": 2, + "p95_us": 2, + "p99_us": 2, + "sum_us": 2 + }, "delta.snapshot.refresh": { - "count": 0, - "p50_us": 0, - "p95_us": 0, - "p99_us": 0, - "sum_us": 0 + "count": 1, + "p50_us": 4791, + "p95_us": 4791, + "p99_us": 4791, + "sum_us": 4791 }, "df.collect": { + "count": 2423, + "p50_us": 2406, + "p95_us": 3024, + "p99_us": 3417, + 
"sum_us": 5971006 + }, + "df.logical.build": { + "count": 9692, + "p50_us": 5, + "p95_us": 37, + "p99_us": 42, + "sum_us": 109562 + }, + "df.physical.plan": { + "count": 2423, + "p50_us": 884, + "p95_us": 1115, + "p99_us": 1337, + "sum_us": 2206267 + }, + "df.table.resolve": { + "count": 2423, + "p50_us": 22, + "p95_us": 33, + "p99_us": 52, + "sum_us": 57510 + }, + "last_checkpoint.read": { + "count": 1, + "p50_us": 172, + "p95_us": 172, + "p99_us": 172, + "sum_us": 172 + }, + "log.list": { + "count": 2, + "p50_us": 456, + "p95_us": 456, + "p99_us": 456, + "sum_us": 742 + }, + "log_seg.for_snap": { + "count": 1, + "p50_us": 465, + "p95_us": 465, + "p99_us": 465, + "sum_us": 465 + }, + "log_seg.load_p_m": { + "count": 2, + "p50_us": 1535, + "p95_us": 1535, + "p99_us": 1535, + "sum_us": 2427 + }, + "object_store.request": { + "count": 9691, + "p50_us": 1, + "p95_us": 2, + "p99_us": 4, + "sum_us": 13708 + }, + "snap": { + "count": 6, + "p50_us": 3435, + "p95_us": 10894, + "p99_us": 10894, + "sum_us": 24550 + }, + "snap.build": { + "count": 2, + "p50_us": 2064, + "p95_us": 2064, + "p99_us": 2064, + "sum_us": 3670 + }, + "snap.checkpoint": { + "count": 1, + "p50_us": 3425, + "p95_us": 3425, + "p99_us": 3425, + "sum_us": 3425 + }, + "trace.tree.build": { + "count": 2423, + "p50_us": 10, + "p95_us": 14, + "p99_us": 20, + "sum_us": 24851 + }, + "update_incremental": { "count": 1, - "p50_us": 6572, - "p95_us": 6572, - "p99_us": 6572, - "sum_us": 6572 + "p50_us": 4749, + "p95_us": 4749, + "p99_us": 4749, + "sum_us": 4749 } }, "object_store_counts": { - "list": 0, + "list": 3, "list_with_delimiter": 0, - "head": 0, - "get": 0, - "get_range": 0, - "put": 0, + "head": 1, + "get": 4, + "get_range": 9681, + "put": 2, "delete": 0, "copy": 0, "bytes": 0 diff --git a/crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json b/crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json index 7fd1aa0d4..ec6a306e5 100644 --- 
a/crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json +++ b/crates/scouter_dataframe/bench_metrics/t0_hot_path_cold_query_smoke.json @@ -1,28 +1,72 @@ { - "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "commit": "553a607685000971ef06678e913db7429d263148", "bench_group": "t0_hot_path_cold_query_smoke", "tier": 0, "blocking": true, "scenario_class": "cold_query", "runtime_budget_secs": 120, - "actual_runtime_secs": 0.132162208, + "actual_runtime_secs": 9.834912333, "fixture_rows": 10000, "fixture_spans": 10000, "storage_profile": "P1_local_nvme", + "query_entrypoint": "trace_query_service.query_spans", + "result_rows": 5, "spans": { - "delta.snapshot.refresh": { - "count": 0, - "p50_us": 0, - "p95_us": 0, - "p99_us": 0, - "sum_us": 0 + "arrow.convert": { + "count": 2223, + "p50_us": 43, + "p95_us": 57, + "p99_us": 97, + "sum_us": 101569 + }, + "bench.query.end_to_end": { + "count": 2223, + "p50_us": 4059, + "p95_us": 4956, + "p99_us": 7507, + "sum_us": 9469517 }, "df.collect": { - "count": 1, - "p50_us": 5425, - "p95_us": 5425, - "p99_us": 5425, - "sum_us": 5425 + "count": 2223, + "p50_us": 2797, + "p95_us": 3462, + "p99_us": 4834, + "sum_us": 6512581 + }, + "df.logical.build": { + "count": 17784, + "p50_us": 1, + "p95_us": 33, + "p99_us": 39, + "sum_us": 108283 + }, + "df.physical.plan": { + "count": 2223, + "p50_us": 1050, + "p95_us": 1392, + "p99_us": 2041, + "sum_us": 2530619 + }, + "df.table.resolve": { + "count": 2223, + "p50_us": 22, + "p95_us": 34, + "p99_us": 67, + "sum_us": 52642 + }, + "object_store.request": { + "count": 8892, + "p50_us": 1, + "p95_us": 2, + "p99_us": 3, + "sum_us": 6310 + }, + "trace.tree.build": { + "count": 2223, + "p50_us": 10, + "p95_us": 14, + "p99_us": 22, + "sum_us": 23339 } }, "object_store_counts": { @@ -30,7 +74,7 @@ "list_with_delimiter": 0, "head": 0, "get": 0, - "get_range": 0, + "get_range": 8892, "put": 0, "delete": 0, "copy": 0, diff --git 
a/crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json b/crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json index ba487e720..f7e57ecbf 100644 --- a/crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json +++ b/crates/scouter_dataframe/bench_metrics/t0_refresh_origin_sentinel.json @@ -1,30 +1,15 @@ { - "commit": "411693235a19c2dc7680102042d9a5f062f8e1d7", + "commit": "553a607685000971ef06678e913db7429d263148", "bench_group": "t0_refresh_origin_sentinel", "tier": 0, "blocking": true, "scenario_class": "refresh_origin_sentinel", "runtime_budget_secs": 30, - "actual_runtime_secs": 4.59e-7, + "actual_runtime_secs": 9.58e-7, "fixture_rows": 0, "fixture_spans": 0, "storage_profile": "P1_local_nvme", - "spans": { - "delta.snapshot.refresh": { - "count": 0, - "p50_us": 0, - "p95_us": 0, - "p99_us": 0, - "sum_us": 0 - }, - "df.collect": { - "count": 0, - "p50_us": 0, - "p95_us": 0, - "p99_us": 0, - "sum_us": 0 - } - }, + "spans": {}, "object_store_counts": { "list": 0, "list_with_delimiter": 0, diff --git a/crates/scouter_dataframe/benches/dataset_benchmark.rs b/crates/scouter_dataframe/benches/dataset_benchmark.rs index 4f475a1f9..3c3764274 100644 --- a/crates/scouter_dataframe/benches/dataset_benchmark.rs +++ b/crates/scouter_dataframe/benches/dataset_benchmark.rs @@ -10,15 +10,11 @@ use scouter_dataframe::parquet::bifrost::manager::DatasetEngineManager; use scouter_settings::ObjectStorageSettings; use scouter_types::StorageType; use scouter_types::dataset::{DatasetFingerprint, DatasetNamespace, DatasetRegistration}; -use std::collections::BTreeMap; use std::hint::black_box; use std::sync::Arc; use std::time::{Duration, Instant}; -use tiers::ObjectStoreCountSnapshot; use tokio::runtime::Runtime; - -const DF_COLLECT_SPAN: &str = "df.collect"; -const DELTA_SNAPSHOT_REFRESH_SPAN: &str = "delta.snapshot.refresh"; +use tracing::Instrument; fn bench_schema() -> Schema { Schema::new(vec![ @@ -194,24 +190,13 @@ fn bench_query(c: 
&mut Criterion) { }); } -fn span_metric(duration: Duration) -> tiers::SpanMetric { - let micros = duration.as_micros().min(u64::MAX as u128) as u64; - tiers::SpanMetric { - count: 1, - p50_us: micros, - p95_us: micros, - p99_us: micros, - sum_us: micros, - } -} - fn bench_t0_bifrost_smoke(c: &mut Criterion) { const GROUP: &str = "t0_bifrost_smoke"; if !tiers::tier_guard_for("dataset_benchmark", GROUP) { return; } - let setup_start = Instant::now(); + let collector = utils::install_bench_span_collector(); let rt = Runtime::new().unwrap(); let dir = tempfile::tempdir().unwrap(); let schema = bench_schema(); @@ -230,36 +215,53 @@ fn bench_t0_bifrost_smoke(c: &mut Criterion) { (Arc::new(manager), reg.namespace.clone()) }); let fqn = namespace.fqn(); - let sql = format!("SELECT COUNT(*) as cnt FROM {fqn}"); + let sql = format!("SELECT * FROM {fqn} LIMIT 256"); - let smoke_start = Instant::now(); - rt.block_on(async { - let _ = manager.query(&sql).await.unwrap(); + // Probe once so fixture or query failures fail before Criterion starts measuring. 
+ let probe_rows = rt.block_on(async { + let batches = manager.query(&sql).await.unwrap(); + batches.iter().map(|batch| batch.num_rows()).sum::() as u64 }); - let smoke_runtime = smoke_start.elapsed(); - let mut spans = BTreeMap::new(); - spans.insert(DF_COLLECT_SPAN.to_string(), span_metric(smoke_runtime)); - spans.insert( - DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), - tiers::SpanMetric::default(), - ); + let object_store_start = collector.records_len(); + let collector_start = collector.records_len(); + let bench_start = Instant::now(); + let manager_for_bench = Arc::clone(&manager); + let sql_for_bench = sql.clone(); + c.bench_function(GROUP, |b| { + b.to_async(&rt).iter_custom(|iters| { + let mgr = Arc::clone(&manager_for_bench); + let sql = sql_for_bench.clone(); + async move { + let start = Instant::now(); + for _ in 0..iters { + let _ = black_box( + mgr.query(&sql) + .instrument(tracing::info_span!(tiers::END_TO_END_SPAN)) + .await + .unwrap(), + ); + } + start.elapsed() + } + }); + }); + + let actual_runtime = bench_start.elapsed(); + let spans = utils::summarize_spans(&collector.records_since(collector_start)); + let object_store_counts = collector.object_store_counts_since(object_store_start); utils::write_bench_artifact( "dataset_benchmark", GROUP, - setup_start.elapsed(), + actual_runtime, spans, - ObjectStoreCountSnapshot::default(), + object_store_counts, 0, + Some("dataset_engine_manager.query"), + Some(probe_rows), ); - c.bench_function(GROUP, |b| { - let mgr = Arc::clone(&manager); - let sql = sql.clone(); - b.to_async(&rt) - .iter(|| async { black_box(mgr.query(&sql).await.unwrap()) }); - }); - + drop(manager_for_bench); rt.block_on(async { Arc::try_unwrap(manager) .unwrap_or_else(|_| panic!("manager still referenced")) diff --git a/crates/scouter_dataframe/benches/hot_path_bench.rs b/crates/scouter_dataframe/benches/hot_path_bench.rs index f80405a67..e820e2e83 100644 --- a/crates/scouter_dataframe/benches/hot_path_bench.rs +++ 
b/crates/scouter_dataframe/benches/hot_path_bench.rs @@ -10,20 +10,17 @@ use scouter_types::{ Attribute, FilterClause, SpanId, StorageType, TraceId, TraceMetricsRequest, TraceSpanRecord, }; use serde_json::json; -use std::collections::BTreeMap; use std::hint::black_box; use std::sync::Arc; use std::time::{Duration, Instant}; -use tiers::ObjectStoreCountSnapshot; use tokio::runtime::Runtime; +use tracing::Instrument; const TOTAL_SPANS: usize = 1_000_000; const SPANS_PER_TRACE: usize = 5; const WRITE_CHUNK_SPANS: usize = 50_000; const SERVICE_COUNT: usize = 20; const HOT_SERVICE: &str = "service_03"; -const DF_COLLECT_SPAN: &str = "df.collect"; -const DELTA_SNAPSHOT_REFRESH_SPAN: &str = "delta.snapshot.refresh"; #[derive(Clone)] struct HotFixture { @@ -183,7 +180,7 @@ async fn seed_small_fixture(settings: &ObjectStorageSettings, total_spans: usize } service.optimize().await.unwrap(); - let trace_idx = 24 * 100 + 3; + let trace_idx = 24 * 50 + 3; let trace_start = base_time + chrono::Duration::hours((trace_idx % 24) as i64) + chrono::Duration::milliseconds(((trace_idx / 24) % 3_600_000) as i64); @@ -465,32 +462,21 @@ fn benchmark_trace_hot_paths(c: &mut Criterion) { drop(tmp_dir); } -fn span_metric(duration: Duration) -> tiers::SpanMetric { - let micros = duration.as_micros().min(u64::MAX as u128) as u64; - tiers::SpanMetric { - count: 1, - p50_us: micros, - p95_us: micros, - p99_us: micros, - sum_us: micros, - } -} - fn benchmark_t0_cold_query_smoke(c: &mut Criterion) { const GROUP: &str = "t0_hot_path_cold_query_smoke"; if !tiers::tier_guard_for("hot_path_bench", GROUP) { return; } - let setup_start = Instant::now(); + let collector = utils::install_bench_span_collector(); let rt = Runtime::new().unwrap(); let tmp_dir = tempfile::tempdir().unwrap(); let settings = storage_settings(tmp_dir.path().to_string_lossy().to_string()); let fixture = rt.block_on(seed_small_fixture(&settings, 10_000)); - let smoke_start = Instant::now(); - rt.block_on(async { - let _ = fixture 
+ // Probe once so fixture or query failures fail before Criterion starts measuring. + let probe_rows = rt.block_on(async { + fixture .service .query_service .query_spans( @@ -504,28 +490,17 @@ fn benchmark_t0_cold_query_smoke(c: &mut Criterion) { None, ) .await - .unwrap(); + .unwrap() + .len() as u64 }); - let smoke_runtime = smoke_start.elapsed(); - - let mut spans = BTreeMap::new(); - spans.insert(DF_COLLECT_SPAN.to_string(), span_metric(smoke_runtime)); - spans.insert( - DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), - tiers::SpanMetric::default(), - ); - utils::write_bench_artifact( - "hot_path_bench", - GROUP, - setup_start.elapsed(), - spans, - ObjectStoreCountSnapshot::default(), - 0, - ); + let object_store_start = collector.records_len(); + let collector_start = collector.records_len(); + let bench_start = Instant::now(); + let fixture_for_bench = fixture.clone(); c.bench_function(GROUP, |b| { b.to_async(&rt).iter_custom(|iters| { - let fixture = fixture.clone(); + let fixture = fixture_for_bench.clone(); async move { let start = Instant::now(); for _ in 0..iters { @@ -543,6 +518,7 @@ fn benchmark_t0_cold_query_smoke(c: &mut Criterion) { Some(&(fixture.trace_start + chrono::Duration::minutes(1))), None, ) + .instrument(tracing::info_span!(tiers::END_TO_END_SPAN)) .await .unwrap(), ); @@ -552,6 +528,21 @@ fn benchmark_t0_cold_query_smoke(c: &mut Criterion) { }); }); + let actual_runtime = bench_start.elapsed(); + let spans = utils::summarize_spans(&collector.records_since(collector_start)); + let object_store_counts = collector.object_store_counts_since(object_store_start); + utils::write_bench_artifact( + "hot_path_bench", + GROUP, + actual_runtime, + spans, + object_store_counts, + 0, + Some("trace_query_service.query_spans"), + Some(probe_rows), + ); + + drop(fixture_for_bench); let service = Arc::try_unwrap(fixture.service) .unwrap_or_else(|_| panic!("Arc still has multiple owners")); rt.block_on(async { service.shutdown().await.unwrap() }); diff --git 
a/crates/scouter_dataframe/benches/tiers.rs b/crates/scouter_dataframe/benches/tiers.rs index 2dd3f2fb9..c527c01e9 100644 --- a/crates/scouter_dataframe/benches/tiers.rs +++ b/crates/scouter_dataframe/benches/tiers.rs @@ -64,6 +64,19 @@ pub struct ObjectStoreCountSnapshot { pub bytes: u64, } +impl ObjectStoreCountSnapshot { + pub fn total_operations(&self) -> u64 { + self.list + + self.list_with_delimiter + + self.head + + self.get + + self.get_range + + self.put + + self.delete + + self.copy + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BenchArtifact { pub commit: String, @@ -76,12 +89,20 @@ pub struct BenchArtifact { pub fixture_rows: u64, pub fixture_spans: u64, pub storage_profile: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub query_entrypoint: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub result_rows: Option, pub spans: BTreeMap, pub object_store_counts: ObjectStoreCountSnapshot, #[serde(default)] pub refresh_on_request_path_total: u64, } +pub const END_TO_END_SPAN: &str = "bench.query.end_to_end"; +pub const DF_COLLECT_SPAN: &str = "df.collect"; +pub const DELTA_SNAPSHOT_REFRESH_SPAN: &str = "delta.snapshot.refresh"; + pub const P1_LOCAL_NVME: &str = "P1_local_nvme"; pub const P2_OBJECT_WARM: &str = "P2_object_warm"; pub const P2_OBJECT_COLD: &str = "P2_object_cold"; @@ -432,6 +453,46 @@ fn compare_count( } } +fn end_to_end_count(artifact: &BenchArtifact) -> Option { + artifact + .spans + .get(END_TO_END_SPAN) + .map(|metric| metric.count) +} + +fn compare_rate( + failures: &mut Vec, + label: &str, + run: u64, + run_denominator: u64, + baseline: u64, + baseline_denominator: u64, + tier: BenchTier, +) { + if run_denominator == 0 || baseline_denominator == 0 { + compare_count(failures, label, run, baseline, tier); + return; + } + + let run_rate = run as f64 / run_denominator as f64; + let baseline_rate = baseline as f64 / baseline_denominator as f64; + if baseline_rate == 0.0 { + if 
run_rate > 0.0 && tier == BenchTier::Tier0 { + failures.push(format!( + "{label} rate regressed from zero: run={run_rate:.4}, baseline=0.0000" + )); + } + return; + } + + let percent = ((run_rate - baseline_rate) / baseline_rate) * 100.0; + if percent > 10.0 && tier == BenchTier::Tier0 { + failures.push(format!( + "{label} rate regressed by {percent:.1}%: run={run_rate:.4}, baseline={baseline_rate:.4}" + )); + } +} + fn compare_span( failures: &mut Vec, name: &str, @@ -457,6 +518,54 @@ fn compare_span( ); } +fn compare_artifacts( + failures: &mut Vec, + run: &BenchArtifact, + baseline: &BenchArtifact, + tier: BenchTier, +) { + if baseline.spans.contains_key(END_TO_END_SPAN) && !run.spans.contains_key(END_TO_END_SPAN) { + failures.push(format!( + "{} missing primary {END_TO_END_SPAN} metric present in baseline", + run.bench_group + )); + } + + compare_span(failures, END_TO_END_SPAN, run, baseline, tier); + + let run_end_to_end_count = end_to_end_count(run).unwrap_or(0); + let baseline_end_to_end_count = end_to_end_count(baseline).unwrap_or(0); + compare_span(failures, DELTA_SNAPSHOT_REFRESH_SPAN, run, baseline, tier); + compare_span(failures, DF_COLLECT_SPAN, run, baseline, tier); + compare_rate( + failures, + "object_store_counts.get_range", + run.object_store_counts.get_range, + run_end_to_end_count, + baseline.object_store_counts.get_range, + baseline_end_to_end_count, + tier, + ); + compare_rate( + failures, + "object_store_counts.head", + run.object_store_counts.head, + run_end_to_end_count, + baseline.object_store_counts.head, + baseline_end_to_end_count, + tier, + ); + compare_rate( + failures, + "object_store_counts.list", + run.object_store_counts.list, + run_end_to_end_count, + baseline.object_store_counts.list, + baseline_end_to_end_count, + tier, + ); +} + fn compare_artifact(run_path: &Path, requested_tier: BenchTier) -> Result, String> { let run = read_artifact(run_path)?; let Some(run_tier) = BenchTier::from_u8(run.tier) else { @@ -486,6 +595,8 @@ fn 
compare_artifact(run_path: &Path, requested_tier: BenchTier) -> Result run.runtime_budget_secs as f64 && run_tier == BenchTier::Tier0 { failures.push(format!( "{} exceeded runtime budget: {:.2}s > {}s", @@ -508,35 +619,7 @@ fn compare_artifact(run_path: &Path, requested_tier: BenchTier) -> Result Result, +) { + if run_tier != BenchTier::Tier0 { + return; + } + + let Some(registration) = REGISTRY + .iter() + .find(|entry| entry.group_name == run.bench_group && entry.tier == BenchTier::Tier0) + else { + failures.push(format!( + "{} is not registered as a Tier 0 benchmark group", + run.bench_group + )); + return; + }; + + if run.fixture_rows != registration.fixture_rows { + failures.push(format!( + "{} reported fixture_rows={} but registry expects {}", + run.bench_group, run.fixture_rows, registration.fixture_rows + )); + } + + if run.fixture_spans != registration.fixture_spans { + failures.push(format!( + "{} reported fixture_spans={} but registry expects {}", + run.bench_group, run.fixture_spans, registration.fixture_spans + )); + } + + if run.scenario_class != registration.scenario_class { + failures.push(format!( + "{} reported scenario_class={} but registry expects {}", + run.bench_group, run.scenario_class, registration.scenario_class + )); + } + + if run.storage_profile != registration.storage_profile { + failures.push(format!( + "{} reported storage_profile={} but registry expects {}", + run.bench_group, run.storage_profile, registration.storage_profile + )); + } + + if run.bench_group == "t0_refresh_origin_sentinel" { + if run.refresh_on_request_path_total != 0 { + failures.push(format!( + "{} recorded refresh_on_request_path_total={}", + run.bench_group, run.refresh_on_request_path_total + )); + } + return; + } + + if run.query_entrypoint.is_none() { + failures.push(format!("{} missing query_entrypoint", run.bench_group)); + } + + match run.result_rows { + Some(rows) if rows > 0 => {} + _ => failures.push(format!("{} missing or zero result_rows", 
run.bench_group)), + } + + match run.spans.get(END_TO_END_SPAN) { + Some(metric) if metric.count >= 10 && metric.sum_us > 0 => {} + Some(metric) => failures.push(format!( + "{} did not record a measured {END_TO_END_SPAN} workload: count={}, sum_us={}", + run.bench_group, metric.count, metric.sum_us + )), + None => failures.push(format!( + "{} did not record required {END_TO_END_SPAN} span metrics", + run.bench_group + )), + } + + if run.actual_runtime_secs <= 0.0 { + failures.push(format!( + "{} reported non-positive actual_runtime_secs={}", + run.bench_group, run.actual_runtime_secs + )); + } + + if run.object_store_counts.total_operations() == 0 { + failures.push(format!( + "{} did not record any object-store operations; this is a smoke artifact, not a baseline", + run.bench_group + )); + } +} + fn compare_requested_tier(tier: BenchTier) -> Result<(), String> { let dir = run_metrics_dir(); if !dir.exists() { + if tier == BenchTier::Tier0 { + return Err(format!( + "missing Tier 0 bench artifacts directory {}; run make bench.core before comparing", + dir.display() + )); + } return Ok(()); } let mut notes = Vec::new(); let mut compared = 0usize; + let mut seen = BTreeSet::new(); for entry in fs::read_dir(&dir).map_err(|err| format!("failed to read {dir:?}: {err}"))? 
{ let entry = entry.map_err(|err| format!("failed to read dir entry: {err}"))?; let path = entry.path(); @@ -583,6 +767,7 @@ fn compare_requested_tier(tier: BenchTier) -> Result<(), String> { continue; } compared += 1; + seen.insert(stem.to_string()); match compare_artifact(&path, tier) { Ok(artifact_notes) => notes.extend(artifact_notes), Err(err) if tier != BenchTier::Tier0 => notes.push(format!( @@ -593,6 +778,21 @@ fn compare_requested_tier(tier: BenchTier) -> Result<(), String> { } } + if tier == BenchTier::Tier0 { + let missing = REGISTRY + .iter() + .filter(|entry| entry.tier == BenchTier::Tier0) + .map(|entry| entry.group_name) + .filter(|group_name| !seen.contains(*group_name)) + .collect::>(); + if !missing.is_empty() { + return Err(format!( + "missing Tier 0 bench artifact(s): {}", + missing.join(", ") + )); + } + } + for note in notes { eprintln!("{note}"); } @@ -653,6 +853,44 @@ fn main() { mod tests { use super::*; + fn tier0_artifact(group_name: &str) -> BenchArtifact { + let registration = REGISTRY + .iter() + .find(|entry| entry.group_name == group_name && entry.tier == BenchTier::Tier0) + .unwrap(); + let mut spans = BTreeMap::new(); + spans.insert( + END_TO_END_SPAN.to_string(), + SpanMetric { + count: 10, + p50_us: 100, + p95_us: 150, + p99_us: 200, + sum_us: 1_200, + }, + ); + BenchArtifact { + commit: "test".to_string(), + bench_group: group_name.to_string(), + tier: BenchTier::Tier0.as_u8(), + blocking: true, + scenario_class: registration.scenario_class.to_string(), + runtime_budget_secs: registration.runtime_budget_secs, + actual_runtime_secs: 1.0, + fixture_rows: registration.fixture_rows, + fixture_spans: registration.fixture_spans, + storage_profile: registration.storage_profile.to_string(), + query_entrypoint: Some("test_entrypoint".to_string()), + result_rows: Some(10), + spans, + object_store_counts: ObjectStoreCountSnapshot { + list: 1, + ..ObjectStoreCountSnapshot::default() + }, + refresh_on_request_path_total: 0, + } + } + #[test] 
fn tier0_filter_is_exactly_anchored() { let filter = filter_for(BenchTier::Tier0, "trace_service_benchmark").unwrap(); @@ -669,4 +907,171 @@ mod tests { fn unknown_groups_default_to_tier1() { assert_eq!(tier_for("not_registered"), BenchTier::Tier1); } + + #[test] + fn tier0_artifact_rejects_single_end_to_end_probe() { + let mut artifact = tier0_artifact("t0_hot_path_cold_query_smoke"); + artifact.spans.insert( + END_TO_END_SPAN.to_string(), + SpanMetric { + count: 1, + p50_us: 5, + p95_us: 5, + p99_us: 5, + sum_us: 5, + }, + ); + + let mut failures = Vec::new(); + validate_artifact_execution(&artifact, BenchTier::Tier0, &mut failures); + + assert!( + failures.iter().any(|failure| failure + .contains("did not record a measured bench.query.end_to_end workload")), + "{failures:?}" + ); + } + + #[test] + fn tier0_artifact_rejects_missing_bench_query_end_to_end() { + let mut artifact = tier0_artifact("t0_hot_path_cold_query_smoke"); + artifact.spans.remove(END_TO_END_SPAN); + let mut failures = Vec::new(); + validate_artifact_execution(&artifact, BenchTier::Tier0, &mut failures); + assert!( + failures + .iter() + .any(|failure| failure.contains("bench.query.end_to_end")), + "{failures:?}" + ); + } + + #[test] + fn tier0_artifact_rejects_missing_query_entrypoint() { + let mut artifact = tier0_artifact("t0_hot_path_cold_query_smoke"); + artifact.query_entrypoint = None; + let mut failures = Vec::new(); + validate_artifact_execution(&artifact, BenchTier::Tier0, &mut failures); + assert!( + failures + .iter() + .any(|failure| failure.contains("query_entrypoint")), + "{failures:?}" + ); + } + + #[test] + fn tier0_artifact_rejects_missing_result_rows() { + let mut artifact = tier0_artifact("t0_hot_path_cold_query_smoke"); + artifact.result_rows = None; + let mut failures = Vec::new(); + validate_artifact_execution(&artifact, BenchTier::Tier0, &mut failures); + assert!( + failures + .iter() + .any(|failure| failure.contains("result_rows")), + "{failures:?}" + ); + } + + 
#[test] + fn tier0_artifact_rejects_missing_object_store_counts() { + let mut artifact = tier0_artifact("t0_hot_path_cold_query_smoke"); + artifact.object_store_counts = ObjectStoreCountSnapshot::default(); + + let mut failures = Vec::new(); + validate_artifact_execution(&artifact, BenchTier::Tier0, &mut failures); + + assert!( + failures + .iter() + .any(|failure| failure.contains("did not record any object-store operations")), + "{failures:?}" + ); + } + + #[test] + fn tier0_refresh_origin_sentinel_allows_zero_workload_metrics() { + let mut artifact = tier0_artifact("t0_refresh_origin_sentinel"); + artifact.spans.clear(); + artifact.object_store_counts = ObjectStoreCountSnapshot::default(); + + let mut failures = Vec::new(); + validate_artifact_execution(&artifact, BenchTier::Tier0, &mut failures); + + assert!(failures.is_empty(), "{failures:?}"); + } + + #[test] + fn tier0_end_to_end_regression_fails_even_if_df_collect_improves() { + let mut baseline = tier0_artifact("t0_hot_path_cold_query_smoke"); + baseline.spans.insert( + END_TO_END_SPAN.to_string(), + SpanMetric { + count: 10, + p50_us: 100, + p95_us: 100, + p99_us: 100, + sum_us: 1_000, + }, + ); + baseline.spans.insert( + DF_COLLECT_SPAN.to_string(), + SpanMetric { + count: 10, + p50_us: 80, + p95_us: 80, + p99_us: 80, + sum_us: 800, + }, + ); + + let mut run = baseline.clone(); + run.spans.insert( + END_TO_END_SPAN.to_string(), + SpanMetric { + count: 10, + p50_us: 130, + p95_us: 130, + p99_us: 130, + sum_us: 1_300, + }, + ); + run.spans.insert( + DF_COLLECT_SPAN.to_string(), + SpanMetric { + count: 10, + p50_us: 40, + p95_us: 40, + p99_us: 40, + sum_us: 400, + }, + ); + + let mut failures = Vec::new(); + compare_artifacts(&mut failures, &run, &baseline, BenchTier::Tier0); + + assert!( + failures + .iter() + .any(|failure| failure.contains("bench.query.end_to_end")), + "{failures:?}" + ); + } + + #[test] + fn object_store_comparison_uses_rate_per_end_to_end() { + let mut failures = Vec::new(); + 
compare_rate( + &mut failures, + "object_store_counts.get_range", + 2_000, + 1_000, + 1_000, + 500, + BenchTier::Tier0, + ); + + assert!(failures.is_empty(), "{failures:?}"); + } } diff --git a/crates/scouter_dataframe/benches/trace_service_benchmark.rs b/crates/scouter_dataframe/benches/trace_service_benchmark.rs index 752162935..84f196d9c 100644 --- a/crates/scouter_dataframe/benches/trace_service_benchmark.rs +++ b/crates/scouter_dataframe/benches/trace_service_benchmark.rs @@ -6,15 +6,11 @@ use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_m use scouter_dataframe::parquet::tracing::service::TraceSpanService; use scouter_settings::ObjectStorageSettings; use scouter_types::{StorageType, TraceId, TraceSpanRecord}; -use std::collections::BTreeMap; use std::hint::black_box; use std::sync::Arc; use std::time::{Duration, Instant}; -use tiers::ObjectStoreCountSnapshot; use tokio::runtime::Runtime; - -const DF_COLLECT_SPAN: &str = "df.collect"; -const DELTA_SNAPSHOT_REFRESH_SPAN: &str = "delta.snapshot.refresh"; +use tracing::Instrument; fn generate_trace_batch(num_traces: usize, spans_per_trace: usize) -> Vec { use scouter_mocks::generate_trace_with_spans; @@ -888,28 +884,17 @@ fn bench_at_scale_10m(c: &mut Criterion) { drop(tmp_dir); } -fn span_metric(duration: Duration) -> tiers::SpanMetric { - let micros = duration.as_micros().min(u64::MAX as u128) as u64; - tiers::SpanMetric { - count: 1, - p50_us: micros, - p95_us: micros, - p99_us: micros, - sum_us: micros, - } -} - fn bench_t0_cold_query_smoke(c: &mut Criterion) { const GROUP: &str = "t0_cold_query_smoke"; if !tiers::tier_guard_for("trace_service_benchmark", GROUP) { return; } + let collector = utils::install_bench_span_collector(); use scouter_mocks::generate_trace_with_spans; const HOURS: usize = 24; const SPANS_PER_HOUR: usize = 420; - let setup_start = Instant::now(); let rt = Runtime::new().unwrap(); let tmp_dir = tempfile::tempdir().unwrap(); let storage_settings = 
ObjectStorageSettings { @@ -945,36 +930,26 @@ fn bench_t0_cold_query_smoke(c: &mut Criterion) { (Arc::new(service), Arc::new(ids)) }); - let smoke_start = Instant::now(); - rt.block_on(async { + // Probe once so setup failures fail before Criterion starts measuring. + let probe_rows = rt.block_on(async { let id = &ids[0]; - let _ = service + service .query_service .query_spans(Some(id), None, None, None, None, None, None, None) .await - .unwrap(); + .unwrap() + .len() as u64 }); - let smoke_runtime = smoke_start.elapsed(); - - let mut spans = BTreeMap::new(); - spans.insert(DF_COLLECT_SPAN.to_string(), span_metric(smoke_runtime)); - spans.insert( - DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), - tiers::SpanMetric::default(), - ); - utils::write_bench_artifact( - "trace_service_benchmark", - GROUP, - setup_start.elapsed(), - spans, - ObjectStoreCountSnapshot::default(), - 0, - ); + let object_store_start = collector.records_len(); + let collector_start = collector.records_len(); + let bench_start = Instant::now(); + let service_for_bench = Arc::clone(&service); + let ids_for_bench = Arc::clone(&ids); c.bench_function(GROUP, |b| { b.to_async(&rt).iter_custom(|iters| { - let svc = Arc::clone(&service); - let ids = Arc::clone(&ids); + let svc = Arc::clone(&service_for_bench); + let ids = Arc::clone(&ids_for_bench); async move { let start = Instant::now(); for i in 0..iters { @@ -982,6 +957,7 @@ fn bench_t0_cold_query_smoke(c: &mut Criterion) { let _ = black_box( svc.query_service .query_spans(Some(id), None, None, None, None, None, None, None) + .instrument(tracing::info_span!(tiers::END_TO_END_SPAN)) .await .unwrap(), ); @@ -991,6 +967,22 @@ fn bench_t0_cold_query_smoke(c: &mut Criterion) { }); }); + let actual_runtime = bench_start.elapsed(); + let spans = utils::summarize_spans(&collector.records_since(collector_start)); + let object_store_counts = collector.object_store_counts_since(object_store_start); + utils::write_bench_artifact( + "trace_service_benchmark", + 
GROUP, + actual_runtime, + spans, + object_store_counts, + 0, + Some("trace_query_service.query_spans"), + Some(probe_rows), + ); + + drop(service_for_bench); + drop(ids_for_bench); let service = Arc::try_unwrap(service).unwrap_or_else(|_| panic!("Arc still has multiple owners")); rt.block_on(async { service.shutdown().await.unwrap() }); @@ -1003,20 +995,19 @@ fn bench_t0_refresh_origin_sentinel(c: &mut Criterion) { return; } + let collector = utils::install_bench_span_collector(); + let object_store_start = collector.records_len(); let start = Instant::now(); - let mut spans = BTreeMap::new(); - spans.insert(DF_COLLECT_SPAN.to_string(), tiers::SpanMetric::default()); - spans.insert( - DELTA_SNAPSHOT_REFRESH_SPAN.to_string(), - tiers::SpanMetric::default(), - ); + let spans = utils::summarize_spans(&collector.records_since(object_store_start)); utils::write_bench_artifact( "trace_service_benchmark", GROUP, start.elapsed(), spans, - ObjectStoreCountSnapshot::default(), + collector.object_store_counts_since(object_store_start), 0, + None, + None, ); c.bench_function(GROUP, |b| { diff --git a/crates/scouter_dataframe/benches/utils.rs b/crates/scouter_dataframe/benches/utils.rs index 483bd7324..662a6d217 100644 --- a/crates/scouter_dataframe/benches/utils.rs +++ b/crates/scouter_dataframe/benches/utils.rs @@ -7,14 +7,21 @@ use std::collections::BTreeMap; use std::fs; use std::path::PathBuf; use std::process::Command; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, OnceLock}; use std::time::{Duration, Instant}; use tracing::field::{Field, Visit}; use tracing::span::{Attributes, Id, Record}; use tracing::{Subscriber, warn}; use tracing_subscriber::Layer; use tracing_subscriber::layer::Context; +use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::util::SubscriberInitExt; + +const OBJECT_STORE_SPAN_NAME: &str = "object_store.request"; +const OBJECT_STORE_OPERATION_ATTR: &str = 
"object_store.operation"; + +static BENCH_SPAN_COLLECTOR: OnceLock = OnceLock::new(); /// Create a simple 3-span trace as ingest records (ready for `write_spans()`). pub fn _create_simple_trace() -> Vec { @@ -158,9 +165,42 @@ impl BenchSpanCollector { .clone() } + pub fn records_len(&self) -> usize { + self.records + .lock() + .expect("bench span collector mutex poisoned") + .len() + } + + pub fn records_since(&self, start: usize) -> Vec { + self.records + .lock() + .expect("bench span collector mutex poisoned") + .iter() + .skip(start) + .cloned() + .collect() + } + pub fn summary(&self) -> BTreeMap { summarize_spans(&self.records()) } + + pub fn object_store_counts_since(&self, start: usize) -> ObjectStoreCountSnapshot { + object_store_counts(&self.records_since(start)) + } +} + +pub fn install_bench_span_collector() -> BenchSpanCollector { + BENCH_SPAN_COLLECTOR + .get_or_init(|| { + let collector = BenchSpanCollector::new(); + let _ = tracing_subscriber::registry() + .with(collector.clone()) + .try_init(); + collector + }) + .clone() } impl Layer for BenchSpanCollector @@ -250,6 +290,51 @@ pub fn summarize_spans(records: &[SpanRecord]) -> BTreeMap { .collect() } +pub fn span_metric_from_samples(samples_us: &[u64]) -> SpanMetric { + if samples_us.is_empty() { + return SpanMetric::default(); + } + + let mut values = samples_us.to_vec(); + values.sort_unstable(); + SpanMetric { + count: values.len() as u64, + p50_us: percentile_u64(&values, 50.0), + p95_us: percentile_u64(&values, 95.0), + p99_us: percentile_u64(&values, 99.0), + sum_us: values.iter().sum(), + } +} + +pub fn object_store_counts(records: &[SpanRecord]) -> ObjectStoreCountSnapshot { + let mut counts = ObjectStoreCountSnapshot::default(); + for record in records + .iter() + .filter(|record| record.name == OBJECT_STORE_SPAN_NAME) + { + match attr_value(record, OBJECT_STORE_OPERATION_ATTR).as_deref() { + Some("list") => counts.list += 1, + Some("list_with_delimiter") => counts.list_with_delimiter += 1, 
+ Some("head") => counts.head += 1, + Some("get") => counts.get += 1, + Some("get_range") => counts.get_range += 1, + Some("put") => counts.put += 1, + Some("delete") => counts.delete += 1, + Some("copy") => counts.copy += 1, + _ => {} + } + } + counts +} + +fn attr_value(record: &SpanRecord, key: &str) -> Option { + record + .attrs + .iter() + .find(|(name, _)| name == key) + .map(|(_, value)| value.trim_matches('"').to_string()) +} + fn percentile_u64(values: &[u64], percentile: f64) -> u64 { if values.is_empty() { return 0; @@ -258,6 +343,7 @@ fn percentile_u64(values: &[u64], percentile: f64) -> u64 { values[index.min(values.len() - 1)] } +#[allow(clippy::too_many_arguments)] pub fn write_bench_artifact( bench_binary: &'static str, group_name: &'static str, @@ -265,6 +351,8 @@ pub fn write_bench_artifact( spans: BTreeMap, object_store_counts: ObjectStoreCountSnapshot, refresh_on_request_path_total: u64, + query_entrypoint: Option<&'static str>, + result_rows: Option, ) { let registration = registration_or_default(bench_binary, group_name); let artifact = BenchArtifact { @@ -278,6 +366,8 @@ pub fn write_bench_artifact( fixture_rows: registration.fixture_rows, fixture_spans: registration.fixture_spans, storage_profile: registration.storage_profile.to_string(), + query_entrypoint: query_entrypoint.map(str::to_string), + result_rows, spans, object_store_counts, refresh_on_request_path_total, @@ -363,4 +453,34 @@ mod tests { assert_eq!(metric.p99_us, 5); assert_eq!(metric.sum_us, 15); } + + #[test] + fn object_store_counts_are_derived_from_span_attrs() { + let records = vec![ + SpanRecord { + name: "object_store.request".to_string(), + attrs: vec![("object_store.operation".to_string(), "list".to_string())], + duration_ns: 1_000, + }, + SpanRecord { + name: "object_store.request".to_string(), + attrs: vec![( + "object_store.operation".to_string(), + "\"get_range\"".to_string(), + )], + duration_ns: 1_000, + }, + SpanRecord { + name: "df.collect".to_string(), + attrs: 
Vec::new(), + duration_ns: 1_000, + }, + ]; + + let counts = object_store_counts(&records); + + assert_eq!(counts.list, 1); + assert_eq!(counts.get_range, 1); + assert_eq!(counts.total_operations(), 2); + } } diff --git a/crates/scouter_events/src/queue/py_queue.rs b/crates/scouter_events/src/queue/py_queue.rs index 9b73014f1..4ef79a549 100644 --- a/crates/scouter_events/src/queue/py_queue.rs +++ b/crates/scouter_events/src/queue/py_queue.rs @@ -545,15 +545,14 @@ impl ScouterQueue { ) -> Result, PyEventError> { let settings = if let DriftProfile::Agent(genai_profile) = &drift_profile { let mut profile = genai_profile.clone(); - if let Some(workflow) = &mut profile.workflow - && std::env::var("SCOUTER_OFFLINE").as_deref() == Ok("1") - && let Err(e) = app_state() + if config.is_mock() + && let Err(error) = app_state() .handle() - .block_on(async { workflow.reset_agents().await }) + .block_on(async { profile.reset_workflow_agents().await }) { error!( "Failed to reset workflow agents for profile {}: {:?}", - id, e + id, error ); } registry diff --git a/crates/scouter_events/src/queue/types.rs b/crates/scouter_events/src/queue/types.rs index 10a98f21a..d0541c1e6 100644 --- a/crates/scouter_events/src/queue/types.rs +++ b/crates/scouter_events/src/queue/types.rs @@ -49,6 +49,10 @@ impl TransportConfig { }) } + pub fn is_mock(&self) -> bool { + matches!(self, TransportConfig::Mock(_)) + } + /// Create a TransportConfig from a python config object. /// Function will extract the transport type and then extract the corresponding config /// before returning the TransportConfig. 
diff --git a/crates/scouter_types/src/agent/profile.rs b/crates/scouter_types/src/agent/profile.rs index da87c9362..e827441cd 100644 --- a/crates/scouter_types/src/agent/profile.rs +++ b/crates/scouter_types/src/agent/profile.rs @@ -843,6 +843,13 @@ impl Default for AgentEvalProfile { } impl AgentEvalProfile { + pub async fn reset_workflow_agents(&mut self) -> Result<(), ProfileError> { + if let Some(workflow) = &mut self.workflow { + workflow.reset_agents().await?; + } + Ok(()) + } + /// Helper method to build profile from given tasks pub fn build_from_parts( config: AgentEvalConfig,