Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 3 additions & 25 deletions crates/lib/docs_rs_storage/src/archive_index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
PathNotFoundError, blob::StreamingBlob, config::ArchiveIndexCacheConfig, file::FolderEntry,
types::FileRange, utils::file_list::walk_dir_recursive,
metrics::FILE_SIZE_HISTOGRAM_BUCKETS, types::FileRange, utils::file_list::walk_dir_recursive,
};
use anyhow::{Context as _, Result, anyhow, bail};
use async_stream::try_stream;
Expand Down Expand Up @@ -68,28 +68,6 @@ impl Metrics {
fn new(meter_provider: &AnyMeterProvider) -> Self {
let meter = meter_provider.meter("storage");
const PREFIX: &str = "docsrs.storage.archive_index_cache";
const KIB: f64 = 1024.0;
const MIB: f64 = 1024.0 * KIB;
const GIB: f64 = 1024.0 * MIB;

let entry_size_boundaries = vec![
500.0 * KIB,
1.0 * MIB,
2.0 * MIB,
4.0 * MIB,
8.0 * MIB,
16.0 * MIB,
32.0 * MIB,
64.0 * MIB,
128.0 * MIB,
256.0 * MIB,
512.0 * MIB,
1.0 * GIB,
2.0 * GIB,
4.0 * GIB,
8.0 * GIB,
10.0 * GIB,
];

Self {
find_calls: meter
Expand All @@ -115,12 +93,12 @@ impl Metrics {
evicted_entry_size: meter
.u64_histogram(format!("{PREFIX}.evicted_entry_size"))
.with_unit("By")
.with_boundaries(entry_size_boundaries.clone())
.with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec())
.build(),
downloaded_entry_size: meter
.u64_histogram(format!("{PREFIX}.downloaded_entry_size"))
.with_unit("By")
.with_boundaries(entry_size_boundaries)
.with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec())
.build(),
weighted_size_bytes: meter
.u64_gauge(format!("{PREFIX}.weighted_size_bytes"))
Expand Down
21 changes: 18 additions & 3 deletions crates/lib/docs_rs_storage/src/backends/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
backends::StorageBackendMethods,
blob::{StreamUpload, StreamUploadSource, StreamingBlob},
errors::PathNotFoundError,
metrics::StorageMetrics,
metrics::{StorageMetrics, UploadType},
types::FileRange,
};
use anyhow::{Result, anyhow};
Expand All @@ -30,21 +30,26 @@ impl MemoryBackend {

impl StorageBackendMethods for MemoryBackend {
async fn exists(&self, path: &str) -> Result<bool> {
self.otel_metrics.exist_calls.add(1, &[]);
Ok(self.objects.contains_key(path))
}

async fn get_stream(&self, path: &str, range: Option<FileRange>) -> Result<StreamingBlob> {
let mut blob = self.objects.get(path).ok_or(PathNotFoundError)?.clone();
debug_assert!(blob.etag.is_some());

if let Some(r) = range {
if let Some(r) = &range {
blob.content = blob
.content
.get(*r.start() as usize..=*r.end() as usize)
.ok_or_else(|| anyhow!("invalid range"))?
.to_vec();
blob.etag = Some(compute_etag(&blob.content));
}

self.otel_metrics
.record_download_metrics(blob.content.len() as u64, range.as_ref());

Ok(blob.into())
}

Expand All @@ -60,6 +65,7 @@ impl StorageBackendMethods for MemoryBackend {
StreamUploadSource::Bytes(content) => content.to_vec(),
StreamUploadSource::File(path) => fs::read(&path).await?,
};
let content_len = content.len();

let blob = Blob {
path,
Expand All @@ -70,7 +76,9 @@ impl StorageBackendMethods for MemoryBackend {
compression,
};

self.otel_metrics.uploaded_files.add(1, &[]);
self.otel_metrics
.record_upload_metrics(content_len as u64, UploadType::Single);

self.objects.insert(blob.path.clone(), blob);
Ok(())
}
Expand All @@ -93,7 +101,14 @@ impl StorageBackendMethods for MemoryBackend {
}

async fn delete_prefix(&self, prefix: &str) -> Result<()> {
let object_count = self.objects.len();

self.objects.retain(|key, _| !key.starts_with(prefix));

self.otel_metrics
.deleted_files
.add((object_count - self.objects.len()) as u64, &[]);

Ok(())
}
}
17 changes: 17 additions & 0 deletions crates/lib/docs_rs_storage/src/backends/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod s3;
use crate::{StreamingBlob, blob::StreamUpload, types::FileRange};
use anyhow::Result;
use futures_util::stream::BoxStream;
use tracing::instrument;

pub(crate) trait StorageBackendMethods {
async fn exists(&self, path: &str) -> Result<bool>;
Expand Down Expand Up @@ -35,10 +36,26 @@ impl StorageBackendMethods for StorageBackend {
call_inner!(self, exists(path))
}

#[instrument(
skip_all,
fields(
// fields are populated in StorageMetrics::record_download_metrics
storage.download_type = tracing::field::Empty,
storage.content_length = tracing::field::Empty
)
)]
async fn get_stream(&self, path: &str, range: Option<FileRange>) -> Result<StreamingBlob> {
call_inner!(self, get_stream(path, range))
}

#[instrument(
skip_all,
fields(
// fields are populated in StorageMetrics::record_upload_metrics
storage.upload_type = tracing::field::Empty,
storage.content_length = tracing::field::Empty
)
)]
async fn upload_stream(&self, upload: StreamUpload) -> Result<()> {
call_inner!(self, upload_stream(upload))
}
Expand Down
39 changes: 31 additions & 8 deletions crates/lib/docs_rs_storage/src/backends/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
blob::{StreamUpload, StreamUploadSource, StreamingBlob},
crc32_for_path,
errors::PathNotFoundError,
metrics::StorageMetrics,
metrics::{StorageMetrics, UploadType},
types::FileRange,
utils::crc32::crc32_for_path_range,
};
Expand Down Expand Up @@ -200,7 +200,8 @@ impl S3Backend {

request.send().await?;

self.otel_metrics.uploaded_files.add(1, &[]);
self.otel_metrics
.record_upload_metrics(content_length, UploadType::Single);

Ok(())
}
Expand Down Expand Up @@ -256,7 +257,8 @@ impl S3Backend {

match result {
Ok(()) => {
self.otel_metrics.uploaded_files.add(1, &[]);
self.otel_metrics
.record_upload_metrics(content_length, UploadType::MultiPart);
Ok(())
}
Err(err) => {
Expand Down Expand Up @@ -358,6 +360,8 @@ impl S3Backend {

impl StorageBackendMethods for S3Backend {
async fn exists(&self, path: &str) -> Result<bool, Error> {
self.otel_metrics.exist_calls.add(1, &[]);

match self
.client
.head_object()
Expand Down Expand Up @@ -404,7 +408,7 @@ impl StorageBackendMethods for S3Backend {
let etag = if let Some(s3_etag) = res.e_tag
&& !s3_etag.is_empty()
{
if let Some(range) = range {
if let Some(range) = &range {
// we can generate a unique etag for a range of the remote object too,
// by just concatenating the original etag with the range start and end.
//
Expand Down Expand Up @@ -436,6 +440,17 @@ impl StorageBackendMethods for S3Backend {
None
};

let content_length: usize = res
.content_length
.and_then(|length| length.try_into().ok())
.unwrap_or(0);

// NOTE: we record the download, even though we don't know if the caller
// actually consumes the stream.
// For the current usage, that's fine.
self.otel_metrics
.record_download_metrics(content_length as u64, range.as_ref());

Ok(StreamingBlob {
path: path.into(),
mime: res
Expand All @@ -446,10 +461,7 @@ impl StorageBackendMethods for S3Backend {
.unwrap_or(mime::APPLICATION_OCTET_STREAM),
date_updated,
etag,
content_length: res
.content_length
.and_then(|length| length.try_into().ok())
.unwrap_or(0),
content_length,
content: Box::new(res.body.into_async_read()),
compression,
})
Expand Down Expand Up @@ -544,6 +556,8 @@ impl StorageBackendMethods for S3Backend {
while let Some(batch) = chunks.next().await {
let batch: Vec<_> = batch.into_iter().collect::<anyhow::Result<_>>()?;

let batch_size = batch.len() as u64;

let to_delete = Delete::builder()
.set_objects(Some(
batch
Expand All @@ -563,12 +577,21 @@ impl StorageBackendMethods for S3Backend {
.await?;

if let Some(errs) = resp.errors {
let successful_deletes = batch_size - errs.len() as u64;
if successful_deletes > 0 {
// we can have partial success, where some of the objects were deleted,
// and some not.
self.otel_metrics.deleted_files.add(successful_deletes, &[]);
}

for err in &errs {
error!("error deleting file from s3: {:?}", err);
}

anyhow::bail!("deleting from s3 failed");
}

self.otel_metrics.deleted_files.add(batch_size, &[]);
}
Ok(())
}
Expand Down
111 changes: 110 additions & 1 deletion crates/lib/docs_rs_storage/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,63 @@
use crate::types::FileRange;
use docs_rs_opentelemetry::AnyMeterProvider;
use opentelemetry::metrics::Counter;
use opentelemetry::{
KeyValue,
metrics::{Counter, Histogram},
};
use strum::IntoStaticStr;
use tracing::Span;

const KIB: f64 = 1024.0;
const MIB: f64 = 1024.0 * KIB;
const GIB: f64 = 1024.0 * MIB;

/// boundaries for histogram metrics where we track
/// file sizes on our S3 bucket.
/// This has to include:
/// * zip archives (between 500 KiB & 10 GiB)
/// * archive indexes (between <100 KiB & 500 MiB)
/// * plain old html / css files (mostly super small)
pub(crate) const FILE_SIZE_HISTOGRAM_BUCKETS: &[f64] = &[
KIB,
4.0 * KIB,
16.0 * KIB,
64.0 * KIB,
256.0 * KIB,
512.0 * KIB,
MIB,
2.0 * MIB,
4.0 * MIB,
8.0 * MIB,
16.0 * MIB,
32.0 * MIB,
64.0 * MIB,
128.0 * MIB,
256.0 * MIB,
512.0 * MIB,
GIB,
2.0 * GIB,
4.0 * GIB,
8.0 * GIB,
10.0 * GIB,
];

#[derive(Copy, Clone, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum UploadType {
Single,
MultiPart,
}

#[derive(Debug)]
pub(crate) struct StorageMetrics {
pub(crate) exist_calls: Counter<u64>,
pub(crate) uploaded_files: Counter<u64>,
pub(crate) uploaded_bytes: Counter<u64>,
pub(crate) uploaded_entry_size: Histogram<u64>,
pub(crate) downloaded_files: Counter<u64>,
pub(crate) downloaded_bytes: Counter<u64>,
pub(crate) downloaded_entry_size: Histogram<u64>,
pub(crate) deleted_files: Counter<u64>,
}

impl StorageMetrics {
Expand All @@ -12,10 +66,65 @@ impl StorageMetrics {
const PREFIX: &str = "docsrs.storage";

Self {
exist_calls: meter
.u64_counter(format!("{PREFIX}.exist_calls"))
.with_unit("1")
.build(),
uploaded_files: meter
.u64_counter(format!("{PREFIX}.uploaded_files"))
.with_unit("1")
.build(),
uploaded_bytes: meter
.u64_counter(format!("{PREFIX}.uploaded_bytes"))
.with_unit("By")
.build(),
uploaded_entry_size: meter
.u64_histogram(format!("{PREFIX}.uploaded_entry_size"))
.with_unit("By")
.with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec())
.build(),
downloaded_files: meter
.u64_counter(format!("{PREFIX}.downloaded_files"))
.with_unit("1")
.build(),
downloaded_bytes: meter
.u64_counter(format!("{PREFIX}.downloaded_bytes"))
.with_unit("By")
.build(),
downloaded_entry_size: meter
.u64_histogram(format!("{PREFIX}.downloaded_entry_size"))
.with_unit("By")
.with_boundaries(FILE_SIZE_HISTOGRAM_BUCKETS.to_vec())
.build(),
deleted_files: meter
.u64_counter(format!("{PREFIX}.deleted_files"))
.with_unit("1")
.build(),
}
}

pub(crate) fn record_download_metrics(&self, content_length: u64, range: Option<&FileRange>) {
let download_type = if range.is_some() { "range" } else { "full" };
Span::current()
.record("storage.download_type", download_type)
.record("storage.content_length", content_length);

let attrs = [KeyValue::new("download_type", download_type)];
self.downloaded_files.add(1, &attrs);
self.downloaded_bytes.add(content_length, &attrs);
self.downloaded_entry_size.record(content_length, &attrs);
}

pub(crate) fn record_upload_metrics(&self, content_length: u64, upload_type: UploadType) {
let upload_type: &str = upload_type.into();

Span::current()
.record("storage.upload_type", upload_type)
.record("storage.content_length", content_length);

let attrs = [KeyValue::new("upload_type", upload_type)];
self.uploaded_files.add(1, &attrs);
self.uploaded_bytes.add(content_length, &attrs);
self.uploaded_entry_size.record(content_length, &attrs);
}
}
Loading