Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 165 additions & 5 deletions crates/buzz-db/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use nostr::Event;
use sqlx::{PgPool, QueryBuilder, Row};
use uuid::Uuid;

use buzz_core::kind::{event_kind_i32, is_ephemeral, is_parameterized_replaceable, KIND_AUTH};
use buzz_core::kind::{
event_kind_i32, is_ephemeral, is_parameterized_replaceable, KIND_AUTH, KIND_EVENT_REMINDER,
};
use buzz_core::StoredEvent;

use crate::error::{DbError, Result};
Expand Down Expand Up @@ -96,6 +98,26 @@ pub fn extract_d_tag(event: &Event) -> Option<String> {
Some(val)
}

/// Extract the `not_before` timestamp for materialization in the `events` table.
///
/// Only applies to `kind:30300` (NIP-ER event reminders). Returns the first
/// valid `not_before` tag value as an `i64` Unix timestamp, or `None` if the
/// event is not a reminder or has no `not_before` tag.
pub fn extract_not_before(event: &Event) -> Option<i64> {
let kind_u32 = event.kind.as_u16() as u32;
if kind_u32 != KIND_EVENT_REMINDER {
return None;
}
event.tags.iter().find_map(|tag| {
let parts = tag.as_slice();
if parts.len() >= 2 && parts[0] == "not_before" {
parts[1].parse::<i64>().ok()
} else {
None
}
})
}

