diff --git a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index 0b0f9ba6a..caeaefa97 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +aws-sdk-sqs = { version = "1.99.0", default-features = false, features = ["default-https-client", "rt-tokio"] } clap = { workspace = true } # NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough crates-index = { version = "3.0.0", default-features = false, features = ["git", "git-https-reqwest", "git-performance", "parallel"] } @@ -32,6 +33,7 @@ rayon = "1.6.1" sqlx = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] docs_rs_config = { path = "../../lib/docs_rs_config", features = ["testing"] } diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index 7b5f17976..404ade8f5 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -2,11 +2,14 @@ use anyhow::Result; use docs_rs_config::AppConfig; use docs_rs_env_vars::{env, maybe_env, require_env}; use std::{path::PathBuf, time::Duration}; +use url::Url; #[derive(Debug)] pub struct Config { pub registry_index_path: PathBuf, pub registry_url: Option, + pub sqs_queue_url: Option, + pub sqs_region: Option, /// How long to wait between registry checks pub delay_between_registry_fetches: Duration, @@ -29,6 +32,8 @@ impl AppConfig for Config { Ok(Self { registry_index_path: env("REGISTRY_INDEX_PATH", prefix.join("crates.io-index"))?, registry_url: maybe_env("REGISTRY_URL")?, + sqs_queue_url: maybe_env("DOCSRS_SQS_QUEUE_URL")?, + sqs_region: maybe_env("DOCSRS_SQS_REGION")?, delay_between_registry_fetches: Duration::from_secs(env::( "DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES", 60, diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index dbfa0e58e..742bbb634 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -170,14 +170,18 @@ async fn delete_version_from_database( format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)").as_str()) .bind(crate_id).bind(version).execute(&mut *transaction).await?; } - let is_library: bool = sqlx::query_scalar!( + let Some(is_library) = sqlx::query_scalar!( "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", crate_id.0, version as _, ) - .fetch_one(&mut *transaction) + .fetch_optional(&mut *transaction) .await? - .unwrap_or(false); + else { + transaction.commit().await?; + return Ok(false); + }; + let is_library = is_library.unwrap_or(false); sqlx::query!( "DELETE FROM queue WHERE name = $1 AND version = $2;", @@ -690,6 +694,32 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_already_deleted_version_doesnt_error() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_conn().await?; + + env.fake_release() + .await + .name(&KRATE) + .version(V1) + .create() + .await?; + env.fake_release() + .await + .name(&KRATE) + .version(V2) + .create() + .await?; + + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + + assert!(crate_exists(&mut conn, &KRATE).await?); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_delete_version_waits_for_locked_queue_rows() -> Result<()> { let env = TestEnvironment::new().await?; diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml new file mode 100644 index 000000000..96b373c89 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "docs_rs_crates_io" +version = "0.1.0" +description = "types & logic for the direct integration between docs.rs & crates.io" + +authors.workspace = true +license.workspace = true +repository.workspace = true +edition.workspace = true + +[dependencies] +chrono = { version = "0.4", features = ["serde"] } +serde = { version = "1", features = ["derive"] } +semver = { version = "1", features = ["serde"] } + +[dev-dependencies] +serde_json = "1.0" + +[lints] +workspace = true diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs new file mode 100644 index 000000000..f01933db6 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -0,0 +1,207 @@ +#![allow(clippy::disallowed_types)] + +use chrono::{DateTime, Utc}; +use std::fmt; + +/// A change that can happen to a crate on our index. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum IndexChangeV1 { + /// A crate version was added. + Added(CrateVersion), + /// A crate version was unyanked. + Unyanked(CrateVersion), + /// A crate version was yanked. + Yanked(CrateVersion), + /// The name of the crate whose file was deleted, which implies all versions were deleted as well. + CrateDeleted { name: String }, + /// A crate version was deleted. + VersionDeleted(CrateVersion), +} + +impl fmt::Display for IndexChangeV1 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + match *self { + IndexChangeV1::Added(_) => "added", + IndexChangeV1::Yanked(_) => "yanked", + IndexChangeV1::CrateDeleted { .. } => "crate deleted", + IndexChangeV1::VersionDeleted(_) => "version deleted", + IndexChangeV1::Unyanked(_) => "unyanked", + } + ) + } +} + +/// A conventional event envelope for our events between crates.io & docs.rs +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct Event { + /// Unique event identifier for deduplication and tracing. + pub id: String, + /// Timestamp when the event occured + pub occurred_at: DateTime, + /// The typed payload. + #[serde(flatten)] + pub change: T, +} + +/// The first version of the public event wire format. +pub type IndexChangeEventV1 = Event; + +/// Pack all information we know about a change made to a version of a crate. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct CrateVersion { + /// The crate name, i.e. `clap`. + pub name: String, + /// is the release yanked? + pub yanked: bool, + /// The semantic version of the crate. + #[serde(rename = "vers")] + pub version: semver::Version, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn crate_version() -> CrateVersion { + CrateVersion { + name: "clap".into(), + yanked: false, + version: semver::Version::new(4, 5, 0), + } + } + + fn event(change: IndexChangeV1) -> IndexChangeEventV1 { + IndexChangeEventV1 { + id: "evt_123".into(), + occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc), + change, + } + } + + #[test] + fn crate_version_serializes_with_vers_field() { + let event = crate_version(); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "name": "clap", + "yanked": false, + "vers": "4.5.0", + }) + ); + } + + #[test] + fn change_serializes_with_expected_variant_shapes() { + let crate_version = crate_version(); + + let cases = [ + ( + IndexChangeV1::Added(crate_version.clone()), + json!({ + "type": "added", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Unyanked(crate_version.clone()), + json!({ + "type": "unyanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Yanked(crate_version.clone()), + json!({ + "type": "yanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }, + json!({ + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }), + ), + ( + IndexChangeV1::VersionDeleted(crate_version), + json!({ + "type": "version_deleted", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(&event).unwrap(), expected); + } + } + + #[test] + fn event_serializes_with_minimum_metadata() { + let event = event(IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }) + ); + } + + #[test] + fn event_deserializes_rfc3339_occurred_at() { + let event: IndexChangeEventV1 = serde_json::from_value(json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + })) + .unwrap(); + + assert_eq!( + event.occurred_at, + DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc) + ); + } +} diff --git a/crates/lib/docs_rs_crates_io/src/lib.rs b/crates/lib/docs_rs_crates_io/src/lib.rs new file mode 100644 index 000000000..a9970c28f --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/lib.rs @@ -0,0 +1 @@ +pub mod events;