Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ unacceptable behavior to **conduct@buzz-relay.org**.
| Docker | 24+ | For Postgres, Redis, Typesense |
| `just` | latest | Task runner — `cargo install just` |
| `lefthook` | latest | Optional; run `lefthook install` for local Git hooks |
| `pgschema` | latest | Schema tool — `just migrate` applies `schema/schema.sql` declaratively |
| `sqlx` migrations | workspace crate | `just migrate` applies embedded migrations from `migrations/` |

This repo uses [Hermit](https://cashapp.github.io/hermit/) for toolchain
pinning. Activate it once per shell session:
Expand Down
7 changes: 7 additions & 0 deletions crates/buzz-admin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ enum Command {
ListMembers,
/// Generate a new Nostr keypair (for bootstrapping).
GenerateKey,
/// Run pending database migrations.
Migrate,
/// Emit kind:39000/39002 events for channels missing them.
///
/// Channels created via direct SQL (seed scripts, pre-migration data) won't
Expand All @@ -59,6 +61,11 @@ async fn main() -> Result<()> {
println!("Secret key: {}", keys.secret_key().display_secret());
println!("\nSet BUZZ_PRIVATE_KEY to the secret key to use this identity.");
}
Command::Migrate => {
let db = connect_db().await?;
db.migrate().await?;
println!("Database migrations complete.");
}
Command::AddMember { pubkey, role } => {
let db = connect_db().await?;
let pk_bytes = hex::decode(&pubkey)?;
Expand Down
4 changes: 4 additions & 0 deletions crates/buzz-db/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub enum DbError {
#[error("database error: {0}")]
Sqlx(#[from] sqlx::Error),

/// A SQLx migration error.
#[error("migration error: {0}")]
Migrate(#[from] sqlx::migrate::MigrateError),

/// Attempted to store an AUTH event (kind 22242), which is forbidden.
#[error("AUTH events (kind 22242) must not be stored")]
AuthEventRejected,
Expand Down
7 changes: 7 additions & 0 deletions crates/buzz-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub mod error;
pub mod event;
/// Home feed queries.
pub mod feed;
/// Embedded database migrations.
pub mod migration;
/// Monthly table partition management.
pub mod partition;
/// Reaction persistence.
Expand Down Expand Up @@ -196,6 +198,11 @@ impl Db {
Self { pool }
}

/// Run pending database migrations.
pub async fn migrate(&self) -> Result<()> {
migration::run_migrations(&self.pool).await
}

/// Returns `true` if the database is reachable (used by readiness probes).
pub async fn ping(&self) -> bool {
sqlx::query("SELECT 1").execute(&self.pool).await.is_ok()
Expand Down
244 changes: 244 additions & 0 deletions crates/buzz-db/src/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
//! Embedded SQLx migrations for Buzz.
//!
//! Fresh deployments apply the checked-in SQL files under `migrations/`.
//! Existing pre-SQLx deployments are baselined when core Buzz tables already
//! exist but `_sqlx_migrations` does not, so startup will not try to replay the
//! initial schema over a live database.

use sqlx::PgPool;

use crate::Result;

static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations");

#[cfg(test)]
static SCHEMA_SQL: &str = include_str!("../../../schema/schema.sql");

const BASELINE_MIGRATION_VERSIONS: &[i64] = &[1, 2];

/// Run all pending Buzz database migrations.
pub async fn run_migrations(pool: &PgPool) -> Result<()> {
baseline_existing_database(pool).await?;
MIGRATOR.run(pool).await?;
Ok(())
}

async fn baseline_existing_database(pool: &PgPool) -> Result<()> {
if migrations_table_exists(pool).await? || !pre_sqlx_schema_exists(pool).await? {
return Ok(());
}

ensure_migrations_table(pool).await?;

for version in BASELINE_MIGRATION_VERSIONS {
let migration = MIGRATOR
.iter()
.find(|migration| migration.version == *version)
.expect("baseline migration version must exist in embedded migrator");

sqlx::query(
r#"
INSERT INTO _sqlx_migrations
(version, description, success, checksum, execution_time)
VALUES ($1, $2, TRUE, $3, 0)
ON CONFLICT (version) DO NOTHING
"#,
)
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
.execute(pool)
.await?;
}

tracing::info!(
versions = ?BASELINE_MIGRATION_VERSIONS,
"Baselined existing Buzz database for SQLx migrations"
);

Ok(())
}

async fn migrations_table_exists(pool: &PgPool) -> Result<bool> {
let exists = sqlx::query_scalar::<_, bool>(
r#"
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = '_sqlx_migrations'
)
"#,
)
.fetch_one(pool)
.await?;

Ok(exists)
}

async fn pre_sqlx_schema_exists(pool: &PgPool) -> Result<bool> {
let exists = sqlx::query_scalar::<_, bool>(
r#"
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'events'
) AND EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'channels'
)
"#,
)
.fetch_one(pool)
.await?;

Ok(exists)
}

async fn ensure_migrations_table(pool: &PgPool) -> Result<()> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS _sqlx_migrations (
version BIGINT PRIMARY KEY,
description TEXT NOT NULL,
installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
success BOOLEAN NOT NULL,
checksum BYTEA NOT NULL,
execution_time BIGINT NOT NULL
)
"#,
)
.execute(pool)
.await?;

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use sqlx::PgPool;

const TEST_DB_URL: &str = "postgres://buzz:buzz_dev@localhost:5432/buzz";

#[test]
fn embedded_migrator_contains_initial_schema_and_d_tag_backfill() {
let migrations: Vec<_> = MIGRATOR.iter().collect();

assert_eq!(migrations.len(), 2);
assert_eq!(migrations[0].version, 1);
assert_eq!(&*migrations[0].description, "initial schema");
assert!(
migrations[0].sql.as_str().contains("CREATE TABLE channels"),
"initial schema migration should include Buzz core tables"
);
assert!(
migrations[0]
.sql
.as_str()
.contains("CREATE TABLE IF NOT EXISTS relay_members"),
"initial schema migration should include relay_members"
);

assert_eq!(migrations[1].version, 2);
assert_eq!(&*migrations[1].description, "backfill d tag");
assert!(
migrations[1].sql.as_str().contains("UPDATE events"),
"second migration should backfill existing event rows"
);
}

async fn connect_test_pool() -> PgPool {
let database_url = std::env::var("BUZZ_TEST_DATABASE_URL")
.or_else(|_| std::env::var("DATABASE_URL"))
.unwrap_or_else(|_| TEST_DB_URL.to_owned());

PgPool::connect(&database_url)
.await
.expect("connect to test DB")
}

async fn reset_public_schema(pool: &PgPool) {
sqlx::query("DROP SCHEMA IF EXISTS public CASCADE")
.execute(pool)
.await
.expect("drop public schema");
sqlx::query("CREATE SCHEMA IF NOT EXISTS public")
.execute(pool)
.await
.expect("create public schema");
}

async fn applied_versions(pool: &PgPool) -> Vec<i64> {
sqlx::query_scalar::<_, i64>(
"SELECT version FROM _sqlx_migrations WHERE success ORDER BY version",
)
.fetch_all(pool)
.await
.expect("read applied migrations")
}

#[tokio::test]
#[ignore = "requires Postgres"]
async fn run_migrations_applies_embedded_versions_on_fresh_database() {
let pool = connect_test_pool().await;
reset_public_schema(&pool).await;

run_migrations(&pool).await.expect("run migrations");

assert_eq!(applied_versions(&pool).await, vec![1, 2]);
let events_exists = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'events')",
)
.fetch_one(&pool)
.await
.expect("check events table");
assert!(events_exists);
}

#[tokio::test]
#[ignore = "requires Postgres"]
async fn run_migrations_baselines_existing_schema_and_preserves_allowlist_backfill_path() {
let pool = connect_test_pool().await;
reset_public_schema(&pool).await;
sqlx::raw_sql(SCHEMA_SQL)
.execute(&pool)
.await
.expect("load pre-SQLx schema snapshot");
sqlx::query(
"INSERT INTO pubkey_allowlist (pubkey, added_at) VALUES (decode($1, 'hex'), now())",
)
.bind("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
.execute(&pool)
.await
.expect("seed legacy allowlist row");

run_migrations(&pool).await.expect("baseline migrations");

assert_eq!(applied_versions(&pool).await, vec![1, 2]);
let allowlist_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM pubkey_allowlist")
.fetch_one(&pool)
.await
.expect("count allowlist rows");
assert_eq!(
allowlist_count, 1,
"baseline must not drop legacy allowlist rows before relay startup backfills them"
);

let inserted = crate::relay_members::backfill_from_allowlist(&pool)
.await
.expect("backfill legacy allowlist rows");
assert_eq!(inserted, 1);
let relay_member_count = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM relay_members WHERE pubkey = $1 AND role = 'member'",
)
.bind("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
.fetch_one(&pool)
.await
.expect("count backfilled relay member");
assert_eq!(relay_member_count, 1);
}
}
13 changes: 13 additions & 0 deletions crates/buzz-relay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ async fn main() -> anyhow::Result<()> {
})?;
info!("Postgres connected");

