From d68f883078ed07f6a1f0e2d5f5a0ac6d16d98575 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Wed, 4 Mar 2026 13:51:41 -0800 Subject: [PATCH 01/22] feat(metrics): Add Unix domain socket support for DogStatsd Add UDS as an alternative transport for reporting metrics to DogStatsd, alongside the existing UDP path. In Python, this is controlled by the `use_dogstatsd_uds` runtime config flag. In Rust, the socket path is passed through the consumer config pipeline and preferred over UDP when present. The default socket path is `/var/run/datadog/dsd.socket`, configurable via the `SNUBA_DOGSTATSD_SOCKET_PATH` environment variable. Co-Authored-By: Claude Agent transcript: https://claudescope.sentry.dev/share/df5Kw0WzWjehG9JtCoOzCyP26xppU-Mei2HymuQJatM --- rust_snuba/src/config.rs | 2 + rust_snuba/src/consumer.rs | 13 +++- rust_snuba/src/metrics/mod.rs | 1 + rust_snuba/src/metrics/statsd.rs | 23 +++++++ rust_snuba/src/metrics/unix_upstream.rs | 91 +++++++++++++++++++++++++ snuba/consumers/consumer_config.py | 2 + snuba/settings/__init__.py | 3 + snuba/utils/metrics/util.py | 27 ++++++-- 8 files changed, 156 insertions(+), 6 deletions(-) create mode 100644 rust_snuba/src/metrics/unix_upstream.rs diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs index 08b9769506c..47cf422ebb1 100644 --- a/rust_snuba/src/config.rs +++ b/rust_snuba/src/config.rs @@ -104,6 +104,7 @@ pub struct EnvConfig { pub sentry_dsn: Option, pub dogstatsd_host: Option, pub dogstatsd_port: Option, + pub dogstatsd_socket_path: Option, pub default_retention_days: u16, pub lower_retention_days: u16, pub valid_retention_days: HashSet, @@ -117,6 +118,7 @@ impl Default for EnvConfig { sentry_dsn: None, dogstatsd_host: None, dogstatsd_port: None, + dogstatsd_socket_path: None, default_retention_days: 90, lower_retention_days: 30, valid_retention_days: [30, 60, 90].iter().cloned().collect(), diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 524e0ecbee2..ed6e023dd2e 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -136,7 +136,18 @@ pub fn consumer_impl( } // setup arroyo metrics - if let (Some(host), Some(port)) = ( + if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() { + let storage_name = consumer_config + .storages + .iter() + .map(|s| s.name.clone()) + .collect::>() + .join(","); + set_global_tag("storage".to_owned(), storage_name); + set_global_tag("consumer_group".to_owned(), consumer_group.to_owned()); + + metrics::init(StatsDBackend::new_uds(&socket_path, "snuba.consumer")).unwrap(); + } else if let (Some(host), Some(port)) = ( consumer_config.env.dogstatsd_host, consumer_config.env.dogstatsd_port, ) { diff --git a/rust_snuba/src/metrics/mod.rs b/rust_snuba/src/metrics/mod.rs index 078dc29e13f..89308392ed2 100644 --- a/rust_snuba/src/metrics/mod.rs +++ b/rust_snuba/src/metrics/mod.rs @@ -1,2 +1,3 @@ pub mod global_tags; pub mod statsd; +pub mod unix_upstream; diff --git a/rust_snuba/src/metrics/statsd.rs b/rust_snuba/src/metrics/statsd.rs index 53c24d774df..851032af2cc 100644 --- a/rust_snuba/src/metrics/statsd.rs +++ b/rust_snuba/src/metrics/statsd.rs @@ -7,6 +7,7 @@ use statsdproxy::middleware::aggregate::AggregateMetrics; use statsdproxy::middleware::upstream::Upstream; use crate::metrics::global_tags::AddGlobalTags; +use crate::metrics::unix_upstream::UnixUpstream; #[derive(Debug)] pub struct StatsDBackend { @@ -52,6 +53,28 @@ impl StatsDBackend { Self { recorder } } + + pub fn new_uds(socket_path: &str, prefix: &str) -> Self { + let path = socket_path.to_owned(); + let aggregator_sink = StatsdProxyMetricSink::new(move || { + let upstream = UnixUpstream::new(path.clone()).unwrap(); + + let config = AggregateMetricsConfig { + aggregate_counters: true, + flush_offset: 0, + flush_interval: Duration::from_secs(1), + aggregate_gauges: true, + max_map_size: None, + }; + let aggregate = AggregateMetrics::new(config, upstream); + + AddGlobalTags::new(aggregate) + }); + + let recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(aggregator_sink))); + + Self { recorder } + } } #[cfg(test)] diff --git a/rust_snuba/src/metrics/unix_upstream.rs b/rust_snuba/src/metrics/unix_upstream.rs new file mode 100644 index 00000000000..9d4704c4d13 --- /dev/null +++ b/rust_snuba/src/metrics/unix_upstream.rs @@ -0,0 +1,91 @@ +use std::os::unix::net::UnixDatagram; +use std::path::Path; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use statsdproxy::middleware::Middleware; +use statsdproxy::types::Metric; + +const BUFSIZE: usize = 512; + +pub struct UnixUpstream { + socket: UnixDatagram, + buffer: [u8; BUFSIZE], + buf_used: usize, + last_sent_at: SystemTime, +} + +impl UnixUpstream { + pub fn new>(path: P) -> Result { + let socket = UnixDatagram::unbound()?; + socket.connect(path)?; + socket.set_nonblocking(true)?; + Ok(UnixUpstream { + socket, + buffer: [0; BUFSIZE], + buf_used: 0, + last_sent_at: UNIX_EPOCH, + }) + } + + fn flush(&mut self) { + if self.buf_used > 0 { + self.send_buffer(&self.buffer[..self.buf_used]); + self.buf_used = 0; + } + self.last_sent_at = SystemTime::now(); + } + + fn timed_flush(&mut self) { + let now = SystemTime::now(); + if now + .duration_since(self.last_sent_at) + .map_or(true, |x| x > Duration::from_secs(1)) + { + self.flush(); + } + } + + fn send_buffer(&self, buf: &[u8]) { + match self.socket.send(buf) { + Ok(bytes) => { + if bytes != buf.len() { + tracing::error!("tried to send {} bytes but only sent {}.", buf.len(), bytes); + } + } + Err(e) => { + tracing::error!("failed to send to UDS upstream: {}", e); + } + } + } +} + +impl Middleware for UnixUpstream { + fn poll(&mut self) { + self.timed_flush(); + } + + fn submit(&mut self, metric: &mut Metric) { + let metric_len = metric.raw.len(); + + if metric_len + 1 > BUFSIZE - self.buf_used { + self.flush(); + } + + if metric_len > BUFSIZE { + self.send_buffer(&metric.raw); + } else { + if self.buf_used > 0 { + self.buffer[self.buf_used] = b'\n'; + self.buf_used += 1; + } + self.buffer[self.buf_used..self.buf_used + metric_len].copy_from_slice(&metric.raw); + self.buf_used += metric_len; + } + } +} + +impl Drop for UnixUpstream { + fn drop(&mut self) { + self.flush(); + } +} diff --git a/snuba/consumers/consumer_config.py b/snuba/consumers/consumer_config.py index 391052aa369..a4dd98e1231 100644 --- a/snuba/consumers/consumer_config.py +++ b/snuba/consumers/consumer_config.py @@ -49,6 +49,7 @@ class EnvConfig: sentry_dsn: Optional[str] dogstatsd_host: Optional[str] dogstatsd_port: Optional[int] + dogstatsd_socket_path: Optional[str] default_retention_days: int lower_retention_days: int valid_retention_days: list[int] @@ -131,6 +132,7 @@ def _resolve_env_config() -> EnvConfig: sentry_dsn=sentry_dsn, dogstatsd_host=dogstatsd_host, dogstatsd_port=dogstatsd_port, + dogstatsd_socket_path=settings.DOGSTATSD_SOCKET_PATH, default_retention_days=default_retention_days, lower_retention_days=lower_retention_days, valid_retention_days=valid_retention_days, diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index 02700d9c903..336f0e04b9b 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -147,6 +147,9 @@ NEW_DOGSTATSD_HOST: str | None = os.environ.get("SNUBA_NEW_STATSD_HOST") or None NEW_DOGSTATSD_PORT: int | None = int(os.environ.get("SNUBA_NEW_STATSD_PORT") or 0) or None +DOGSTATSD_SOCKET_PATH: str | None = os.environ.get( + "SNUBA_DOGSTATSD_SOCKET_PATH", "/var/run/datadog/dsd.socket" +) CLICKHOUSE_READONLY_USER = os.environ.get("CLICKHOUSE_READONLY_USER", "default") CLICKHOUSE_READONLY_PASSWORD = os.environ.get("CLICKHOUSE_READONLY_PASSWORD", "") diff --git a/snuba/utils/metrics/util.py b/snuba/utils/metrics/util.py index 5a897af055c..fc5c559523b 100644 --- a/snuba/utils/metrics/util.py +++ b/snuba/utils/metrics/util.py @@ -1,8 +1,8 @@ +import _strptime # NOQA fixes _strptime deferred import issue import inspect from functools import partial, wraps from typing import Any, Callable, Mapping, Optional, TypeVar, cast -import _strptime # NOQA fixes _strptime deferred import issue import sentry_sdk from snuba import settings @@ -35,12 +35,31 @@ def create_metrics( f"DOGSTATSD_HOST and DOGSTATSD_PORT should both be None or not None. Found DOGSTATSD_HOST: {host}, DOGSTATSD_PORT: {port} instead." ) - from datadog import DogStatsd + from datadog import DogStatsd # type: ignore[attr-defined] + from snuba import state from snuba.utils.metrics.backends.datadog import DatadogMetricsBackend from snuba.utils.metrics.backends.dualwrite import SentryDatadogMetricsBackend from snuba.utils.metrics.backends.sentry import SentryMetricsBackend + constant_tags = [f"{key}:{value}" for key, value in tags.items()] if tags is not None else None + + use_uds = str(state.get_config("use_dogstatsd_uds", "0")) == "1" + + if use_uds: + return SentryDatadogMetricsBackend( + DatadogMetricsBackend( + partial( + DogStatsd, + socket_path=settings.DOGSTATSD_SOCKET_PATH, + namespace=prefix, + constant_tags=constant_tags, + ), + sample_rates, + ), + SentryMetricsBackend(), + ) + return SentryDatadogMetricsBackend( DatadogMetricsBackend( partial( @@ -48,9 +67,7 @@ def create_metrics( host=host, port=port, namespace=prefix, - constant_tags=( - [f"{key}:{value}" for key, value in tags.items()] if tags is not None else None - ), + constant_tags=constant_tags, ), sample_rates, ), From c90c6f987ca612541ab15d2dda1c3e527a268206 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Wed, 4 Mar 2026 15:19:46 -0800 Subject: [PATCH 02/22] ref(metrics): DRY up statsd backend initialization in consumer Extract the common global tag setup into a single block, choosing the backend (UDS vs UDP) first and then applying shared configuration. Co-Authored-By: Claude Agent transcript: https://claudescope.sentry.dev/share/PXyfvNv6PB8v5UP2payHqvUh6lSTy_8lE7QH6H8tU9o --- rust_snuba/src/consumer.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ed6e023dd2e..f76a102c350 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -136,21 +136,18 @@ pub fn consumer_impl( } // setup arroyo metrics - if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() { - let storage_name = consumer_config - .storages - .iter() - .map(|s| s.name.clone()) - .collect::>() - .join(","); - set_global_tag("storage".to_owned(), storage_name); - set_global_tag("consumer_group".to_owned(), consumer_group.to_owned()); - - metrics::init(StatsDBackend::new_uds(&socket_path, "snuba.consumer")).unwrap(); + let statsd_backend = if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() { + Some(StatsDBackend::new_uds(&socket_path, "snuba.consumer")) } else if let (Some(host), Some(port)) = ( consumer_config.env.dogstatsd_host, consumer_config.env.dogstatsd_port, ) { + Some(StatsDBackend::new(&host, port, "snuba.consumer")) + } else { + None + }; + + if let Some(backend) = statsd_backend { let storage_name = consumer_config .storages .iter() @@ -160,7 +157,7 @@ pub fn consumer_impl( set_global_tag("storage".to_owned(), storage_name); set_global_tag("consumer_group".to_owned(), consumer_group.to_owned()); - metrics::init(StatsDBackend::new(&host, port, "snuba.consumer")).unwrap(); + metrics::init(backend).unwrap(); } if !use_rust_processor { From c217abc260e473792b9b08ba11e3573384741af0 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Wed, 4 Mar 2026 15:28:10 -0800 Subject: [PATCH 03/22] ref(metrics): Use cadence BufferedUnixMetricSink instead of custom UnixUpstream Replace the custom UnixUpstream middleware with cadence's built-in BufferedUnixMetricSink, which already provides the same 512-byte buffered UDS transport. This removes a file and leverages a well-tested library instead. Co-Authored-By: Claude Agent transcript: https://claudescope.sentry.dev/share/mpfDfVQ7ALBfclRYP4cw_R3ypl2IhChN8cxM7pcaUyY --- rust_snuba/src/metrics/mod.rs | 1 - rust_snuba/src/metrics/statsd.rs | 23 ++----- rust_snuba/src/metrics/unix_upstream.rs | 91 ------------------------- 3 files changed, 5 insertions(+), 110 deletions(-) delete mode 100644 rust_snuba/src/metrics/unix_upstream.rs diff --git a/rust_snuba/src/metrics/mod.rs b/rust_snuba/src/metrics/mod.rs index 89308392ed2..078dc29e13f 100644 --- a/rust_snuba/src/metrics/mod.rs +++ b/rust_snuba/src/metrics/mod.rs @@ -1,3 +1,2 @@ pub mod global_tags; pub mod statsd; -pub mod unix_upstream; diff --git a/rust_snuba/src/metrics/statsd.rs b/rust_snuba/src/metrics/statsd.rs index 851032af2cc..19581c91cf9 100644 --- a/rust_snuba/src/metrics/statsd.rs +++ b/rust_snuba/src/metrics/statsd.rs @@ -1,3 +1,4 @@ +use std::os::unix::net::UnixDatagram; use std::time::Duration; use sentry_arroyo::metrics::{Metric, MetricSink, Recorder, StatsdRecorder}; @@ -7,7 +8,6 @@ use statsdproxy::middleware::aggregate::AggregateMetrics; use statsdproxy::middleware::upstream::Upstream; use crate::metrics::global_tags::AddGlobalTags; -use crate::metrics::unix_upstream::UnixUpstream; #[derive(Debug)] pub struct StatsDBackend { @@ -55,23 +55,10 @@ impl StatsDBackend { } pub fn new_uds(socket_path: &str, prefix: &str) -> Self { - let path = socket_path.to_owned(); - let aggregator_sink = StatsdProxyMetricSink::new(move || { - let upstream = UnixUpstream::new(path.clone()).unwrap(); - - let config = AggregateMetricsConfig { - aggregate_counters: true, - flush_offset: 0, - flush_interval: Duration::from_secs(1), - aggregate_gauges: true, - max_map_size: None, - }; - let aggregate = AggregateMetrics::new(config, upstream); - - AddGlobalTags::new(aggregate) - }); - - let recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(aggregator_sink))); + let socket = UnixDatagram::unbound().unwrap(); + socket.set_nonblocking(true).unwrap(); + let sink = cadence::BufferedUnixMetricSink::from(socket_path, socket); + let recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(sink))); Self { recorder } } diff --git a/rust_snuba/src/metrics/unix_upstream.rs b/rust_snuba/src/metrics/unix_upstream.rs deleted file mode 100644 index 9d4704c4d13..00000000000 --- a/rust_snuba/src/metrics/unix_upstream.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::os::unix::net::UnixDatagram; -use std::path::Path; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use statsdproxy::middleware::Middleware; -use statsdproxy::types::Metric; - -const BUFSIZE: usize = 512; - -pub struct UnixUpstream { - socket: UnixDatagram, - buffer: [u8; BUFSIZE], - buf_used: usize, - last_sent_at: SystemTime, -} - -impl UnixUpstream { - pub fn new>(path: P) -> Result { - let socket = UnixDatagram::unbound()?; - socket.connect(path)?; - socket.set_nonblocking(true)?; - Ok(UnixUpstream { - socket, - buffer: [0; BUFSIZE], - buf_used: 0, - last_sent_at: UNIX_EPOCH, - }) - } - - fn flush(&mut self) { - if self.buf_used > 0 { - self.send_buffer(&self.buffer[..self.buf_used]); - self.buf_used = 0; - } - self.last_sent_at = SystemTime::now(); - } - - fn timed_flush(&mut self) { - let now = SystemTime::now(); - if now - .duration_since(self.last_sent_at) - .map_or(true, |x| x > Duration::from_secs(1)) - { - self.flush(); - } - } - - fn send_buffer(&self, buf: &[u8]) { - match self.socket.send(buf) { - Ok(bytes) => { - if bytes != buf.len() { - tracing::error!("tried to send {} bytes but only sent {}.", buf.len(), bytes); - } - } - Err(e) => { - tracing::error!("failed to send to UDS upstream: {}", e); - } - } - } -} - -impl Middleware for UnixUpstream { - fn poll(&mut self) { - self.timed_flush(); - } - - fn submit(&mut self, metric: &mut Metric) { - let metric_len = metric.raw.len(); - - if metric_len + 1 > BUFSIZE - self.buf_used { - self.flush(); - } - - if metric_len > BUFSIZE { - self.send_buffer(&metric.raw); - } else { - if self.buf_used > 0 { - self.buffer[self.buf_used] = b'\n'; - self.buf_used += 1; - } - self.buffer[self.buf_used..self.buf_used + metric_len].copy_from_slice(&metric.raw); - self.buf_used += metric_len; - } - } -} - -impl Drop for UnixUpstream { - fn drop(&mut self) { - self.flush(); - } -} From bb6f3088e86910bc10228fd87733e50c06c35154 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Wed, 4 Mar 2026 15:29:29 -0800 Subject: [PATCH 04/22] fix(metrics): Restore global tags for UDS statsd backend Pass storage and consumer_group tags to the UDS backend via StatsdRecorder::with_tag instead of the statsdproxy AddGlobalTags middleware. set_global_tag is still called for Sentry scope tags. Co-Authored-By: Claude Agent transcript: https://claudescope.sentry.dev/share/V26YRfVXmvlDYDieSQ0UFx5soTUFz1D5coqoNKUGApk --- rust_snuba/src/consumer.rs | 40 ++++++++++++++++++++------------ rust_snuba/src/metrics/statsd.rs | 7 ++++-- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index f76a102c350..a6de10619ee 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -136,28 +136,38 @@ pub fn consumer_impl( } // setup arroyo metrics - let statsd_backend = if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() { - Some(StatsDBackend::new_uds(&socket_path, "snuba.consumer")) - } else if let (Some(host), Some(port)) = ( - consumer_config.env.dogstatsd_host, - consumer_config.env.dogstatsd_port, - ) { - Some(StatsDBackend::new(&host, port, "snuba.consumer")) - } else { - None - }; - - if let Some(backend) = statsd_backend { + { let storage_name = consumer_config .storages .iter() .map(|s| s.name.clone()) .collect::>() .join(","); - set_global_tag("storage".to_owned(), storage_name); - set_global_tag("consumer_group".to_owned(), consumer_group.to_owned()); - metrics::init(backend).unwrap(); + let statsd_backend = if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() { + Some(StatsDBackend::new_uds( + &socket_path, + "snuba.consumer", + &[ + ("storage", storage_name.clone()), + ("consumer_group", consumer_group.to_owned()), + ], + )) + } else if let (Some(host), Some(port)) = ( + consumer_config.env.dogstatsd_host, + consumer_config.env.dogstatsd_port, + ) { + Some(StatsDBackend::new(&host, port, "snuba.consumer")) + } else { + None + }; + + if let Some(backend) = statsd_backend { + set_global_tag("storage".to_owned(), storage_name); + set_global_tag("consumer_group".to_owned(), consumer_group.to_owned()); + + metrics::init(backend).unwrap(); + } } if !use_rust_processor { diff --git a/rust_snuba/src/metrics/statsd.rs b/rust_snuba/src/metrics/statsd.rs index 19581c91cf9..af591c1685e 100644 --- a/rust_snuba/src/metrics/statsd.rs +++ b/rust_snuba/src/metrics/statsd.rs @@ -54,11 +54,14 @@ impl StatsDBackend { Self { recorder } } - pub fn new_uds(socket_path: &str, prefix: &str) -> Self { + pub fn new_uds(socket_path: &str, prefix: &str, tags: &[(&'static str, String)]) -> Self { let socket = UnixDatagram::unbound().unwrap(); socket.set_nonblocking(true).unwrap(); let sink = cadence::BufferedUnixMetricSink::from(socket_path, socket); - let recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(sink))); + let mut recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(sink))); + for (key, value) in tags { + recorder = recorder.with_tag(key, value); + } Self { recorder } } From 63f5ffc89a827efbd4f4019d97352348e7308c08 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 5 Mar 2026 08:58:03 -0800 Subject: [PATCH 05/22] ref(metrics): Replace cadence and statsdproxy with metrics-exporter-dogstatsd Use metrics-exporter-dogstatsd for both UDP and UDS transports, removing the cadence and statsdproxy dependencies. The new DogStatsDBackend provides native DogStatsD protocol support with client-side aggregation built in. This simplifies the metrics pipeline from statsdproxy (Upstream/AggregateMetrics/AddGlobalTags) + cadence to a single exporter with built-in aggregation and global labels. Co-Authored-By: Claude Agent transcript: https://claudescope.sentry.dev/share/wuD5d2s6g_PVGjhylOUjPEALVpYSHWc_Uqq4OQ8RD6Q --- rust_snuba/Cargo.lock | 344 ++++++++++++-------------- rust_snuba/Cargo.toml | 4 +- rust_snuba/src/consumer.rs | 25 +- rust_snuba/src/lib.rs | 2 +- rust_snuba/src/metrics/global_tags.rs | 121 +-------- rust_snuba/src/metrics/statsd.rs | 145 ++++++----- 6 files changed, 265 insertions(+), 376 deletions(-) diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 2107a1caf1b..d1d4abd12f1 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -67,54 +67,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" -[[package]] -name = "anstream" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" -dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "utf8parse", -] - [[package]] name = "anstyle" version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" -[[package]] -name = "anstyle-parse" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" -dependencies = [ - "utf8parse", -] - -[[package]] -name = "anstyle-query" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" -dependencies = [ - "windows-sys 0.52.0", -] - -[[package]] -name = "anstyle-wincon" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" -dependencies = [ - "anstyle", - "windows-sys 0.52.0", -] - [[package]] name = "anyhow" version = "1.0.82" @@ -541,15 +499,6 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" -[[package]] -name = "cadence" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3075f133bee430b7644c54fb629b9b4420346ffa275a45c81a6babe8b09b4f51" -dependencies = [ - "crossbeam-channel", -] - [[package]] name = "cast" version = "0.3.0" @@ -635,7 +584,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" dependencies = [ "clap_builder", - "clap_derive", ] [[package]] @@ -644,22 +592,8 @@ version = "4.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" dependencies = [ - "anstream", "anstyle", "clap_lex", - "strsim", -] - -[[package]] -name = "clap_derive" -version = "4.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" -dependencies = [ - "heck 0.5.0", - "proc-macro2", - "quote", - "syn 2.0.104", ] [[package]] @@ -688,12 +622,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "colorchoice" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" - [[package]] name = "concurrent-queue" version = "2.4.0" @@ -740,15 +668,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc32fast" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "criterion" version = "0.5.1" @@ -1017,27 +936,10 @@ dependencies = [ ] [[package]] -name = "env_filter" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.8" +name = "endian-type" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "jiff", - "log", -] +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "equivalent" @@ -1170,6 +1072,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1413,7 +1321,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.2.6", + "indexmap 2.13.0", "slab", "tokio", "tokio-util", @@ -1432,7 +1340,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.2.6", + "indexmap 2.13.0", "slab", "tokio", "tokio-util", @@ -1465,6 +1373,15 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash", +] + [[package]] name = "heck" version = "0.4.1" @@ -1884,13 +1801,14 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.16.1", "serde", + "serde_core", ] [[package]] @@ -2021,30 +1939,6 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" -[[package]] -name = "jiff" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" -dependencies = [ - "jiff-static", - "log", - "portable-atomic", - "portable-atomic-util", - "serde", -] - -[[package]] -name = "jiff-static" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.104", -] - [[package]] name = "jobserver" version = "0.1.31" @@ -2273,6 +2167,51 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d5312e9ba3771cfa961b585728215e3d972c950a3eed9252aa093d6301277e8" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-dogstatsd" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "961f3712d8a7cfe14caaf74c3af503fe701cee6439ff49a7a3ebd04bf49c0502" +dependencies = [ + "bytes", + "itoa", + "metrics", + "metrics-util", + "ryu", + "thiserror 2.0.12", + "tracing", +] + +[[package]] +name = "metrics-util" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap 2.13.0", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "rand 0.9.2", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -2370,6 +2309,15 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.28.0" @@ -2572,6 +2520,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "5.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4779c6901a562440c3786d08192c6fbda7c1c2060edd10006b05ee35d10f2d" +dependencies = [ + "num-traits", +] + [[package]] name = "os_info" version = "3.8.2" @@ -2620,7 +2577,7 @@ checksum = "f0a21c30f03223ae4a4c892f077b3189133689b8a659a84372f8422384ce94c9" dependencies = [ "deprecate-until", "fixedbitset", - "indexmap 2.2.6", + "indexmap 2.13.0", "integer-sqrt", "num-traits", "rustc-hash", @@ -2694,7 +2651,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.2.6", + "indexmap 2.13.0", ] [[package]] @@ -2826,15 +2783,6 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" -[[package]] -name = "portable-atomic-util" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" -dependencies = [ - "portable-atomic", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -2994,6 +2942,21 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quote" version = "1.0.40" @@ -3009,6 +2972,16 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.7.3" @@ -3109,6 +3082,24 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.3", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -3310,7 +3301,6 @@ dependencies = [ "adler", "anyhow", "bytes", - "cadence", "chrono", "criterion", "ctrlc", @@ -3321,6 +3311,8 @@ dependencies = [ "insta", "json-schema-diff", "md5", + "metrics", + "metrics-exporter-dogstatsd", "once_cell", "parking_lot", "procspawn", @@ -3342,7 +3334,6 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_with", - "statsdproxy", "thiserror 1.0.58", "tikv-jemallocator", "tokio", @@ -3722,18 +3713,28 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3816,7 +3817,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.2.6", + "indexmap 2.13.0", "serde", "serde_derive", "serde_json", @@ -3842,7 +3843,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.13.0", "itoa", "ryu", "serde", @@ -3875,16 +3876,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" -[[package]] -name = "signal-hook" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" -dependencies = [ - "libc", - "signal-hook-registry", -] - [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -3906,6 +3897,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.9" @@ -3957,25 +3954,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "statsdproxy" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ad88f372ad4e60c8235f74feb2fcc047377c55b1872d0b9247b8a6aed8dcba9" -dependencies = [ - "anyhow", - "cadence", - "clap", - "crc32fast", - "env_logger", - "log", - "rand 0.8.5", - "serde", - "serde_yaml", - "signal-hook", - "thread_local", -] - [[package]] name = "string_cache" version = "0.8.7" @@ -4296,7 +4274,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.13.0", "toml_datetime", "winnow", ] @@ -4364,7 +4342,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.2.6", + "indexmap 2.13.0", "pin-project-lite", "slab", "sync_wrapper 1.0.2", @@ -4616,12 +4594,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" -[[package]] -name = "utf8parse" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" - [[package]] name = "uuid" version = "1.8.0" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 942f540ac31..d96ab3a840a 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -24,7 +24,6 @@ crate-type = ["cdylib", "rlib"] [dependencies] adler = "1.0.2" anyhow = { version = "1.0.69", features = ["backtrace"] } -cadence = "1.0.0" chrono = { version = "0.4.26", features = ["serde"] } ctrlc = { version = "3.2.5", features = ["termination"] } data-encoding = "2.5.0" @@ -32,6 +31,8 @@ futures = "0.3.21" hyper = "1.2.0" json-schema-diff = "0.1.7" md5 = "0.7.0" +metrics = "0.24" +metrics-exporter-dogstatsd = "0.9.6" parking_lot = "0.12.1" procspawn = { version = "1.0.0", features = ["json"] } prost = "0.14" @@ -61,7 +62,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } serde_path_to_error = "0.1.15" serde_with = "3.8.1" -statsdproxy = { version = "0.4.1", features = ["cadence"] } thiserror = "1.0" tokio = { version = "1.38.2", features = ["full"] } tokio-stream = "0.1.15" diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index a6de10619ee..97e449993ba 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -20,7 +20,7 @@ use crate::config; use crate::factory_v2::ConsumerStrategyFactoryV2; use crate::logging::{setup_logging, setup_sentry}; use crate::metrics::global_tags::set_global_tag; -use crate::metrics::statsd::StatsDBackend; +use crate::metrics::statsd::DogStatsDBackend; use crate::processors; use crate::rebalancing; use crate::types::{InsertOrReplacement, KafkaMessageMetadata}; @@ -143,29 +143,34 @@ pub fn consumer_impl( .map(|s| s.name.clone()) .collect::>() .join(","); + let tags = [ + ("storage", storage_name.clone()), + ("consumer_group", consumer_group.to_owned()), + ]; - let statsd_backend = if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() { - Some(StatsDBackend::new_uds( + let backend = if let Some(socket_path) = env_config.dogstatsd_socket_path.clone() { + Some(DogStatsDBackend::new_uds( &socket_path, "snuba.consumer", - &[ - ("storage", storage_name.clone()), - ("consumer_group", consumer_group.to_owned()), - ], + &tags, )) } else if let (Some(host), Some(port)) = ( consumer_config.env.dogstatsd_host, consumer_config.env.dogstatsd_port, ) { - Some(StatsDBackend::new(&host, port, "snuba.consumer")) + Some(DogStatsDBackend::new_udp( + &host, + port, + "snuba.consumer", + &tags, + )) } else { None }; - if let Some(backend) = statsd_backend { + if let Some(backend) = backend { set_global_tag("storage".to_owned(), storage_name); set_global_tag("consumer_group".to_owned(), consumer_group.to_owned()); - metrics::init(backend).unwrap(); } } diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index fcb19d2f811..a232ec138b1 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -27,7 +27,7 @@ pub use config::{ StorageConfig, TopicConfig, }; pub use factory_v2::ConsumerStrategyFactoryV2; -pub use metrics::statsd::StatsDBackend; +pub use metrics::statsd::DogStatsDBackend; pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS}; pub use strategies::noop::Noop; pub use strategies::python::PythonTransformStep; diff --git a/rust_snuba/src/metrics/global_tags.rs b/rust_snuba/src/metrics/global_tags.rs index 8d5177ae568..6ac39ace5c9 100644 --- a/rust_snuba/src/metrics/global_tags.rs +++ b/rust_snuba/src/metrics/global_tags.rs @@ -1,125 +1,6 @@ -use std::collections::BTreeMap; - -use parking_lot::RwLock; -use statsdproxy::middleware::Middleware; -use statsdproxy::types::Metric; - -static GLOBAL_TAGS: RwLock> = RwLock::new(BTreeMap::new()); - +/// Sets a tag on the current Sentry scope. pub fn set_global_tag(key: String, value: String) { sentry::configure_scope(|scope| { scope.set_tag(&key, &value); }); - GLOBAL_TAGS.write().insert(key, value); -} - -pub struct AddGlobalTags<'a, M> { - next: M, - global_tags: &'a RwLock>, -} - -impl AddGlobalTags<'static, M> -where - M: Middleware, -{ - pub fn new(next: M) -> Self { - Self::new_with_tagmap(next, &GLOBAL_TAGS) - } -} - -impl<'a, M> AddGlobalTags<'a, M> -where - M: Middleware, -{ - fn new_with_tagmap(next: M, global_tags: &'a RwLock>) -> Self { - AddGlobalTags { next, global_tags } - } -} - -impl Middleware for AddGlobalTags<'_, M> -where - M: Middleware, -{ - fn poll(&mut self) { - self.next.poll() - } - - fn submit(&mut self, metric: &mut Metric) { - let global_tags = self.global_tags.read(); - - if global_tags.is_empty() { - return self.next.submit(metric); - } - - let mut tag_buffer: Vec = Vec::new(); - let mut add_comma = false; - match metric.tags() { - Some(tags) if !tags.is_empty() => { - tag_buffer.extend(tags); - add_comma = true; - } - _ => (), - } - for (k, v) in global_tags.iter() { - if add_comma { - tag_buffer.push(b','); - } - tag_buffer.extend(k.as_bytes()); - tag_buffer.push(b':'); - tag_buffer.extend(v.as_bytes()); - add_comma = true; - } - - metric.set_tags(&tag_buffer); - - self.next.submit(metric) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::cell::RefCell; - use std::collections::BTreeMap; - - use statsdproxy::{middleware::Middleware, types::Metric}; - - struct FnStep(pub F); - - impl Middleware for FnStep - where - F: FnMut(&mut Metric), - { - fn submit(&mut self, metric: &mut Metric) { - (self.0)(metric) - } - } - - #[test] - fn test_basic() { - let test_cases = [ - // Without tags - ("users.online:1|c", "users.online:1|c|#env:prod"), - // With tags - ( - "users.online:1|c|#tag1:a", - "users.online:1|c|#tag1:a,env:prod", - ), - ]; - - for test_case in test_cases { - let results = RefCell::new(vec![]); - let global_tags = RwLock::new(BTreeMap::from([("env".to_owned(), "prod".to_owned())])); - - let step = FnStep(|metric: &mut Metric| results.borrow_mut().push(metric.clone())); - let mut middleware = AddGlobalTags::new_with_tagmap(step, &global_tags); - - let mut metric = Metric::new(test_case.0.as_bytes().to_vec()); - middleware.submit(&mut metric); - assert_eq!(results.borrow().len(), 1); - let updated_metric = Metric::new(results.borrow_mut()[0].raw.clone()); - assert_eq!(updated_metric.raw, test_case.1.as_bytes()); - } - } } diff --git a/rust_snuba/src/metrics/statsd.rs b/rust_snuba/src/metrics/statsd.rs index af591c1685e..d706f751ba6 100644 --- a/rust_snuba/src/metrics/statsd.rs +++ b/rust_snuba/src/metrics/statsd.rs @@ -1,69 +1,100 @@ -use std::os::unix::net::UnixDatagram; -use std::time::Duration; - -use sentry_arroyo::metrics::{Metric, MetricSink, Recorder, StatsdRecorder}; -use statsdproxy::cadence::StatsdProxyMetricSink; -use statsdproxy::config::AggregateMetricsConfig; -use statsdproxy::middleware::aggregate::AggregateMetrics; -use statsdproxy::middleware::upstream::Upstream; - -use crate::metrics::global_tags::AddGlobalTags; +use metrics::Label; +use metrics_exporter_dogstatsd::DogStatsDBuilder; +use sentry_arroyo::metrics::{Metric, MetricType, MetricValue, Recorder}; +/// A metrics backend that uses `metrics-exporter-dogstatsd` to send metrics +/// to DogStatsD over UDP or Unix domain sockets. Adapts arroyo's [`Recorder`] +/// trait to the `metrics` crate facade installed by the exporter. #[derive(Debug)] -pub struct StatsDBackend { - recorder: StatsdRecorder, +pub struct DogStatsDBackend { + prefix: String, } -impl Recorder for StatsDBackend { - fn record_metric(&self, metric: Metric<'_>) { - self.recorder.record_metric(metric) +impl DogStatsDBackend { + pub fn new_udp(host: &str, port: u16, prefix: &str, tags: &[(&str, String)]) -> Self { + let addr = format!("{}:{}", host, port); + Self::build(&addr, prefix, tags) } -} -struct Wrapper(Box); - -impl MetricSink for Wrapper { - fn emit(&self, metric: &str) { - let _ = self.0.emit(metric); + pub fn new_uds(socket_path: &str, prefix: &str, tags: &[(&str, String)]) -> Self { + let addr = format!("unixgram://{}", socket_path); + Self::build(&addr, prefix, tags) } -} - -impl StatsDBackend { - pub fn new(host: &str, port: u16, prefix: &str) -> Self { - let upstream_addr = format!("{}:{}", host, port); - let aggregator_sink = StatsdProxyMetricSink::new(move || { - let upstream = Upstream::new(upstream_addr.clone()).unwrap(); - let config = AggregateMetricsConfig { - aggregate_counters: true, - flush_offset: 0, - flush_interval: Duration::from_secs(1), - aggregate_gauges: true, - max_map_size: None, - }; - let aggregate = AggregateMetrics::new(config, upstream); - - // adding global tags *after* aggregation is more performant than trying to do the same - // in cadence, as it means more bytes and more memory to deal with in - // AggregateMetricsConfig - AddGlobalTags::new(aggregate) - }); - - let recorder = StatsdRecorder::new(prefix, Wrapper(Box::new(aggregator_sink))); - - Self { recorder } + fn build(addr: &str, prefix: &str, tags: &[(&str, String)]) -> Self { + let global_labels: Vec