diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6d9ff3c0d..c4fa19978 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -46,7 +46,7 @@ unacceptable behavior to **conduct@buzz-relay.org**. | Docker | 24+ | For Postgres, Redis, Typesense | | `just` | latest | Task runner — `cargo install just` | | `lefthook` | latest | Optional; run `lefthook install` for local Git hooks | -| `pgschema` | latest | Schema tool — `just migrate` applies `schema/schema.sql` declaratively | +| `sqlx` migrations | workspace crate | `just migrate` applies embedded migrations from `migrations/` | This repo uses [Hermit](https://cashapp.github.io/hermit/) for toolchain pinning. Activate it once per shell session: diff --git a/crates/buzz-admin/src/main.rs b/crates/buzz-admin/src/main.rs index 4a259b110..73a5f7e9c 100644 --- a/crates/buzz-admin/src/main.rs +++ b/crates/buzz-admin/src/main.rs @@ -34,6 +34,8 @@ enum Command { ListMembers, /// Generate a new Nostr keypair (for bootstrapping). GenerateKey, + /// Run pending database migrations. + Migrate, /// Emit kind:39000/39002 events for channels missing them. /// /// Channels created via direct SQL (seed scripts, pre-migration data) won't @@ -59,6 +61,11 @@ async fn main() -> Result<()> { println!("Secret key: {}", keys.secret_key().display_secret()); println!("\nSet BUZZ_PRIVATE_KEY to the secret key to use this identity."); } + Command::Migrate => { + let db = connect_db().await?; + db.migrate().await?; + println!("Database migrations complete."); + } Command::AddMember { pubkey, role } => { let db = connect_db().await?; let pk_bytes = hex::decode(&pubkey)?; diff --git a/crates/buzz-db/src/error.rs b/crates/buzz-db/src/error.rs index 58ee34494..f8b8a2eb5 100644 --- a/crates/buzz-db/src/error.rs +++ b/crates/buzz-db/src/error.rs @@ -9,6 +9,10 @@ pub enum DbError { #[error("database error: {0}")] Sqlx(#[from] sqlx::Error), + /// A SQLx migration error. + #[error("migration error: {0}")] + Migrate(#[from] sqlx::migrate::MigrateError), + /// Attempted to store an AUTH event (kind 22242), which is forbidden. #[error("AUTH events (kind 22242) must not be stored")] AuthEventRejected, diff --git a/crates/buzz-db/src/lib.rs b/crates/buzz-db/src/lib.rs index 84ffc5c67..e51a61d7f 100644 --- a/crates/buzz-db/src/lib.rs +++ b/crates/buzz-db/src/lib.rs @@ -23,6 +23,8 @@ pub mod error; pub mod event; /// Home feed queries. pub mod feed; +/// Embedded database migrations. +pub mod migration; /// Monthly table partition management. pub mod partition; /// Reaction persistence. @@ -196,6 +198,11 @@ impl Db { Self { pool } } + /// Run pending database migrations. + pub async fn migrate(&self) -> Result<()> { + migration::run_migrations(&self.pool).await + } + /// Returns `true` if the database is reachable (used by readiness probes). pub async fn ping(&self) -> bool { sqlx::query("SELECT 1").execute(&self.pool).await.is_ok() diff --git a/crates/buzz-db/src/migration.rs b/crates/buzz-db/src/migration.rs new file mode 100644 index 000000000..aa0a96a5e --- /dev/null +++ b/crates/buzz-db/src/migration.rs @@ -0,0 +1,244 @@ +//! Embedded SQLx migrations for Buzz. +//! +//! Fresh deployments apply the checked-in SQL files under `migrations/`. +//! Existing pre-SQLx deployments are baselined when core Buzz tables already +//! exist but `_sqlx_migrations` does not, so startup will not try to replay the +//! initial schema over a live database. + +use sqlx::PgPool; + +use crate::Result; + +static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("../../migrations"); + +#[cfg(test)] +static SCHEMA_SQL: &str = include_str!("../../../schema/schema.sql"); + +const BASELINE_MIGRATION_VERSIONS: &[i64] = &[1, 2]; + +/// Run all pending Buzz database migrations. +pub async fn run_migrations(pool: &PgPool) -> Result<()> { + baseline_existing_database(pool).await?; + MIGRATOR.run(pool).await?; + Ok(()) +} + +async fn baseline_existing_database(pool: &PgPool) -> Result<()> { + if migrations_table_exists(pool).await? || !pre_sqlx_schema_exists(pool).await? { + return Ok(()); + } + + ensure_migrations_table(pool).await?; + + for version in BASELINE_MIGRATION_VERSIONS { + let migration = MIGRATOR + .iter() + .find(|migration| migration.version == *version) + .expect("baseline migration version must exist in embedded migrator"); + + sqlx::query( + r#" + INSERT INTO _sqlx_migrations + (version, description, success, checksum, execution_time) + VALUES ($1, $2, TRUE, $3, 0) + ON CONFLICT (version) DO NOTHING + "#, + ) + .bind(migration.version) + .bind(&*migration.description) + .bind(&*migration.checksum) + .execute(pool) + .await?; + } + + tracing::info!( + versions = ?BASELINE_MIGRATION_VERSIONS, + "Baselined existing Buzz database for SQLx migrations" + ); + + Ok(()) +} + +async fn migrations_table_exists(pool: &PgPool) -> Result { + let exists = sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = '_sqlx_migrations' + ) + "#, + ) + .fetch_one(pool) + .await?; + + Ok(exists) +} + +async fn pre_sqlx_schema_exists(pool: &PgPool) -> Result { + let exists = sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'events' + ) AND EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = 'channels' + ) + "#, + ) + .fetch_one(pool) + .await?; + + Ok(exists) +} + +async fn ensure_migrations_table(pool: &PgPool) -> Result<()> { + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS _sqlx_migrations ( + version BIGINT PRIMARY KEY, + description TEXT NOT NULL, + installed_on TIMESTAMPTZ NOT NULL DEFAULT now(), + success BOOLEAN NOT NULL, + checksum BYTEA NOT NULL, + execution_time BIGINT NOT NULL + ) + "#, + ) + .execute(pool) + .await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::PgPool; + + const TEST_DB_URL: &str = "postgres://buzz:buzz_dev@localhost:5432/buzz"; + + #[test] + fn embedded_migrator_contains_initial_schema_and_d_tag_backfill() { + let migrations: Vec<_> = MIGRATOR.iter().collect(); + + assert_eq!(migrations.len(), 2); + assert_eq!(migrations[0].version, 1); + assert_eq!(&*migrations[0].description, "initial schema"); + assert!( + migrations[0].sql.as_str().contains("CREATE TABLE channels"), + "initial schema migration should include Buzz core tables" + ); + assert!( + migrations[0] + .sql + .as_str() + .contains("CREATE TABLE IF NOT EXISTS relay_members"), + "initial schema migration should include relay_members" + ); + + assert_eq!(migrations[1].version, 2); + assert_eq!(&*migrations[1].description, "backfill d tag"); + assert!( + migrations[1].sql.as_str().contains("UPDATE events"), + "second migration should backfill existing event rows" + ); + } + + async fn connect_test_pool() -> PgPool { + let database_url = std::env::var("BUZZ_TEST_DATABASE_URL") + .or_else(|_| std::env::var("DATABASE_URL")) + .unwrap_or_else(|_| TEST_DB_URL.to_owned()); + + PgPool::connect(&database_url) + .await + .expect("connect to test DB") + } + + async fn reset_public_schema(pool: &PgPool) { + sqlx::query("DROP SCHEMA IF EXISTS public CASCADE") + .execute(pool) + .await + .expect("drop public schema"); + sqlx::query("CREATE SCHEMA IF NOT EXISTS public") + .execute(pool) + .await + .expect("create public schema"); + } + + async fn applied_versions(pool: &PgPool) -> Vec { + sqlx::query_scalar::<_, i64>( + "SELECT version FROM _sqlx_migrations WHERE success ORDER BY version", + ) + .fetch_all(pool) + .await + .expect("read applied migrations") + } + + #[tokio::test] + #[ignore = "requires Postgres"] + async fn run_migrations_applies_embedded_versions_on_fresh_database() { + let pool = connect_test_pool().await; + reset_public_schema(&pool).await; + + run_migrations(&pool).await.expect("run migrations"); + + assert_eq!(applied_versions(&pool).await, vec![1, 2]); + let events_exists = sqlx::query_scalar::<_, bool>( + "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'events')", + ) + .fetch_one(&pool) + .await + .expect("check events table"); + assert!(events_exists); + } + + #[tokio::test] + #[ignore = "requires Postgres"] + async fn run_migrations_baselines_existing_schema_and_preserves_allowlist_backfill_path() { + let pool = connect_test_pool().await; + reset_public_schema(&pool).await; + sqlx::raw_sql(SCHEMA_SQL) + .execute(&pool) + .await + .expect("load pre-SQLx schema snapshot"); + sqlx::query( + "INSERT INTO pubkey_allowlist (pubkey, added_at) VALUES (decode($1, 'hex'), now())", + ) + .bind("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + .execute(&pool) + .await + .expect("seed legacy allowlist row"); + + run_migrations(&pool).await.expect("baseline migrations"); + + assert_eq!(applied_versions(&pool).await, vec![1, 2]); + let allowlist_count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM pubkey_allowlist") + .fetch_one(&pool) + .await + .expect("count allowlist rows"); + assert_eq!( + allowlist_count, 1, + "baseline must not drop legacy allowlist rows before relay startup backfills them" + ); + + let inserted = crate::relay_members::backfill_from_allowlist(&pool) + .await + .expect("backfill legacy allowlist rows"); + assert_eq!(inserted, 1); + let relay_member_count = sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) FROM relay_members WHERE pubkey = $1 AND role = 'member'", + ) + .bind("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + .fetch_one(&pool) + .await + .expect("count backfilled relay member"); + assert_eq!(relay_member_count, 1); + } +} diff --git a/crates/buzz-relay/src/main.rs b/crates/buzz-relay/src/main.rs index db60e3201..d4fa046ac 100644 --- a/crates/buzz-relay/src/main.rs +++ b/crates/buzz-relay/src/main.rs @@ -55,6 +55,19 @@ async fn main() -> anyhow::Result<()> { })?; info!("Postgres connected"); + let auto_migrate = std::env::var("BUZZ_AUTO_MIGRATE") + .map(|value| value != "false") + .unwrap_or(true); + if auto_migrate { + db.migrate().await.map_err(|e| { + error!("Failed to run database migrations: {e}"); + anyhow::anyhow!("Database migration failed: {e}") + })?; + info!("Database migrations complete"); + } else { + info!("Skipping database migrations because BUZZ_AUTO_MIGRATE=false"); + } + if let Err(e) = db.ensure_future_partitions(3).await { error!("Failed to ensure partitions: {e}"); } diff --git a/justfile b/justfile index e73e321e4..6c9dbb71a 100644 --- a/justfile +++ b/justfile @@ -148,13 +148,9 @@ _ensure-services: echo " timed out" exit 1 -# Apply database migrations if pgschema is available +# Apply database migrations if the dev database is running _ensure-migrations: _ensure-services - #!/usr/bin/env bash - set -euo pipefail - if [[ -x bin/pgschema && -f schema/schema.sql ]]; then - bin/pgschema apply --file schema/schema.sql --auto-approve || true - fi + cargo run -p buzz-admin -- migrate # Run clippy on the desktop Tauri Rust crate desktop-tauri-clippy: _ensure-sidecar-stubs @@ -388,9 +384,9 @@ mobile-dev: # ─── Database ───────────────────────────────────────────────────────────────── -# Apply schema migrations via pgschema +# Apply database migrations migrate: _ensure-services - ./bin/pgschema apply --file schema/schema.sql --auto-approve + cargo run -p buzz-admin -- migrate # ─── Utilities ──────────────────────────────────────────────────────────────── diff --git a/migrations/0001_initial_schema.sql b/migrations/0001_initial_schema.sql new file mode 100644 index 000000000..93644cba2 --- /dev/null +++ b/migrations/0001_initial_schema.sql @@ -0,0 +1,356 @@ +-- Buzz initial Postgres schema. +-- +-- This migration is the source of truth for fresh database setup. + +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +-- ── Custom types ────────────────────────────────────────────────────────────── + +CREATE TYPE channel_type AS ENUM ('stream', 'forum', 'dm', 'workflow'); +CREATE TYPE channel_visibility AS ENUM ('open', 'private'); +CREATE TYPE member_role AS ENUM ('owner', 'admin', 'member', 'guest', 'bot'); +CREATE TYPE workflow_status AS ENUM ('active', 'disabled', 'archived'); +CREATE TYPE run_status AS ENUM ('pending', 'running', 'waiting_approval', 'completed', 'failed', 'cancelled'); +CREATE TYPE approval_status AS ENUM ('pending', 'granted', 'denied', 'expired'); +CREATE TYPE delivery_method AS ENUM ('webhook', 'websocket'); +CREATE TYPE subscription_status AS ENUM ('active', 'paused', 'deleted'); +CREATE TYPE pause_reason AS ENUM ('user', 'system', 'rate_limit'); +CREATE TYPE channel_add_policy AS ENUM ('anyone', 'owner_only', 'nobody'); + +-- ── Channels ────────────────────────────────────────────────────────────────── + +CREATE TABLE channels ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(255) NOT NULL, + channel_type channel_type NOT NULL DEFAULT 'stream', + visibility channel_visibility NOT NULL DEFAULT 'open', + description TEXT, + canvas TEXT, + created_by BYTEA NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + archived_at TIMESTAMPTZ, + deleted_at TIMESTAMPTZ, + nip29_group_id VARCHAR(255) UNIQUE, + topic_required BOOLEAN NOT NULL DEFAULT FALSE, + max_members INT, + topic TEXT, + topic_set_by BYTEA, + topic_set_at TIMESTAMPTZ, + purpose TEXT, + purpose_set_by BYTEA, + purpose_set_at TIMESTAMPTZ, + participant_hash BYTEA, + ttl_seconds INT, + ttl_deadline TIMESTAMPTZ, + CONSTRAINT chk_channels_id_not_nil CHECK (id <> '00000000-0000-0000-0000-000000000000'::uuid) +); + +CREATE INDEX idx_channels_type ON channels (channel_type); +CREATE INDEX idx_channels_visibility ON channels (visibility); +CREATE INDEX idx_channels_created_by ON channels (created_by); +CREATE UNIQUE INDEX idx_channels_dm_hash ON channels (participant_hash); +CREATE INDEX idx_channels_ttl_expiry ON channels (ttl_deadline) + WHERE ttl_seconds IS NOT NULL AND archived_at IS NULL AND deleted_at IS NULL; + +-- ── Channel members ─────────────────────────────────────────────────────────── + +CREATE TABLE channel_members ( + channel_id UUID NOT NULL REFERENCES channels(id) ON DELETE CASCADE, + pubkey BYTEA NOT NULL, + role member_role NOT NULL DEFAULT 'member', + joined_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + invited_by BYTEA, + removed_at TIMESTAMPTZ, + removed_by BYTEA, + hidden_at TIMESTAMPTZ, + PRIMARY KEY (channel_id, pubkey) +); + +CREATE INDEX idx_channel_members_pubkey ON channel_members (pubkey) + WHERE removed_at IS NULL; + +-- ── Users ───────────────────────────────────────────────────────────────────── + +CREATE TABLE users ( + pubkey BYTEA PRIMARY KEY, + nip05_handle VARCHAR(255) UNIQUE, + display_name VARCHAR(255), + avatar_url TEXT, + about TEXT, + agent_type VARCHAR(255), + capabilities JSONB, + okta_user_id VARCHAR(255) UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + deactivated_at TIMESTAMPTZ, + metadata_event_id BYTEA, + agent_owner_pubkey BYTEA REFERENCES users(pubkey) ON DELETE SET NULL, + channel_add_policy channel_add_policy NOT NULL DEFAULT 'anyone', + CONSTRAINT chk_users_pubkey_len CHECK (LENGTH(pubkey) = 32) +); + +-- ── Events (partitioned by month on created_at) ────────────────────────────── + +CREATE TABLE events ( + id BYTEA NOT NULL, + pubkey BYTEA NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + kind INT NOT NULL, + tags JSONB NOT NULL, + content TEXT NOT NULL, + sig BYTEA NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + channel_id UUID, + deleted_at TIMESTAMPTZ, + d_tag TEXT, + PRIMARY KEY (created_at, id) +) PARTITION BY RANGE (created_at); + +CREATE TABLE events_p_past PARTITION OF events + FOR VALUES FROM (MINVALUE) TO ('2026-01-01'); +CREATE TABLE events_p2026_01 PARTITION OF events + FOR VALUES FROM ('2026-01-01') TO ('2026-02-01'); +CREATE TABLE events_p2026_02 PARTITION OF events + FOR VALUES FROM ('2026-02-01') TO ('2026-03-01'); +CREATE TABLE events_p2026_03 PARTITION OF events + FOR VALUES FROM ('2026-03-01') TO ('2026-04-01'); +CREATE TABLE events_p2026_04 PARTITION OF events + FOR VALUES FROM ('2026-04-01') TO ('2026-05-01'); +CREATE TABLE events_p2026_05 PARTITION OF events + FOR VALUES FROM ('2026-05-01') TO ('2026-06-01'); +CREATE TABLE events_p2026_06 PARTITION OF events + FOR VALUES FROM ('2026-06-01') TO ('2026-07-01'); +CREATE TABLE events_p_future PARTITION OF events + FOR VALUES FROM ('2026-07-01') TO (MAXVALUE); + +CREATE INDEX idx_events_pubkey_kind_created ON events (pubkey, kind, created_at); +CREATE INDEX idx_events_channel_created ON events (channel_id, created_at); +CREATE INDEX idx_events_kind_created ON events (kind, created_at); +CREATE INDEX idx_events_id ON events (id); +CREATE INDEX idx_events_deleted ON events (deleted_at); +CREATE INDEX idx_events_addressable ON events (kind, pubkey, channel_id, deleted_at); +CREATE INDEX idx_events_parameterized ON events (kind, pubkey, d_tag, deleted_at) WHERE d_tag IS NOT NULL; + +-- ── Event mentions ──────────────────────────────────────────────────────────── + +CREATE TABLE event_mentions ( + pubkey_hex VARCHAR(64) NOT NULL, + event_id BYTEA NOT NULL, + event_created_at TIMESTAMPTZ NOT NULL, + channel_id UUID, + event_kind INT, + PRIMARY KEY (pubkey_hex, event_id) +); + +CREATE INDEX idx_event_mentions_pubkey_created ON event_mentions (pubkey_hex, event_created_at DESC); +CREATE INDEX idx_event_mentions_pubkey_kind_created ON event_mentions (pubkey_hex, event_kind, event_created_at DESC); + +-- ── Subscriptions ───────────────────────────────────────────────────────────── + +CREATE TABLE subscriptions ( + id VARCHAR(255) PRIMARY KEY, + owner_pubkey BYTEA NOT NULL REFERENCES users(pubkey), + filter_kinds JSONB, + filter_authors JSONB, + filter_channel_ids JSONB, + filter_since TIMESTAMPTZ, + filter_until TIMESTAMPTZ, + delivery_method delivery_method NOT NULL DEFAULT 'webhook', + delivery_url TEXT, + status subscription_status NOT NULL DEFAULT 'active', + pause_reason pause_reason, + delivered_count BIGINT NOT NULL DEFAULT 0, + error_count BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- ── Delivery log (partitioned by month on delivered_at) ────────────────────── + +CREATE TABLE delivery_log ( + id BIGINT GENERATED ALWAYS AS IDENTITY, + subscription_id VARCHAR(255), + event_id BYTEA, + method delivery_method, + delivered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + success BOOLEAN, + http_status INT, + error_message TEXT, + attempt_number INT DEFAULT 1, + PRIMARY KEY (delivered_at, id) +) PARTITION BY RANGE (delivered_at); + +CREATE TABLE delivery_log_p_past PARTITION OF delivery_log + FOR VALUES FROM (MINVALUE) TO ('2026-03-01'); +CREATE TABLE delivery_log_p2026_03 PARTITION OF delivery_log + FOR VALUES FROM ('2026-03-01') TO ('2026-04-01'); +CREATE TABLE delivery_log_p2026_04 PARTITION OF delivery_log + FOR VALUES FROM ('2026-04-01') TO ('2026-05-01'); +CREATE TABLE delivery_log_p2026_05 PARTITION OF delivery_log + FOR VALUES FROM ('2026-05-01') TO ('2026-06-01'); +CREATE TABLE delivery_log_p2026_06 PARTITION OF delivery_log + FOR VALUES FROM ('2026-06-01') TO ('2026-07-01'); +CREATE TABLE delivery_log_p_future PARTITION OF delivery_log + FOR VALUES FROM ('2026-07-01') TO (MAXVALUE); + +-- ── Workflows ───────────────────────────────────────────────────────────────── + +CREATE TABLE workflows ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name VARCHAR(255) NOT NULL, + owner_pubkey BYTEA NOT NULL REFERENCES users(pubkey), + channel_id UUID REFERENCES channels(id), + definition JSONB NOT NULL, + definition_hash BYTEA NOT NULL, + status workflow_status NOT NULL DEFAULT 'active', + enabled BOOLEAN NOT NULL DEFAULT TRUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_workflows_channel_active ON workflows (channel_id, status, enabled); + +-- ── Workflow runs ───────────────────────────────────────────────────────────── + +CREATE TABLE workflow_runs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE, + status run_status NOT NULL DEFAULT 'pending', + trigger_event_id BYTEA, + current_step INT NOT NULL DEFAULT 0, + execution_trace JSONB NOT NULL DEFAULT '[]', + trigger_context JSONB, + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + error_message TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_workflow_runs_workflow ON workflow_runs (workflow_id); +CREATE INDEX idx_workflow_runs_status ON workflow_runs (status); + +-- ── Workflow approvals ──────────────────────────────────────────────────────── + +CREATE TABLE workflow_approvals ( + token BYTEA PRIMARY KEY, + workflow_id UUID NOT NULL REFERENCES workflows(id) ON DELETE CASCADE, + run_id UUID NOT NULL REFERENCES workflow_runs(id) ON DELETE CASCADE, + step_id VARCHAR(64) NOT NULL, + step_index INT NOT NULL, + approver_spec TEXT NOT NULL, + status approval_status NOT NULL DEFAULT 'pending', + approver_pubkey BYTEA, + note TEXT, + granted_at TIMESTAMPTZ, + denied_at TIMESTAMPTZ, + expires_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_workflow_approvals_workflow ON workflow_approvals (workflow_id); +CREATE INDEX idx_workflow_approvals_run ON workflow_approvals (run_id); +CREATE INDEX idx_workflow_approvals_status ON workflow_approvals (status); + +-- ── API tokens ──────────────────────────────────────────────────────────────── + +CREATE TABLE api_tokens ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + token_hash BYTEA NOT NULL UNIQUE, + owner_pubkey BYTEA NOT NULL REFERENCES users(pubkey), + name VARCHAR(255) NOT NULL, + scopes JSONB NOT NULL, + channel_ids JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + expires_at TIMESTAMPTZ, + last_used_at TIMESTAMPTZ, + revoked_at TIMESTAMPTZ, + revoked_by BYTEA, + created_by_self_mint BOOLEAN NOT NULL DEFAULT FALSE, + CONSTRAINT chk_api_tokens_hash_len CHECK (LENGTH(token_hash) = 32) +); + +-- ── Rate limit violations ───────────────────────────────────────────────────── + +CREATE TABLE rate_limit_violations ( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + pubkey BYTEA, + violation_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + limit_type VARCHAR(64), + limit_value INT, + actual_value INT, + action_taken VARCHAR(64) +); + +-- ── Thread metadata ─────────────────────────────────────────────────────────── + +CREATE TABLE thread_metadata ( + event_created_at TIMESTAMPTZ NOT NULL, + event_id BYTEA NOT NULL, + channel_id UUID NOT NULL REFERENCES channels(id), + parent_event_id BYTEA, + parent_event_created_at TIMESTAMPTZ, + root_event_id BYTEA, + root_event_created_at TIMESTAMPTZ, + depth INT NOT NULL DEFAULT 0, + reply_count INT NOT NULL DEFAULT 0, + descendant_count INT NOT NULL DEFAULT 0, + last_reply_at TIMESTAMPTZ, + broadcast BOOLEAN NOT NULL DEFAULT FALSE, + PRIMARY KEY (event_created_at, event_id) +); + +CREATE INDEX idx_thread_metadata_parent ON thread_metadata (parent_event_id); +CREATE INDEX idx_thread_metadata_root ON thread_metadata (root_event_id); +CREATE INDEX idx_thread_metadata_channel_depth ON thread_metadata (channel_id, depth, event_created_at); +CREATE INDEX idx_thread_metadata_event_id ON thread_metadata (event_id); + +-- ── Reactions ───────────────────────────────────────────────────────────────── + +CREATE TABLE reactions ( + event_created_at TIMESTAMPTZ NOT NULL, + event_id BYTEA NOT NULL, + pubkey BYTEA NOT NULL, + emoji VARCHAR(64) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + removed_at TIMESTAMPTZ, + reaction_event_id BYTEA, + PRIMARY KEY (event_created_at, event_id, pubkey, emoji) +); + +CREATE INDEX idx_reactions_event ON reactions (event_id, event_created_at); +CREATE INDEX idx_reactions_pubkey ON reactions (pubkey); +CREATE UNIQUE INDEX idx_reactions_source_event ON reactions (reaction_event_id); + +-- ── Pubkey allowlist ────────────────────────────────────────────────────────── + +CREATE TABLE pubkey_allowlist ( + pubkey BYTEA PRIMARY KEY, + added_by BYTEA, + added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + note TEXT +); + +-- ── Relay members (NIP-43) ──────────────────────────────────────────────────── + +CREATE TABLE IF NOT EXISTS relay_members ( + pubkey TEXT PRIMARY KEY, + role TEXT NOT NULL CHECK (role IN ('owner', 'admin', 'member')), + added_by TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_relay_members_role ON relay_members(role); + +-- ── Archived identities (NIP-IA) ────────────────────────────────────────────── + +CREATE TABLE IF NOT EXISTS archived_identities ( + pubkey TEXT PRIMARY KEY, + consent_path TEXT NOT NULL CHECK (consent_path IN ('self', 'owner', 'admin')), + actor TEXT NOT NULL, + reason TEXT, + replaced_by TEXT, + request_event_id TEXT NOT NULL, + archived_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/0001_relay_members.sql b/migrations/0001_relay_members.sql deleted file mode 100644 index dc6dd1578..000000000 --- a/migrations/0001_relay_members.sql +++ /dev/null @@ -1,51 +0,0 @@ --- Migration 0001: relay_members --- --- Introduces the relay-level membership table (NIP-43). --- Replaces the old pubkey_allowlist with a richer model that tracks role --- (owner / admin / member), who added the entry, and timestamps. --- --- Idempotent: safe to run more than once. - --- ── 1. Create relay_members ─────────────────────────────────────────────────── - -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM information_schema.tables - WHERE table_schema = 'public' - AND table_name = 'relay_members' - ) THEN - CREATE TABLE relay_members ( - pubkey TEXT PRIMARY KEY, - role TEXT NOT NULL CHECK (role IN ('owner', 'admin', 'member')), - added_by TEXT, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now() - ); - - CREATE INDEX idx_relay_members_role ON relay_members(role); - END IF; -END $$; - --- ── 2. Migrate existing allowlist rows ──────────────────────────────────────── --- --- pubkey_allowlist stores pubkeys as BYTEA and timestamps as added_at. --- Convert BYTEA to lowercase hex text via encode(..., 'hex'). - -DO $$ -BEGIN - IF EXISTS ( - SELECT 1 FROM information_schema.tables - WHERE table_schema = 'public' - AND table_name = 'pubkey_allowlist' - ) THEN - INSERT INTO relay_members (pubkey, role, added_by, created_at) - SELECT encode(pubkey, 'hex'), 'member', NULL, added_at - FROM pubkey_allowlist - ON CONFLICT (pubkey) DO NOTHING; - END IF; -END $$; - --- NOTE: pubkey_allowlist is intentionally NOT dropped here. --- The old allowlist code still references it. Drop it in a future migration --- once all references have been removed. diff --git a/migrations/0002_backfill_d_tag.sql b/migrations/0002_backfill_d_tag.sql new file mode 100644 index 000000000..74301271f --- /dev/null +++ b/migrations/0002_backfill_d_tag.sql @@ -0,0 +1,17 @@ +-- Backfill d_tag for existing NIP-33 range events (kind 30000–39999). +-- Idempotent: only updates rows where d_tag is still NULL. +-- Includes soft-deleted rows so the column is fully populated. +-- Run once after adding the d_tag column to the events table. +-- +-- Managed by sqlx migrations. + +UPDATE events +SET d_tag = COALESCE( + (SELECT elem->>1 + FROM jsonb_array_elements(tags) AS elem + WHERE elem->>0 = 'd' + LIMIT 1), + '' +) +WHERE kind BETWEEN 30000 AND 39999 + AND d_tag IS NULL; diff --git a/schema/schema.sql b/schema/schema.sql index 8224c0cae..274fd6774 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -1,7 +1,9 @@ --- Buzz — Declarative Postgres schema (managed by pgschema) +-- Buzz Postgres schema reference. -- --- This file represents the desired state of the database schema. --- Use `pgschema apply --file schema/schema.sql` to bring the database up to date. +-- Runtime migrations live under migrations/ and are applied by sqlx. Keep this +-- file in sync as a human-readable snapshot of the desired schema. + +CREATE EXTENSION IF NOT EXISTS pgcrypto; -- ── Custom types ────────────────────────────────────────────────────────────── diff --git a/scripts/dev-setup.sh b/scripts/dev-setup.sh index 7a3d6e8e7..41c09fe44 100755 --- a/scripts/dev-setup.sh +++ b/scripts/dev-setup.sh @@ -119,59 +119,20 @@ log "Starting services and waiting for health..." # ---- Run migrations --------------------------------------------------------- log "Running database migrations..." - -PGSCHEMA="${REPO_ROOT}/bin/pgschema" -SCHEMA_FILE="${REPO_ROOT}/schema/schema.sql" - -if [[ ! -f "${SCHEMA_FILE}" ]]; then - warn "No schema.sql found at ${SCHEMA_FILE}. Skipping." -else - if [[ -x "${PGSCHEMA}" ]]; then - # pgschema uses CREATE INDEX CONCURRENTLY for new indexes, which Postgres - # does not support on partitioned tables. Pre-create any such indexes here - # so pgschema sees them as already existing and skips the CONCURRENTLY path. - log "Pre-creating indexes on partitioned tables (if needed)..." - docker exec buzz-postgres psql -U "${PGUSER}" -d "${PGDATABASE}" -q -c \ - "CREATE INDEX IF NOT EXISTS idx_events_parameterized ON events (kind, pubkey, d_tag, deleted_at) WHERE d_tag IS NOT NULL;" \ - 2>/dev/null || true - - log "Using pgschema for migrations..." - attempts=0 - max_attempts=10 - pgschema_output="$(mktemp)" - trap 'rm -f "${pgschema_output}"' EXIT - until "${PGSCHEMA}" apply --file "${SCHEMA_FILE}" --auto-approve >"${pgschema_output}" 2>&1; do - attempts=$((attempts + 1)) - if postgres_accepting_connections; then - error "pgschema failed even though Postgres is accepting connections" - cat "${pgschema_output}" >&2 - exit 1 - fi - if [[ ${attempts} -ge ${max_attempts} ]]; then - error "Failed to run migrations after ${max_attempts} attempts" - cat "${pgschema_output}" >&2 - exit 1 - fi - log "Postgres not ready for connections yet, retrying in 2s... (${attempts}/${max_attempts})" - sleep 2 - done - success "Migrations applied via pgschema" - - # Run data backfills (idempotent — safe to re-run). - BACKFILL_DIR="${REPO_ROOT}/scripts" - if [[ -f "${BACKFILL_DIR}/backfill-d-tag.sql" ]]; then - log "Running d_tag backfill for NIP-33 events..." - if psql "${DATABASE_URL}" -f "${BACKFILL_DIR}/backfill-d-tag.sql" 2>/dev/null; then - success "d_tag backfill complete" - else - warn "d_tag backfill failed (relay startup will retry automatically)" - fi - fi - else - error "pgschema not found at ${PGSCHEMA}. Run: ./bin/hermit install pgschema" +attempts=0 +max_attempts=10 +until postgres_accepting_connections; do + attempts=$((attempts + 1)) + if [[ ${attempts} -ge ${max_attempts} ]]; then + error "Postgres did not accept connections after ${max_attempts} attempts" exit 1 fi -fi + log "Postgres not ready for connections yet, retrying in 2s... (${attempts}/${max_attempts})" + sleep 2 +done + +"${REPO_ROOT}/bin/cargo" run -p buzz-admin -- migrate +success "Database migrations complete" # ---- Install desktop dependencies -------------------------------------------