diff --git a/crates/lib/docs_rs_storage/src/archive_index.rs b/crates/lib/docs_rs_storage/src/archive_index.rs index 21fa0ba46..2b3fe6319 100644 --- a/crates/lib/docs_rs_storage/src/archive_index.rs +++ b/crates/lib/docs_rs_storage/src/archive_index.rs @@ -1,6 +1,6 @@ use crate::{ PathNotFoundError, blob::StreamingBlob, config::ArchiveIndexCacheConfig, file::FolderEntry, - types::FileRange, utils::file_list::walk_dir_recursive, + metrics::FILE_SIZE_HISTOGRAM_BUCKETS, types::FileRange, utils::file_list::walk_dir_recursive, }; use anyhow::{Context as _, Result, anyhow, bail}; use async_stream::try_stream; @@ -68,28 +68,6 @@ impl Metrics { fn new(meter_provider: &AnyMeterProvider) -> Self { let meter = meter_provider.meter("storage"); const PREFIX: &str = "docsrs.storage.archive_index_cache"; - const KIB: f64 = 1024.0; - const MIB: f64 = 1024.0 * KIB; - const GIB: f64 = 1024.0 * MIB; - - let entry_size_boundaries = vec![ - 500.0 * KIB, - 1.0 * MIB, - 2.0 * MIB, - 4.0 * MIB, - 8.0 * MIB, - 16.0 * MIB, - 32.0 * MIB, - 64.0 * MIB, - 128.0 * MIB, - 256.0 * MIB, - 512.0 * MIB, - 1.0 * GIB, - 2.0 * GIB, - 4.0 * GIB, - 8.0 * GIB, - 10.0 * GIB, - ]; Self { find_calls: meter @@ -115,12 +93,12 @@ impl Metrics { evicted_entry_size: meter .u64_histogram(format!("{PREFIX}.evicted_entry_size")) .with_unit("By") - .with_boundaries(entry_size_boundaries.clone()) + .with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec()) .build(), downloaded_entry_size: meter .u64_histogram(format!("{PREFIX}.downloaded_entry_size")) .with_unit("By") - .with_boundaries(entry_size_boundaries) + .with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec()) .build(), weighted_size_bytes: meter .u64_gauge(format!("{PREFIX}.weighted_size_bytes")) diff --git a/crates/lib/docs_rs_storage/src/backends/memory.rs b/crates/lib/docs_rs_storage/src/backends/memory.rs index 818c1e4f4..c5db6bbdc 100644 --- a/crates/lib/docs_rs_storage/src/backends/memory.rs +++ b/crates/lib/docs_rs_storage/src/backends/memory.rs @@ -3,7 +3,7 @@ use crate::{ backends::StorageBackendMethods, blob::{StreamUpload, StreamUploadSource, StreamingBlob}, errors::PathNotFoundError, - metrics::StorageMetrics, + metrics::{StorageMetrics, UploadType}, types::FileRange, }; use anyhow::{Result, anyhow}; @@ -30,6 +30,7 @@ impl MemoryBackend { impl StorageBackendMethods for MemoryBackend { async fn exists(&self, path: &str) -> Result { + self.otel_metrics.exist_calls.add(1, &[]); Ok(self.objects.contains_key(path)) } @@ -37,7 +38,7 @@ impl StorageBackendMethods for MemoryBackend { let mut blob = self.objects.get(path).ok_or(PathNotFoundError)?.clone(); debug_assert!(blob.etag.is_some()); - if let Some(r) = range { + if let Some(r) = &range { blob.content = blob .content .get(*r.start() as usize..=*r.end() as usize) @@ -45,6 +46,10 @@ impl StorageBackendMethods for MemoryBackend { .to_vec(); blob.etag = Some(compute_etag(&blob.content)); } + + self.otel_metrics + .record_download_metrics(blob.content.len() as u64, range.as_ref()); + Ok(blob.into()) } @@ -60,6 +65,7 @@ impl StorageBackendMethods for MemoryBackend { StreamUploadSource::Bytes(content) => content.to_vec(), StreamUploadSource::File(path) => fs::read(&path).await?, }; + let content_len = content.len(); let blob = Blob { path, @@ -70,7 +76,9 @@ impl StorageBackendMethods for MemoryBackend { compression, }; - self.otel_metrics.uploaded_files.add(1, &[]); + self.otel_metrics + .record_upload_metrics(content_len as u64, UploadType::Single); + self.objects.insert(blob.path.clone(), blob); Ok(()) } @@ -93,7 +101,14 @@ impl StorageBackendMethods for MemoryBackend { } async fn delete_prefix(&self, prefix: &str) -> Result<()> { + let object_count = self.objects.len(); + self.objects.retain(|key, _| !key.starts_with(prefix)); + + self.otel_metrics + .deleted_files + .add((object_count - self.objects.len()) as u64, &[]); + Ok(()) } } diff --git a/crates/lib/docs_rs_storage/src/backends/mod.rs b/crates/lib/docs_rs_storage/src/backends/mod.rs index effb27dc5..d523a82ed 100644 --- a/crates/lib/docs_rs_storage/src/backends/mod.rs +++ b/crates/lib/docs_rs_storage/src/backends/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod s3; use crate::{StreamingBlob, blob::StreamUpload, types::FileRange}; use anyhow::Result; use futures_util::stream::BoxStream; +use tracing::instrument; pub(crate) trait StorageBackendMethods { async fn exists(&self, path: &str) -> Result; @@ -35,10 +36,26 @@ impl StorageBackendMethods for StorageBackend { call_inner!(self, exists(path)) } + #[instrument( + skip_all, + fields( + // fields are populated in StorageMetrics::record_download_metrics + storage.download_type = tracing::field::Empty, + storage.content_length = tracing::field::Empty + ) + )] async fn get_stream(&self, path: &str, range: Option) -> Result { call_inner!(self, get_stream(path, range)) } + #[instrument( + skip_all, + fields( + // fields are populated in StorageMetrics::record_upload_metrics + storage.upload_type = tracing::field::Empty, + storage.content_length = tracing::field::Empty + ) + )] async fn upload_stream(&self, upload: StreamUpload) -> Result<()> { call_inner!(self, upload_stream(upload)) } diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index c1cd6ccd6..f7f2561e3 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -4,7 +4,7 @@ use crate::{ blob::{StreamUpload, StreamUploadSource, StreamingBlob}, crc32_for_path, errors::PathNotFoundError, - metrics::StorageMetrics, + metrics::{StorageMetrics, UploadType}, types::FileRange, utils::crc32::crc32_for_path_range, }; @@ -200,7 +200,8 @@ impl S3Backend { request.send().await?; - self.otel_metrics.uploaded_files.add(1, &[]); + self.otel_metrics + .record_upload_metrics(content_length, UploadType::Single); Ok(()) } @@ -256,7 +257,8 @@ impl S3Backend { match result { Ok(()) => { - self.otel_metrics.uploaded_files.add(1, &[]); + self.otel_metrics + .record_upload_metrics(content_length, UploadType::MultiPart); Ok(()) } Err(err) => { @@ -358,6 +360,8 @@ impl S3Backend { impl StorageBackendMethods for S3Backend { async fn exists(&self, path: &str) -> Result { + self.otel_metrics.exist_calls.add(1, &[]); + match self .client .head_object() @@ -404,7 +408,7 @@ impl StorageBackendMethods for S3Backend { let etag = if let Some(s3_etag) = res.e_tag && !s3_etag.is_empty() { - if let Some(range) = range { + if let Some(range) = &range { // we can generate a unique etag for a range of the remote object too, // by just concatenating the original etag with the range start and end. // @@ -436,6 +440,17 @@ impl StorageBackendMethods for S3Backend { None }; + let content_length: usize = res + .content_length + .and_then(|length| length.try_into().ok()) + .unwrap_or(0); + + // NOTE: we record the download, even though we don't know if the caller + // actually consumes the stream. + // For the current usage, that's fine. + self.otel_metrics + .record_download_metrics(content_length as u64, range.as_ref()); + Ok(StreamingBlob { path: path.into(), mime: res @@ -446,10 +461,7 @@ impl StorageBackendMethods for S3Backend { .unwrap_or(mime::APPLICATION_OCTET_STREAM), date_updated, etag, - content_length: res - .content_length - .and_then(|length| length.try_into().ok()) - .unwrap_or(0), + content_length, content: Box::new(res.body.into_async_read()), compression, }) @@ -544,6 +556,8 @@ impl StorageBackendMethods for S3Backend { while let Some(batch) = chunks.next().await { let batch: Vec<_> = batch.into_iter().collect::>()?; + let batch_size = batch.len() as u64; + let to_delete = Delete::builder() .set_objects(Some( batch @@ -563,12 +577,21 @@ impl StorageBackendMethods for S3Backend { .await?; if let Some(errs) = resp.errors { + let successful_deletes = batch_size - errs.len() as u64; + if successful_deletes > 0 { + // we can have partial success, where some of the objects were deleted, + // and some not. + self.otel_metrics.deleted_files.add(successful_deletes, &[]); + } + for err in &errs { error!("error deleting file from s3: {:?}", err); } anyhow::bail!("deleting from s3 failed"); } + + self.otel_metrics.deleted_files.add(batch_size, &[]); } Ok(()) } diff --git a/crates/lib/docs_rs_storage/src/metrics.rs b/crates/lib/docs_rs_storage/src/metrics.rs index 53bcd34f6..1359bad01 100644 --- a/crates/lib/docs_rs_storage/src/metrics.rs +++ b/crates/lib/docs_rs_storage/src/metrics.rs @@ -1,9 +1,63 @@ +use crate::types::FileRange; use docs_rs_opentelemetry::AnyMeterProvider; -use opentelemetry::metrics::Counter; +use opentelemetry::{ + KeyValue, + metrics::{Counter, Histogram}, +}; +use strum::IntoStaticStr; +use tracing::Span; + +const KIB: f64 = 1024.0; +const MIB: f64 = 1024.0 * KIB; +const GIB: f64 = 1024.0 * MIB; + +/// boundaries for histogram metrics where we track +/// file sizes on our S3 bucket. +/// This has to include: +/// * zip archives (between 500 KiB & 10 GiB) +/// * archive indexes (between <100 KiB & 500 MiB) +/// * plain old html / css files (mostly super small) +pub(crate) const FILE_SIZE_HISTOGRAM_BUCKETS: &[f64] = &[ + KIB, + 4.0 * KIB, + 16.0 * KIB, + 64.0 * KIB, + 256.0 * KIB, + 512.0 * KIB, + MIB, + 2.0 * MIB, + 4.0 * MIB, + 8.0 * MIB, + 16.0 * MIB, + 32.0 * MIB, + 64.0 * MIB, + 128.0 * MIB, + 256.0 * MIB, + 512.0 * MIB, + GIB, + 2.0 * GIB, + 4.0 * GIB, + 8.0 * GIB, + 10.0 * GIB, +]; + +#[derive(Copy, Clone, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub(crate) enum UploadType { + Single, + MultiPart, +} #[derive(Debug)] pub(crate) struct StorageMetrics { + pub(crate) exist_calls: Counter, pub(crate) uploaded_files: Counter, + pub(crate) uploaded_bytes: Counter, + pub(crate) uploaded_entry_size: Histogram, + pub(crate) downloaded_files: Counter, + pub(crate) downloaded_bytes: Counter, + pub(crate) downloaded_entry_size: Histogram, + pub(crate) deleted_files: Counter, } impl StorageMetrics { @@ -12,10 +66,65 @@ impl StorageMetrics { const PREFIX: &str = "docsrs.storage"; Self { + exist_calls: meter + .u64_counter(format!("{PREFIX}.exist_calls")) + .with_unit("1") + .build(), uploaded_files: meter .u64_counter(format!("{PREFIX}.uploaded_files")) .with_unit("1") .build(), + uploaded_bytes: meter + .u64_counter(format!("{PREFIX}.uploaded_bytes")) + .with_unit("By") + .build(), + uploaded_entry_size: meter + .u64_histogram(format!("{PREFIX}.uploaded_entry_size")) + .with_unit("By") + .with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec()) + .build(), + downloaded_files: meter + .u64_counter(format!("{PREFIX}.downloaded_files")) + .with_unit("1") + .build(), + downloaded_bytes: meter + .u64_counter(format!("{PREFIX}.downloaded_bytes")) + .with_unit("By") + .build(), + downloaded_entry_size: meter + .u64_histogram(format!("{PREFIX}.downloaded_entry_size")) + .with_unit("By") + .with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec()) + .build(), + deleted_files: meter + .u64_counter(format!("{PREFIX}.deleted_files")) + .with_unit("1") + .build(), } } + + pub(crate) fn record_download_metrics(&self, content_length: u64, range: Option<&FileRange>) { + let download_type = if range.is_some() { "range" } else { "full" }; + Span::current() + .record("storage.download_type", download_type) + .record("storage.content_length", content_length); + + let attrs = [KeyValue::new("download_type", download_type)]; + self.downloaded_files.add(1, &attrs); + self.downloaded_bytes.add(content_length, &attrs); + self.downloaded_entry_size.record(content_length, &attrs); + } + + pub(crate) fn record_upload_metrics(&self, content_length: u64, upload_type: UploadType) { + let upload_type: &str = upload_type.into(); + + Span::current() + .record("storage.upload_type", upload_type) + .record("storage.content_length", content_length); + + let attrs = [KeyValue::new("upload_type", upload_type)]; + self.uploaded_files.add(1, &attrs); + self.uploaded_bytes.add(content_length, &attrs); + self.uploaded_entry_size.record(content_length, &attrs); + } } diff --git a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs index f2a7036a5..346fb4e09 100644 --- a/crates/lib/docs_rs_storage/src/storage/non_blocking.rs +++ b/crates/lib/docs_rs_storage/src/storage/non_blocking.rs @@ -596,7 +596,7 @@ mod backend_tests { files.iter().find(|info| info.path == path) } - async fn test_exists(storage: &AsyncStorage) -> Result<()> { + async fn test_exists(storage: &AsyncStorage, metrics: &TestMetrics) -> Result<()> { assert!(!storage.exists("path/to/file.txt").await.unwrap()); let blob = BlobUpload { path: "path/to/file.txt".into(), @@ -607,10 +607,19 @@ mod backend_tests { storage.store_blobs(vec![blob]).await?; assert!(storage.exists("path/to/file.txt").await?); + let collected_metrics = metrics.collected_metrics(); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.exist_calls")? + .get_u64_counter() + .value(), + 2, + ); + Ok(()) } - async fn test_get_object(storage: &AsyncStorage) -> Result<()> { + async fn test_get_object(storage: &AsyncStorage, metrics: &TestMetrics) -> Result<()> { let path: &str = "foo/bar.txt"; let blob = BlobUpload { path: path.into(), @@ -639,10 +648,26 @@ mod backend_tests { ); } + let collected_metrics = metrics.collected_metrics(); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.downloaded_files")? + .get_u64_counter() + .value(), + 1, + ); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.downloaded_bytes")? + .get_u64_counter() + .value(), + blob.content.len() as u64, + ); + Ok(()) } - async fn test_get_range(storage: &AsyncStorage) -> Result<()> { + async fn test_get_range(storage: &AsyncStorage, metrics: &TestMetrics) -> Result<()> { let blob = BlobUpload { path: "foo/bar.txt".into(), mime: mime::TEXT_PLAIN, @@ -684,6 +709,22 @@ mod backend_tests { ); } + let collected_metrics = metrics.collected_metrics(); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.downloaded_files")? + .get_u64_counter() + .value(), + 2, + ); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.downloaded_bytes")? + .get_u64_counter() + .value(), + blob.content.len() as u64, + ); + Ok(()) } @@ -814,6 +855,27 @@ mod backend_tests { .value(), NAMES.len() as u64, ); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.uploaded_bytes")? + .get_u64_counter() + .value(), + (NAMES.len() * "Hello world!\n".len()) as u64, + ); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.downloaded_files")? + .get_u64_counter() + .value(), + NAMES.len() as u64, + ); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.downloaded_bytes")? + .get_u64_counter() + .value(), + (NAMES.len() * "Hello world!\n".len()) as u64, + ); Ok(()) } @@ -1033,7 +1095,7 @@ mod backend_tests { storage.delete_prefix("prefix_without_objects").await } - async fn test_delete_prefix(storage: &AsyncStorage) -> Result<()> { + async fn test_delete_prefix(storage: &AsyncStorage, metrics: &TestMetrics) -> Result<()> { test_deletion( storage, "foo/bar/", @@ -1047,7 +1109,18 @@ mod backend_tests { &["foo.txt", "foo/bar.txt", "bar.txt"], &["foo/bar/baz.txt", "foo/bar/foobar.txt"], ) - .await + .await?; + + let collected_metrics = metrics.collected_metrics(); + assert_eq!( + collected_metrics + .get_metric("storage", "docsrs.storage.deleted_files")? + .get_u64_counter() + .value(), + 2, + ); + + Ok(()) } async fn test_delete_percent(storage: &AsyncStorage) -> Result<()> { @@ -1157,13 +1230,9 @@ mod backend_tests { tests { test_batched_uploads, - test_exists, - test_get_object, - test_get_range, test_get_too_big, test_too_long_filename, test_list_prefix, - test_delete_prefix, test_delete_prefix_without_matches, test_delete_percent, test_exists_without_remote_archive, @@ -1171,6 +1240,10 @@ mod backend_tests { } tests_with_metrics { + test_exists, + test_get_object, + test_get_range, + test_delete_prefix, test_store_blobs, test_store_all, test_store_all_in_archive,