let auto_migrate = std::env::var("BUZZ_AUTO_MIGRATE")
.map(|value| value != "false")
.unwrap_or(true);
if auto_migrate {
db.migrate().await.map_err(|e| {
error!("Failed to run database migrations: {e}");
anyhow::anyhow!("Database migration failed: {e}")
})?;
info!("Database migrations complete");
} else {
info!("Skipping database migrations because BUZZ_AUTO_MIGRATE=false");
}

if let Err(e) = db.ensure_future_partitions(3).await {
error!("Failed to ensure partitions: {e}");
}
Expand Down
12 changes: 4 additions & 8 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,9 @@ _ensure-services:
echo " timed out"
exit 1

# Apply database migrations if pgschema is available
# Apply database migrations if the dev database is running
_ensure-migrations: _ensure-services
#!/usr/bin/env bash
set -euo pipefail
if [[ -x bin/pgschema && -f schema/schema.sql ]]; then
bin/pgschema apply --file schema/schema.sql --auto-approve || true
fi
cargo run -p buzz-admin -- migrate

# Run clippy on the desktop Tauri Rust crate
desktop-tauri-clippy: _ensure-sidecar-stubs
Expand Down Expand Up @@ -388,9 +384,9 @@ mobile-dev:

# ─── Database ─────────────────────────────────────────────────────────────────

# Apply schema migrations via pgschema
# Apply database migrations
migrate: _ensure-services
./bin/pgschema apply --file schema/schema.sql --auto-approve
cargo run -p buzz-admin -- migrate

# ─── Utilities ────────────────────────────────────────────────────────────────

Expand Down
Loading