/// Insert a Nostr event. Rejects AUTH and ephemeral kinds.
///
/// Returns `(StoredEvent, was_inserted)` — `was_inserted` is `false` on duplicate.
Expand Down Expand Up @@ -125,10 +147,11 @@ pub async fn insert_event(
.ok_or(DbError::InvalidTimestamp(created_at_secs))?;
let received_at = Utc::now();
let d_tag = extract_d_tag(event);
let not_before = extract_not_before(event);
let result = sqlx::query(
r#"
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT DO NOTHING
"#,
)
Expand All @@ -142,6 +165,7 @@ pub async fn insert_event(
.bind(received_at)
.bind(channel_id)
.bind(d_tag.as_deref())
.bind(not_before)
.execute(pool)
.await?;

Expand Down Expand Up @@ -842,13 +866,14 @@ pub async fn insert_event_with_thread_metadata(
.ok_or(DbError::InvalidTimestamp(created_at_secs))?;
let received_at = Utc::now();
let d_tag = extract_d_tag(event);
let not_before = extract_not_before(event);
let mut tx = pool.begin().await?;

// ── Insert event ──────────────────────────────────────────────────────────
let result = sqlx::query(
r#"
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
INSERT INTO events (id, pubkey, created_at, kind, tags, content, sig, received_at, channel_id, d_tag, not_before)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT DO NOTHING
"#,
)
Expand All @@ -862,6 +887,7 @@ pub async fn insert_event_with_thread_metadata(
.bind(received_at)
.bind(channel_id)
.bind(d_tag.as_deref())
.bind(not_before)
.execute(&mut *tx)
.await?;

Expand Down Expand Up @@ -981,6 +1007,101 @@ pub async fn insert_event_with_thread_metadata(
))
}

/// A due reminder row returned by [`query_due_reminders`].
#[derive(Debug)]
pub struct DueReminder {
/// The event's raw ID bytes.
pub id: Vec<u8>,
/// The event's pubkey bytes.
pub pubkey: Vec<u8>,
/// The event's `created_at` timestamp.
pub created_at: DateTime<Utc>,
/// The event's kind (always 30300).
pub kind: i32,
/// The event's JSONB tags.
pub tags: serde_json::Value,
/// The event's encrypted content.
pub content: String,
/// The event's signature bytes.
pub sig: Vec<u8>,
/// The channel ID (always None for reminders — global events).
pub channel_id: Option<Uuid>,
}

/// Query due reminders: latest-per-address `kind:30300` rows where
/// `not_before <= now`, `deleted_at IS NULL`, `delivered_at IS NULL`.
///
/// Returns the latest head per `(pubkey, d_tag)` using canonical NIP-16
/// ordering (`created_at DESC, id ASC`).
pub async fn query_due_reminders(
pool: &PgPool,
now_secs: i64,
batch_limit: i64,
) -> Result<Vec<DueReminder>> {
let kind_i32 = KIND_EVENT_REMINDER as i32;
let rows = sqlx::query(
r#"
SELECT DISTINCT ON (pubkey, d_tag)
id, pubkey, created_at, kind, tags, content, sig, channel_id
FROM events
WHERE kind = $1
AND not_before IS NOT NULL
AND not_before <= $2
AND deleted_at IS NULL
AND delivered_at IS NULL
ORDER BY pubkey, d_tag, created_at DESC, id ASC
LIMIT $3
"#,
)
.bind(kind_i32)
.bind(now_secs)
.bind(batch_limit)
.fetch_all(pool)
.await?;

let results = rows
.into_iter()
.map(|row| DueReminder {
id: row.get("id"),
pubkey: row.get("pubkey"),
created_at: row.get("created_at"),
kind: row.get("kind"),
tags: row.get("tags"),
content: row.get("content"),
sig: row.get("sig"),
channel_id: row.get("channel_id"),
})
.collect();

Ok(results)
}

/// Atomically claim a due reminder for delivery. Returns `Some(id)` if this
/// caller won the claim (set `delivered_at`), or `None` if another pod already
/// claimed it. Mirrors the reaper's `archived_at IS NULL` guard for cross-pod
/// idempotency.
pub async fn claim_due_reminder(
pool: &PgPool,
event_id: &[u8],
event_created_at: DateTime<Utc>,
) -> Result<bool> {
let now_epoch = Utc::now().timestamp();
let result = sqlx::query(
r#"
UPDATE events
SET delivered_at = $1
WHERE created_at = $2 AND id = $3 AND delivered_at IS NULL
"#,
)
.bind(now_epoch)
.bind(event_created_at)
.bind(event_id)
.execute(pool)
.await?;

Ok(result.rows_affected() > 0)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1082,4 +1203,43 @@ mod tests {
assert_eq!(result.len(), 2048);
assert_eq!(result, long_val);
}

#[test]
fn extract_not_before_from_reminder() {
let event = make_event_with_kind_and_tags(
KIND_EVENT_REMINDER as u16,
vec![Tag::parse(["not_before", "1717000000"]).unwrap()],
);
assert_eq!(extract_not_before(&event), Some(1_717_000_000));
}

#[test]
fn extract_not_before_absent_returns_none() {
// A bookmark/terminal reminder carries no `not_before` tag.
let event = make_event_with_kind_and_tags(
KIND_EVENT_REMINDER as u16,
vec![Tag::parse(["d", "abc"]).unwrap()],
);
assert_eq!(extract_not_before(&event), None);
}

#[test]
fn extract_not_before_non_reminder_returns_none() {
// Only kind:30300 materializes `not_before`; other kinds stay NULL.
let event = make_event_with_kind_and_tags(
30023,
vec![Tag::parse(["not_before", "1717000000"]).unwrap()],
);
assert_eq!(extract_not_before(&event), None);
}

#[test]
fn extract_not_before_non_numeric_returns_none() {
// Malformed values are rejected by ingest; materialization just skips them.
let event = make_event_with_kind_and_tags(
KIND_EVENT_REMINDER as u16,
vec![Tag::parse(["not_before", "not-a-number"]).unwrap()],
);
assert_eq!(extract_not_before(&event), None);
}
}
20 changes: 20 additions & 0 deletions crates/buzz-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,26 @@ impl Db {
channel::reap_expired_ephemeral_channels(&self.pool).await
}

// ── Reminder scheduler ───────────────────────────────────────────────────

/// Query due reminders ready for delivery.
pub async fn query_due_reminders(
&self,
now_secs: i64,
batch_limit: i64,
) -> Result<Vec<event::DueReminder>> {
event::query_due_reminders(&self.pool, now_secs, batch_limit).await
}

/// Atomically claim a due reminder for delivery (cross-pod dedup).
pub async fn claim_due_reminder(
&self,
event_id: &[u8],
event_created_at: chrono::DateTime<chrono::Utc>,
) -> Result<bool> {
event::claim_due_reminder(&self.pool, event_id, event_created_at).await
}

// ── Users ────────────────────────────────────────────────────────────────

/// Ensure a user record exists (upsert).
Expand Down
12 changes: 4 additions & 8 deletions crates/buzz-relay/src/api/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,10 +593,8 @@ pub async fn count_events(
match state.db.query_events(&q).await {
Ok(stored_events) => {
for se in stored_events {
if !buzz_core::filter::filters_match(
std::slice::from_ref(filter),
&se,
) {
if !buzz_core::filter::filters_match(std::slice::from_ref(filter), &se)
{
continue;
}
if crate::handlers::req::is_author_only_event(&se.event, &pubkey_bytes)
Expand Down Expand Up @@ -642,10 +640,8 @@ pub async fn count_events(
match state.db.query_events(&query).await {
Ok(stored_events) => {
for se in stored_events {
if !buzz_core::filter::filters_match(
std::slice::from_ref(filter),
&se,
) {
if !buzz_core::filter::filters_match(std::slice::from_ref(filter), &se)
{
continue;
}
if crate::handlers::req::is_author_only_event(&se.event, &pubkey_bytes)
Expand Down
12 changes: 4 additions & 8 deletions crates/buzz-relay/src/handlers/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,8 @@ pub async fn handle_count(
match state.db.query_events(&q).await {
Ok(stored_events) => {
for se in stored_events {
if !buzz_core::filter::filters_match(
std::slice::from_ref(filter),
&se,
) {
if !buzz_core::filter::filters_match(std::slice::from_ref(filter), &se)
{
continue;
}
if is_author_only_event(&se.event, &pubkey_bytes) {
Expand Down Expand Up @@ -173,10 +171,8 @@ pub async fn handle_count(
match state.db.query_events(&query).await {
Ok(stored_events) => {
for se in stored_events {
if !buzz_core::filter::filters_match(
std::slice::from_ref(filter),
&se,
) {
if !buzz_core::filter::filters_match(std::slice::from_ref(filter), &se)
{
continue;
}
if is_author_only_event(&se.event, &pubkey_bytes) {
Expand Down
43 changes: 43 additions & 0 deletions crates/buzz-relay/src/handlers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,23 @@ pub async fn filter_fanout_by_access(
stored_event: &StoredEvent,
matches: Vec<(crate::subscription::ConnId, crate::subscription::SubId)>,
) -> Vec<(crate::subscription::ConnId, crate::subscription::SubId)> {
// Author-only kinds: only the event's author may receive fan-out.
// Checked before channel gating so it applies to all delivery paths
// (local dispatch, cross-pod subscribe_local, scheduler-published events).
let kind_u32 = event_kind_u32(&stored_event.event);
if AUTHOR_ONLY_KINDS.contains(&kind_u32) {
let author_bytes = stored_event.event.pubkey.to_bytes();
return matches
.into_iter()
.filter(|(conn_id, _)| {
state
.conn_manager
.pubkey_for_conn(*conn_id)
.is_some_and(|pk| pk.as_slice() == author_bytes.as_slice())
})
.collect();
}

let Some(channel_id) = stored_event.channel_id else {
return matches;
};
Expand Down Expand Up @@ -1129,5 +1146,31 @@ mod tests {
filter_fanout_by_access(&state, &channel_event(Some(channel_id)), matches).await;
assert_eq!(out, vec![(member, "m".to_string())]);
}

#[tokio::test]
async fn author_only_kind_delivers_only_to_author() {
let state = test_state().await;
let author_keys = Keys::generate();
let author_pk = author_keys.public_key().to_bytes().to_vec();
let other_pk = vec![99u8; 32];

let author_conn = register_conn(&state, Some(author_pk.clone()));
let other_conn = register_conn(&state, Some(other_pk));
let unauthed_conn = register_conn(&state, None);

// Build a kind:30300 (event reminder) — an author-only kind.
let event = EventBuilder::new(Kind::Custom(30300), "encrypted-content")
.sign_with_keys(&author_keys)
.expect("sign event");
let stored = StoredEvent::new(event, None);

let matches = vec![
(author_conn, "a".to_string()),
(other_conn, "o".to_string()),
(unauthed_conn, "u".to_string()),
];
let out = filter_fanout_by_access(&state, &stored, matches).await;
assert_eq!(out, vec![(author_conn, "a".to_string())]);
}
}
}
Loading