Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/bin/docs_rs_watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition.workspace = true

[dependencies]
anyhow = { workspace = true }
aws-sdk-sqs = { version = "1.99.0", default-features = false, features = ["default-https-client", "rt-tokio"] }
clap = { workspace = true }
# NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough
crates-index = { version = "3.0.0", default-features = false, features = ["git", "git-https-reqwest", "git-performance", "parallel"] }
Expand All @@ -32,6 +33,7 @@ rayon = "1.6.1"
sqlx = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }

[dev-dependencies]
docs_rs_config = { path = "../../lib/docs_rs_config", features = ["testing"] }
Expand Down
5 changes: 5 additions & 0 deletions crates/bin/docs_rs_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use anyhow::Result;
use docs_rs_config::AppConfig;
use docs_rs_env_vars::{env, maybe_env, require_env};
use std::{path::PathBuf, time::Duration};
use url::Url;

#[derive(Debug)]
pub struct Config {
pub registry_index_path: PathBuf,
pub registry_url: Option<String>,
pub sqs_queue_url: Option<Url>,
pub sqs_region: Option<String>,

/// How long to wait between registry checks
pub delay_between_registry_fetches: Duration,
Expand All @@ -29,6 +32,8 @@ impl AppConfig for Config {
Ok(Self {
registry_index_path: env("REGISTRY_INDEX_PATH", prefix.join("crates.io-index"))?,
registry_url: maybe_env("REGISTRY_URL")?,
sqs_queue_url: maybe_env("DOCSRS_SQS_QUEUE_URL")?,
sqs_region: maybe_env("DOCSRS_SQS_REGION")?,
delay_between_registry_fetches: Duration::from_secs(env::<u64>(
"DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES",
60,
Expand Down
36 changes: 33 additions & 3 deletions crates/bin/docs_rs_watcher/src/db/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,18 @@ async fn delete_version_from_database(
format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)").as_str())
.bind(crate_id).bind(version).execute(&mut *transaction).await?;
}
let is_library: bool = sqlx::query_scalar!(
let Some(is_library) = sqlx::query_scalar!(
"DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library",
crate_id.0,
version as _,
)
.fetch_one(&mut *transaction)
.fetch_optional(&mut *transaction)
.await?
.unwrap_or(false);
else {
transaction.commit().await?;
return Ok(false);
};
let is_library = is_library.unwrap_or(false);

sqlx::query!(
"DELETE FROM queue WHERE name = $1 AND version = $2;",
Expand Down Expand Up @@ -690,6 +694,32 @@ mod tests {
Ok(())
}

/// Deleting the same version twice must succeed both times and leave the crate in place.
#[tokio::test(flavor = "multi_thread")]
async fn test_delete_already_deleted_version_doesnt_error() -> Result<()> {
    let env = TestEnvironment::new().await?;
    let mut conn = env.async_conn().await?;

    // Two releases are created, so one remains after V1 is removed.
    env.fake_release()
        .await
        .name(&KRATE)
        .version(V1)
        .create()
        .await?;
    env.fake_release()
        .await
        .name(&KRATE)
        .version(V2)
        .create()
        .await?;

    // The first pass removes V1; the second pass targets the already-deleted
    // version and must still return `Ok`.
    for _attempt in 0..2 {
        delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?;
    }

    let crate_still_present = crate_exists(&mut conn, &KRATE).await?;
    assert!(crate_still_present);

    Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_delete_version_waits_for_locked_queue_rows() -> Result<()> {
let env = TestEnvironment::new().await?;
Expand Down
20 changes: 20 additions & 0 deletions crates/lib/docs_rs_crates_io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "docs_rs_crates_io"
version = "0.1.0"
description = "types & logic for the direct integration between docs.rs & crates.io"

authors.workspace = true
license.workspace = true
repository.workspace = true
edition.workspace = true

[dependencies]
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
semver = { version = "1", features = ["serde"] }

[dev-dependencies]
serde_json = "1.0"

[lints]
workspace = true
207 changes: 207 additions & 0 deletions crates/lib/docs_rs_crates_io/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#![allow(clippy::disallowed_types)]

use chrono::{DateTime, Utc};
use std::fmt;

/// A change that can happen to a crate on our index.
///
/// Serialized in serde's adjacently tagged form: the variant name, converted to
/// `snake_case`, is stored under the `type` key and the variant's data under the
/// `payload` key, e.g. `{"type": "crate_deleted", "payload": {"name": "old-crate"}}`.
#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum IndexChangeV1 {
/// A crate version was added.
Added(CrateVersion),

@syphar syphar Jun 3, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to myself:
this needs either pub or accessors

View changes since the review

/// A crate version was unyanked.
Unyanked(CrateVersion),
/// A crate version was yanked.
Yanked(CrateVersion),
/// A crate was deleted. `name` is the crate whose file was deleted, which implies
/// that all of its versions were deleted as well.
CrateDeleted { name: String },
/// A crate version was deleted.
VersionDeleted(CrateVersion),
}

impl fmt::Display for IndexChangeV1 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
match *self {
IndexChangeV1::Added(_) => "added",
IndexChangeV1::Yanked(_) => "yanked",
IndexChangeV1::CrateDeleted { .. } => "crate deleted",
IndexChangeV1::VersionDeleted(_) => "version deleted",
IndexChangeV1::Unyanked(_) => "unyanked",
}
)
}
}

/// A conventional event envelope for our events between crates.io & docs.rs.
///
/// Because `change` is marked `#[serde(flatten)]`, the payload's own keys are
/// serialized at the same level as `id` and `occurred_at` rather than nested
/// under a `change` key.
#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)]
pub struct Event<T> {
/// Unique event identifier for deduplication and tracing.
pub id: String,
/// Timestamp when the event occurred.
pub occurred_at: DateTime<Utc>,
/// The typed payload.
#[serde(flatten)]
pub change: T,
}

/// The first version of the public event wire format: an [`Event`] envelope
/// carrying an [`IndexChangeV1`] payload.
pub type IndexChangeEventV1 = Event<IndexChangeV1>;

/// All the information we know about a change made to one version of a crate.
#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)]
pub struct CrateVersion {
/// The crate name, e.g. `clap`.
pub name: String,
/// Whether the release is yanked.
pub yanked: bool,
/// The semantic version of the crate; serialized under the key `vers`.
#[serde(rename = "vers")]
pub version: semver::Version,
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;

/// Builds the non-yanked `clap 4.5.0` fixture shared by the serialization tests.
fn crate_version() -> CrateVersion {
    let name = String::from("clap");
    let version = semver::Version::new(4, 5, 0);

    CrateVersion {
        name,
        yanked: false,
        version,
    }
}

/// Wraps `change` in an envelope with a fixed id and timestamp, so the expected
/// JSON in the tests stays stable.
fn event(change: IndexChangeV1) -> IndexChangeEventV1 {
    let occurred_at = DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z")
        .unwrap()
        .with_timezone(&Utc);

    IndexChangeEventV1 {
        id: String::from("evt_123"),
        occurred_at,
        change,
    }
}

/// The `version` field must appear on the wire under the `vers` key.
#[test]
fn crate_version_serializes_with_vers_field() {
    let fixture = crate_version();
    let serialized = serde_json::to_value(&fixture).unwrap();

    let expected = json!({
        "name": "clap",
        "yanked": false,
        "vers": "4.5.0",
    });

    assert_eq!(serialized, expected);
}

/// Every variant must serialize as `{"type": <snake_case tag>, "payload": <data>}`.
#[test]
fn change_serializes_with_expected_variant_shapes() {
    let fixture = crate_version();

    // All variants that wrap a `CrateVersion` share this payload.
    // NOTE(review): the fixture keeps `yanked: false` even for the `Yanked` case;
    // only the serialized shape is being checked here.
    let version_payload = json!({
        "name": "clap",
        "yanked": false,
        "vers": "4.5.0",
    });
    let tagged = |tag: &str| {
        json!({
            "type": tag,
            "payload": version_payload.clone(),
        })
    };

    let cases = [
        (IndexChangeV1::Added(fixture.clone()), tagged("added")),
        (IndexChangeV1::Unyanked(fixture.clone()), tagged("unyanked")),
        (IndexChangeV1::Yanked(fixture.clone()), tagged("yanked")),
        (
            IndexChangeV1::CrateDeleted {
                name: "old-crate".into(),
            },
            json!({
                "type": "crate_deleted",
                "payload": {
                    "name": "old-crate"
                }
            }),
        ),
        (
            IndexChangeV1::VersionDeleted(fixture),
            tagged("version_deleted"),
        ),
    ];

    for (change, expected) in cases {
        let serialized = serde_json::to_value(&change).unwrap();
        assert_eq!(serialized, expected);
    }
}

/// The envelope's `id` and `occurred_at` sit next to the flattened `type`/`payload` keys.
#[test]
fn event_serializes_with_minimum_metadata() {
    let change = IndexChangeV1::CrateDeleted {
        name: "old-crate".into(),
    };
    let envelope = event(change);
    let serialized = serde_json::to_value(&envelope).unwrap();

    let expected = json!({
        "id": "evt_123",
        "occurred_at": "2026-05-22T12:34:56Z",
        "type": "crate_deleted",
        "payload": {
            "name": "old-crate"
        }
    });

    assert_eq!(serialized, expected);
}

/// An RFC 3339 `occurred_at` string must deserialize into the matching UTC timestamp.
#[test]
fn event_deserializes_rfc3339_occurred_at() {
    let wire = json!({
        "id": "evt_123",
        "occurred_at": "2026-05-22T12:34:56Z",
        "type": "crate_deleted",
        "payload": {
            "name": "old-crate"
        }
    });
    let parsed: IndexChangeEventV1 = serde_json::from_value(wire).unwrap();

    let expected_timestamp = DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z")
        .unwrap()
        .with_timezone(&Utc);

    assert_eq!(parsed.occurred_at, expected_timestamp);
}
}
1 change: 1 addition & 0 deletions crates/lib/docs_rs_crates_io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod events;
Loading