From 4ffd3a7f81c9b17239f09aefa6d443b414c82078 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Wed, 10 Jun 2026 17:04:06 +1000 Subject: [PATCH 01/11] perf(staged): coalesce PR-status events, bound backend fetch concurrency, yield background polling (plan phase 0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 0 of moving PR polling into a backend-owned scheduler. Three self-contained changes that fix the intermittent project-switch freeze caused by a PR-polling storm starving the renderer's event loop: - ProjectHome.svelte: buffer pr-status-changed events and apply a single branchesByProject rebuild per animation frame instead of one Map allocation per branch; the pending flush is cancelled on destroy. A storm of N branch events now collapses into one reactive flush. - prs.rs: fan the per-branch fetches in refresh_all_pr_statuses out across a Semaphore-bounded pool (cap 6) instead of awaiting them serially, so a project resolves in ~1 round-trip's wall-clock instead of N. Per-branch and final emissions, error handling, and return value are unchanged — only the scheduling is now parallel. - prPollingService.ts: yield to the event loop between projects in the poll() drain loop, and hold background-tier polling for a short cooldown after setSelectedProject so a switch can interleave instead of waiting out the serial chain. Interval tiers, focus/blur gating, failure backoff, and the due logic are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Matt Toohey --- apps/staged/src-tauri/src/prs.rs | 136 +++++++++++------- .../lib/features/projects/ProjectHome.svelte | 72 +++++++--- .../src/lib/services/prPollingService.ts | 25 ++++ 3 files changed, 163 insertions(+), 70 deletions(-) diff --git a/apps/staged/src-tauri/src/prs.rs b/apps/staged/src-tauri/src/prs.rs index 9cf1af050..a8f7e4662 100644 --- a/apps/staged/src-tauri/src/prs.rs +++ b/apps/staged/src-tauri/src/prs.rs @@ -824,6 +824,12 @@ pub async fn refresh_pr_status( Ok(()) } +/// Max concurrent PR-status fetches inside a single `refresh_all_pr_statuses` +/// call. Each fetch spawns a `gh` subprocess + GitHub round-trip, so we cap the +/// fan-out to avoid a subprocess thundering herd while still resolving a +/// project's PRs in ~1 round-trip's wall-clock instead of N (fully serial). +const PR_REFRESH_CONCURRENCY: usize = 6; + /// Refresh PR status for all branches in a project. #[tauri::command(rename_all = "camelCase")] pub async fn refresh_all_pr_statuses( @@ -844,7 +850,14 @@ pub async fn refresh_all_pr_statuses( .filter(|b| b.pr_number.is_some()) .collect(); - let mut refreshed_count = 0u32; + // Fan the per-branch fetches out across a bounded pool instead of awaiting + // them one at a time. Repo resolution is a cheap local DB read so it stays + // on this task; only the network fetch + DB write + per-branch emit move + // into the spawned tasks, gated by the semaphore. The per-branch + // `pr-status-changed` emissions and final `pr-statuses-refreshed` event are + // unchanged — only the scheduling is now parallel rather than serial. + let semaphore = Arc::new(tokio::sync::Semaphore::new(PR_REFRESH_CONCURRENCY)); + let mut tasks = Vec::new(); for branch in branches_with_prs { let pr_number = branch.pr_number.unwrap(); @@ -861,60 +874,83 @@ pub async fn refresh_all_pr_statuses( } }; - let pr_result = { - let github_repo = github_repo.clone(); - tauri::async_runtime::spawn_blocking(move || { + let store = Arc::clone(&store); + let app_handle = app_handle.clone(); + let semaphore = Arc::clone(&semaphore); + let branch_id = branch.id.clone(); + + tasks.push(tauri::async_runtime::spawn(async move { + // Hold a permit for the whole fetch so no more than + // PR_REFRESH_CONCURRENCY `gh` round-trips are in flight at once. + let _permit = semaphore + .acquire_owned() + .await + .map_err(|e| format!("refresh_all_pr_statuses semaphore closed: {e}"))?; + + let pr_result = tauri::async_runtime::spawn_blocking(move || { git::fetch_pr_status_for_repo(&github_repo, pr_number) }) .await - .map_err(|e| format!("refresh_all_pr_statuses task failed: {e}"))? - }; - match pr_result { - Ok(pr_status) => { - let mergeable = pr_status.mergeable == "MERGEABLE"; - let pr_fetched_at = store::now_timestamp(); - - if let Err(e) = store.update_branch_pr_status( - &branch.id, - Some(pr_status.state.clone()), - Some(pr_status.checks_summary.state.clone()), - pr_status.review_decision.clone(), - Some(mergeable), - Some(pr_status.is_draft), - None, - None, - pr_status.head_sha.clone(), - ) { - log::warn!("Failed to update PR status for branch {}: {}", branch.id, e); - continue; + .map_err(|e| format!("refresh_all_pr_statuses task failed: {e}"))?; + + match pr_result { + Ok(pr_status) => { + let mergeable = pr_status.mergeable == "MERGEABLE"; + let pr_fetched_at = store::now_timestamp(); + + if let Err(e) = store.update_branch_pr_status( + &branch_id, + Some(pr_status.state.clone()), + Some(pr_status.checks_summary.state.clone()), + pr_status.review_decision.clone(), + Some(mergeable), + Some(pr_status.is_draft), + None, + None, + pr_status.head_sha.clone(), + ) { + log::warn!("Failed to update PR status for branch {}: {}", branch_id, e); + return Ok::(false); + } + + crate::web_server::emit_to_all( + &app_handle, + "pr-status-changed", + PrStatusEvent { + branch_id: branch_id.clone(), + pr_state: pr_status.state, + pr_checks_status: pr_status.checks_summary.state, + pr_review_decision: pr_status.review_decision, + pr_mergeable: mergeable, + pr_draft: pr_status.is_draft, + pr_head_sha: pr_status.head_sha, + pr_fetched_at, + failed_checks: pr_status.failed_checks, + }, + ); + + Ok(true) + } + Err(e) => { + log::warn!( + "Failed to fetch PR status for branch {} (PR #{}): {}", + branch_id, + pr_number, + e + ); + Ok(false) } - - refreshed_count += 1; - - crate::web_server::emit_to_all( - &app_handle, - "pr-status-changed", - PrStatusEvent { - branch_id: branch.id.clone(), - pr_state: pr_status.state, - pr_checks_status: pr_status.checks_summary.state, - pr_review_decision: pr_status.review_decision, - pr_mergeable: mergeable, - pr_draft: pr_status.is_draft, - pr_head_sha: pr_status.head_sha, - pr_fetched_at, - failed_checks: pr_status.failed_checks, - }, - ); - } - Err(e) => { - log::warn!( - "Failed to fetch PR status for branch {} (PR #{}): {}", - branch.id, - pr_number, - e - ); } + })); + } + + let mut refreshed_count = 0u32; + for task in tasks { + let refreshed = task + .await + .map_err(|e| format!("refresh_all_pr_statuses join failed: {e}"))??; + if refreshed { + refreshed_count += 1; } } diff --git a/apps/staged/src/lib/features/projects/ProjectHome.svelte b/apps/staged/src/lib/features/projects/ProjectHome.svelte index 1107088b0..f3def0c6a 100644 --- a/apps/staged/src/lib/features/projects/ProjectHome.svelte +++ b/apps/staged/src/lib/features/projects/ProjectHome.svelte @@ -158,28 +158,55 @@ } ); - // Listen for PR status changes to update branch state - const unlistenPrStatus = listenToEvent('pr-status-changed', (payload) => { - // Find the project that contains this branch and update it - for (const [projectId, branches] of branchesByProject.entries()) { - const branchIndex = branches.findIndex((b) => b.id === payload.branchId); - if (branchIndex !== -1) { - // Update the branch with new PR status - const updatedBranches = [...branches]; - updatedBranches[branchIndex] = { - ...updatedBranches[branchIndex], - prState: payload.prState, - prChecksStatus: payload.prChecksStatus, - prReviewDecision: payload.prReviewDecision, - prMergeable: payload.prMergeable, - prDraft: payload.prDraft, - prHeadSha: payload.prHeadSha, - prFetchedAt: payload.prFetchedAt, - }; - branchesByProject = new Map(branchesByProject).set(projectId, updatedBranches); - break; + // Listen for PR status changes to update branch state. + // + // A PR-polling cycle emits one `pr-status-changed` per branch, so a storm + // of N branches arrives as N separate events. Rebuilding `branchesByProject` + // with a fresh `new Map(...)` per event means N allocations + N derivation + // re-runs, which can pile up on the main thread during a project switch. + // Buffer the events and apply a single rebuild per frame so a burst + // coalesces into one reactive flush without dropping any update. + let pendingPrStatusEvents: PrStatusChangedEvent[] = []; + let prStatusFlushHandle: number | null = null; + + const flushPrStatusEvents = () => { + prStatusFlushHandle = null; + if (pendingPrStatusEvents.length === 0) return; + const events = pendingPrStatusEvents; + pendingPrStatusEvents = []; + + // Apply every buffered event onto one fresh Map. Each event re-scans the + // in-progress map, so multiple updates to the same project compound + // correctly instead of clobbering one another. + const next = new Map(branchesByProject); + for (const payload of events) { + for (const [projectId, branches] of next) { + const branchIndex = branches.findIndex((b) => b.id === payload.branchId); + if (branchIndex !== -1) { + const updatedBranches = [...branches]; + updatedBranches[branchIndex] = { + ...updatedBranches[branchIndex], + prState: payload.prState, + prChecksStatus: payload.prChecksStatus, + prReviewDecision: payload.prReviewDecision, + prMergeable: payload.prMergeable, + prDraft: payload.prDraft, + prHeadSha: payload.prHeadSha, + prFetchedAt: payload.prFetchedAt, + }; + next.set(projectId, updatedBranches); + break; + } } } + branchesByProject = next; + }; + + const unlistenPrStatus = listenToEvent('pr-status-changed', (payload) => { + pendingPrStatusEvents.push(payload); + if (prStatusFlushHandle === null) { + prStatusFlushHandle = requestAnimationFrame(flushPrStatusEvents); + } }); // Refresh a project's branches when a commit session completes so the @@ -205,6 +232,11 @@ unlistenDetection(); unlistenProjectRepoAdded(); unlistenPrStatus(); + if (prStatusFlushHandle !== null) { + cancelAnimationFrame(prStatusFlushHandle); + prStatusFlushHandle = null; + } + pendingPrStatusEvents = []; unlistenSessionStatus(); workspaceLifecycle.stop(); projectRunActionsStore.stopListening(); diff --git a/apps/staged/src/lib/services/prPollingService.ts b/apps/staged/src/lib/services/prPollingService.ts index 0683e7321..eeeb07b94 100644 --- a/apps/staged/src/lib/services/prPollingService.ts +++ b/apps/staged/src/lib/services/prPollingService.ts @@ -25,6 +25,10 @@ const PENDING_INTERVAL = 15_000; // any project with pending CI checks const SELECTED_INTERVAL = 60_000; // selected project, no pending checks const BACKGROUND_INTERVAL = 5 * 60_000; // non-selected, no pending checks const MAX_CONSECUTIVE_FAILURES = 3; +// After a project switch, hold background-tier refreshes for a beat so the +// switch's reactive work isn't competing with a background poll cycle for the +// main thread. Selected/pending tiers still poll during the cooldown. +const SWITCH_COOLDOWN = 1_500; // --------------------------------------------------------------------------- // State @@ -59,6 +63,9 @@ let refreshInFlight = false; let windowFocused = true; let listenersAttached = false; +/** Background-tier polling is deprioritized until this timestamp (see SWITCH_COOLDOWN). */ +let switchCooldownUntil = 0; + /** Project IDs queued for immediate refresh while another refresh is in-flight. */ const pendingRefreshProjectIds = new Set(); @@ -79,6 +86,11 @@ function getProjectInterval(projectId: string): number { return BACKGROUND_INTERVAL; } +/** Yield to the macrotask queue so foreground work can interleave between projects. */ +function yieldToEventLoop(): Promise { + return new Promise((resolve) => setTimeout(resolve, 0)); +} + /** Return project IDs whose polling interval has elapsed. */ function getProjectsDue(): string[] { const now = Date.now(); @@ -103,8 +115,15 @@ async function poll() { refreshInFlight = true; const due = getProjectsDue(); + // Right after a switch, hold background-tier projects so the switch's + // reactive work isn't competing with a background poll cycle. They stay due + // and poll on the next cycle once the cooldown elapses. + const inSwitchCooldown = Date.now() < switchCooldownUntil; for (const projectId of due) { + if (inSwitchCooldown && getProjectInterval(projectId) === BACKGROUND_INTERVAL) { + continue; + } setProjectRefreshing(projectId, true); try { await refreshAllPrStatuses(projectId); @@ -128,6 +147,9 @@ async function poll() { } finally { setProjectRefreshing(projectId, false); } + // Yield between projects so a project switch's reactive flush can interleave + // instead of waiting out the whole serial chain of IPC round-trips. + await yieldToEventLoop(); } refreshInFlight = false; @@ -272,6 +294,9 @@ export function setProjects(projectIds: string[]): void { export function setSelectedProject(projectId: string | null): void { if (selectedProjectId === projectId) return; selectedProjectId = projectId; + // Give the switch's reactive work room to flush before background polling + // resumes competing for the main thread. + switchCooldownUntil = Date.now() + SWITCH_COOLDOWN; if (projectId && allProjectIds.has(projectId)) { // Selected project's interval just changed — trigger a poll if it's due poll(); From 89898bce90cce559a0a7f7665b1f859954fdad3a Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Wed, 10 Jun 2026 17:41:58 +1000 Subject: [PATCH 02/11] refactor(staged): move PR polling cadence and concurrency into a backend scheduler (plan phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of moving PR polling into a backend-owned scheduler. Makes the backend the single owner of polling cadence and concurrency and collapses `prPollingService.ts` into a thin interest/hint layer. Behavior-preserving: the same effective tiers and triggers, now driven from the backend. Single frontend only — no multi-client registry (that is Phase 2). Backend: - New `pr_poll_scheduler` module: one long-lived `tokio::time::interval` tick loop (granularity 5s, well below the 15s pending tier). It owns the poll-state that used to live in the frontend — per-project `last_polled_at`, the three interval tiers (15s pending / 60s selected / 5m background, moved verbatim from the frontend), and failure/stale tracking. Poll-state is not persisted: on restart everything is "due" (per the plan's open question). On each tick it derives the project list from the DB (`list_projects`), computes which projects are due from owned state + interest, dedups in-flight work, and spawns refreshes. Interest changes and `refresh_now` nudges wake the loop immediately via a `Notify`. - All due/dedup/backoff decisions live in a pure `PollState` whose methods take an explicit `now` (ms) and in-flight set — the injectable seam that makes the logic unit-testable without `gh` calls or wall-clock sleeps. The loop is the only place that touches the real clock and fetcher. 7 unit tests cover the three tiers, the focus=off pause, in-flight dedup, `refresh_now` folding into dedup, the stale threshold, failure backoff, and pruning. - Reuses Phase 0's bounded concurrency: `refresh_all_pr_statuses` is split so its semaphore-bounded fan-out lives in a reusable `refresh_project_pr_statuses` core that both the command and the scheduler call. The scheduler passes one shared `Semaphore` (cap `PR_REFRESH_CONCURRENCY` = 6) across all projects it refreshes, so a focus-regain / launch burst still caps total `gh` subprocesses — the bounded pool is the herd control (explicit time-jitter was left out to keep the due logic pure/testable; a possible follow-up). - New interest/hint commands, registered in the invoke handler: `set_foreground_project`, `set_focus`, `set_branch_pending`, `refresh_now`. The effective tier for a project is the union of interest (any foregrounding => selected; any pending => fast; nothing focused => pause), structured as a union so Phase 2 can extend it. - The scheduler is managed unconditionally (so the hint commands resolve even during the needs-reset prompt) and the tick loop is spawned in setup's `Ok` branch alongside `background_sync`. - Per-branch `pr-status-changed` and final `pr-statuses-refreshed` events are unchanged. Two small events replace state the frontend used to compute in its own poll loop: `pr-refresh-state` (per-project refreshing on/off) and `pr-status-stale` (crossed/recovered the failure threshold). Frontend (`prPollingService.ts` + callers): - Removed the timer-driven drain loop, the interval tiers, the `getProjectsDue` math, and the failure backoff — these now live in the backend. Phase 0's yield helper and switch cooldown went with them (the foreground serial chain they mitigated no longer exists). - Rewired the public methods to backend commands, preserving the API callers use: `setSelectedProject` -> `set_foreground_project`, `updateChecksStatus` -> `set_branch_pending`, `refreshNow` -> `refresh_now`, focus/blur listeners -> `set_focus`. `setProjects` was removed (the backend derives the project list from the DB); `App.svelte`'s project-list `$effect` and its now-unused import were dropped accordingly. - `onStale` / `onRefreshing` / `isRefreshing` are unchanged for callers but are now driven by the new backend lifecycle events, subscribed in a new `init()` (with a matching `dispose()`), wired from `App.svelte`'s onMount/onDestroy. Verification: `cargo check` clean; `cargo test pr_poll_scheduler` 7/7 pass (291 existing tests unaffected); `pnpm check` 0 errors / 0 warnings. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Matt Toohey --- apps/staged/src-tauri/src/lib.rs | 19 + .../staged/src-tauri/src/pr_poll_scheduler.rs | 622 ++++++++++++++++++ apps/staged/src-tauri/src/prs.rs | 63 +- apps/staged/src/App.svelte | 13 +- apps/staged/src/lib/commands.ts | 32 + .../src/lib/services/prPollingService.ts | 361 +++------- 6 files changed, 816 insertions(+), 294 deletions(-) create mode 100644 apps/staged/src-tauri/src/pr_poll_scheduler.rs diff --git a/apps/staged/src-tauri/src/lib.rs b/apps/staged/src-tauri/src/lib.rs index f277605ec..988bebf73 100644 --- a/apps/staged/src-tauri/src/lib.rs +++ b/apps/staged/src-tauri/src/lib.rs @@ -17,6 +17,7 @@ pub mod image_commands; pub mod migrations; pub mod note_commands; pub mod paths; +pub mod pr_poll_scheduler; pub mod project_commands; pub mod project_mcp; pub mod prs; @@ -2002,6 +2003,11 @@ pub fn run() { let compat = store::check_db_compatibility(&db_path) .map_err(|e| format!("Cannot check database: {e}"))?; let session_registry = Arc::new(session_runner::SessionRegistry::new()); + // Backend-owned PR-poll scheduler. Managed unconditionally so the + // interest/hint commands resolve even before the store exists (e.g. + // during the needs-reset prompt); the tick loop is only spawned once + // the store is ready (the `Ok` branch below). + let pr_scheduler = Arc::new(pr_poll_scheduler::PrPollScheduler::new()); let (store_slot, reset_info) = match compat { store::DbCompatibility::Ok => { @@ -2024,6 +2030,13 @@ pub fn run() { } // Start the tiered background sync service for all cloned repos. background_sync::spawn(Arc::clone(&store_arc), app.handle().clone()); + // Start the backend PR-poll scheduler — it owns polling + // cadence/concurrency; the frontend only sends interest hints. + pr_poll_scheduler::spawn( + Arc::clone(&pr_scheduler), + Arc::clone(&store_arc), + app.handle().clone(), + ); // `fsmonitor-v1` only flips `.git/config` flags on stale // clones the user may not visit this session — per-project // `ensure_local_clone` already re-applies the same config @@ -2069,6 +2082,7 @@ pub fn run() { app.manage(store_slot); app.manage(session_registry); + app.manage(pr_scheduler); app.manage(Arc::new(actions::ActionExecutor::new())); app.manage(Arc::new(actions::ActionRegistry::new())); app.manage(ShutdownState::default()); @@ -2238,6 +2252,11 @@ pub fn run() { prs::squash_commits, prs::clear_branch_pr_status, prs::recover_branch_pr, + // PR poll scheduler (frontend interest/hint layer) + pr_poll_scheduler::set_foreground_project, + pr_poll_scheduler::set_focus, + pr_poll_scheduler::set_branch_pending, + pr_poll_scheduler::refresh_now, // Utilities util_commands::open_url, util_commands::is_sq_available, diff --git a/apps/staged/src-tauri/src/pr_poll_scheduler.rs b/apps/staged/src-tauri/src/pr_poll_scheduler.rs new file mode 100644 index 000000000..7d12b7e19 --- /dev/null +++ b/apps/staged/src-tauri/src/pr_poll_scheduler.rs @@ -0,0 +1,622 @@ +//! Backend-owned PR-status poll scheduler. +//! +//! Owns the *cadence and concurrency* of PR-status polling that used to live in +//! the frontend `prPollingService`. A single long-lived tick loop decides which +//! projects are due, dedups in-flight work, and drives every refresh through one +//! bounded pool (shared via [`crate::prs::refresh_project_pr_statuses`]). +//! +//! Frontends shrink to a thin *interest/hint* layer: they tell the backend which +//! project is foregrounded, which branches have pending checks, and whether a +//! window is focused, via the [`set_foreground_project`], [`set_branch_pending`], +//! and [`set_focus`] commands. The effective tier for a project is the union of +//! interest (any foregrounding ⇒ selected; any pending ⇒ fast; nothing focused ⇒ +//! pause). For a single client this is just that client's state, but it is +//! structured as a union so Phase 2 can extend it across connected clients. +//! +//! Poll-state (last-polled timestamps, failure counts) is intentionally **not +//! persisted** — on restart everything is "due", matching the frontend's +//! previous behaviour where the service started fresh each app launch. +//! +//! ## Testing seam +//! +//! All due/dedup/backoff *decisions* live in [`PollState`], whose methods take an +//! explicit `now` (ms since epoch) and in-flight set. The tick loop is the only +//! place that touches the real clock ([`crate::store::now_timestamp`]) and the +//! real fetcher, so the decision logic is unit-testable without `gh` calls or +//! wall-clock sleeps (see the tests at the bottom of this file). + +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::store::Store; + +// --------------------------------------------------------------------------- +// Interval tiers (milliseconds) +// --------------------------------------------------------------------------- +// +// Moved verbatim from the frontend `prPollingService` so the backend is the +// single owner of polling cadence. + +/// Any project with a branch that has pending CI checks (fastest tier). +const PENDING_INTERVAL_MS: i64 = 15_000; +/// The foregrounded/selected project, no pending checks. +const SELECTED_INTERVAL_MS: i64 = 60_000; +/// Background (non-selected, no pending checks). +const BACKGROUND_INTERVAL_MS: i64 = 5 * 60_000; +/// Consecutive failures before a project is reported as stale to the frontend. +const MAX_CONSECUTIVE_FAILURES: u32 = 3; + +/// How often the tick loop wakes up to re-evaluate what is due. Kept well below +/// the fastest tier (15s) so the loop's granularity adds only a small bounded +/// jitter to each tier; interest changes and `refresh_now` nudges additionally +/// wake the loop immediately, so this only bounds the *periodic* re-poll delay. +const TICK_INTERVAL_SECS: u64 = 5; + +// --------------------------------------------------------------------------- +// Poll-state — pure decision logic, no clock / store / Tauri handles +// --------------------------------------------------------------------------- + +/// The poll-state and interest the scheduler owns. Pure: every method that needs +/// the current time takes `now` (ms since epoch) as a parameter, so the +/// due/dedup/backoff logic can be unit-tested deterministically. +struct PollState { + /// When each project was last polled (ms since epoch). Absent ⇒ never + /// polled ⇒ immediately due. Not persisted across restarts. + last_polled_at: HashMap, + /// Consecutive failure count per project. + failures: HashMap, + /// Projects currently reported as stale (failures ≥ threshold). Tracked so a + /// stale event is only emitted on transitions. + stale: HashSet, + /// The foregrounded/selected project (→ selected tier). `Option` models a + /// single client; Phase 2 will union a set across connected clients. + foreground_project: Option, + /// Whether any client window is focused. No focus ⇒ polling pauses. + focused: bool, + /// branch_id → project_id for branches with pending CI checks (→ pending + /// tier). Mirrors the granularity the frontend tracked via + /// `updateChecksStatus`. + pending_branches: HashMap, + /// Projects explicitly nudged via `refresh_now`; due on the next tick + /// regardless of interval or focus. + forced: HashSet, +} + +impl PollState { + fn new() -> Self { + Self { + last_polled_at: HashMap::new(), + failures: HashMap::new(), + stale: HashSet::new(), + foreground_project: None, + // Start focused so the very first tick polls at launch, matching the + // frontend which began with `windowFocused = true`. + focused: true, + pending_branches: HashMap::new(), + forced: HashSet::new(), + } + } + + fn project_has_pending(&self, project_id: &str) -> bool { + self.pending_branches.values().any(|p| p == project_id) + } + + /// The polling interval for a project, as the union of current interest. + /// Mirrors the frontend `getProjectInterval`. + fn interval_for(&self, project_id: &str) -> i64 { + if self.project_has_pending(project_id) { + PENDING_INTERVAL_MS + } else if self.foreground_project.as_deref() == Some(project_id) { + SELECTED_INTERVAL_MS + } else { + BACKGROUND_INTERVAL_MS + } + } + + /// Compute which of `project_ids` should be polled right now. + /// + /// A project is due when it is not already in flight (dedup) and either it + /// has been explicitly forced (`refresh_now` — bypasses focus and interval), + /// or a client is focused and its tier interval has elapsed. + fn due(&self, project_ids: &[String], now: i64, in_flight: &HashSet) -> Vec { + let mut due = Vec::new(); + for id in project_ids { + if in_flight.contains(id) { + continue; // dedup: a refresh for this project is already running + } + if self.forced.contains(id) { + due.push(id.clone()); + continue; + } + if !self.focused { + continue; // no focused client ⇒ pause periodic polling + } + let last = self.last_polled_at.get(id).copied().unwrap_or(0); + if now.saturating_sub(last) >= self.interval_for(id) { + due.push(id.clone()); + } + } + due + } + + /// Drop tracking for projects (and pending branches) that no longer exist. + fn prune(&mut self, known: &HashSet<&str>) { + self.last_polled_at + .retain(|k, _| known.contains(k.as_str())); + self.failures.retain(|k, _| known.contains(k.as_str())); + self.stale.retain(|k| known.contains(k.as_str())); + self.forced.retain(|k| known.contains(k.as_str())); + self.pending_branches + .retain(|_, p| known.contains(p.as_str())); + if let Some(fg) = &self.foreground_project { + if !known.contains(fg.as_str()) { + self.foreground_project = None; + } + } + } + + /// Record a successful poll. Returns `true` if the project transitioned out + /// of the stale state (so the caller should emit a stale-cleared event). + fn record_success(&mut self, project_id: &str, now: i64) -> bool { + self.last_polled_at.insert(project_id.to_string(), now); + self.failures.remove(project_id); + self.forced.remove(project_id); + self.stale.remove(project_id) + } + + /// Record a failed poll. Returns `true` if the project just transitioned + /// into the stale state (so the caller should emit a stale event). + /// + /// `last_polled_at` is advanced so a persistently failing project retries on + /// its normal tier cadence rather than every tick. (The old frontend left + /// `lastPolledAt` untouched on failure, which retried failing projects about + /// once a second — exactly the kind of churn the backend should damp.) + fn record_failure(&mut self, project_id: &str, now: i64) -> bool { + self.last_polled_at.insert(project_id.to_string(), now); + self.forced.remove(project_id); + let count = self.failures.entry(project_id.to_string()).or_insert(0); + *count += 1; + if *count == MAX_CONSECUTIVE_FAILURES { + self.stale.insert(project_id.to_string()) + } else { + false + } + } + + fn set_foreground(&mut self, project_id: Option) { + self.foreground_project = project_id; + } + + fn set_focus(&mut self, focused: bool) { + self.focused = focused; + } + + fn set_branch_pending(&mut self, branch_id: String, project_id: String, pending: bool) { + if pending { + self.pending_branches.insert(branch_id, project_id); + } else { + self.pending_branches.remove(&branch_id); + } + } + + fn force(&mut self, project_id: String) { + self.forced.insert(project_id); + } +} + +// --------------------------------------------------------------------------- +// Scheduler — managed state shared between the tick loop and the hint commands +// --------------------------------------------------------------------------- + +/// Long-lived scheduler state, stored in Tauri managed state as +/// `Arc`. The interest/hint commands mutate [`PollState`] and +/// wake the loop; the loop reads it to decide what to poll. +pub struct PrPollScheduler { + state: Mutex, + /// Projects with an in-flight refresh, for dedup across overlapping ticks + /// and `refresh_now` nudges. + in_flight: Mutex>, + /// Wakes the tick loop when interest changes or a `refresh_now` arrives so + /// it re-evaluates promptly instead of waiting out the periodic tick. + notify: tokio::sync::Notify, +} + +impl PrPollScheduler { + pub fn new() -> Self { + Self { + state: Mutex::new(PollState::new()), + in_flight: Mutex::new(HashSet::new()), + notify: tokio::sync::Notify::new(), + } + } + + fn set_foreground(&self, project_id: Option) { + self.state.lock().unwrap().set_foreground(project_id); + self.notify.notify_one(); + } + + fn set_focus(&self, focused: bool) { + self.state.lock().unwrap().set_focus(focused); + self.notify.notify_one(); + } + + fn set_branch_pending(&self, branch_id: String, project_id: String, pending: bool) { + self.state + .lock() + .unwrap() + .set_branch_pending(branch_id, project_id, pending); + self.notify.notify_one(); + } + + fn force(&self, project_id: String) { + self.state.lock().unwrap().force(project_id); + self.notify.notify_one(); + } +} + +impl Default for PrPollScheduler { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// Tick loop +// --------------------------------------------------------------------------- + +/// Spawn the PR-poll loop. Call once during app setup, after the store is +/// initialised. Takes the store directly (like `background_sync::spawn`); a DB +/// reset only happens via a restart-gated flow, so the loop's store handle stays +/// valid for the process lifetime. +pub fn spawn(scheduler: Arc, store: Arc, app_handle: tauri::AppHandle) { + tauri::async_runtime::spawn(async move { + poll_loop(scheduler, store, app_handle).await; + }); +} + +async fn poll_loop( + scheduler: Arc, + store: Arc, + app_handle: tauri::AppHandle, +) { + // One bounded pool shared across every project the scheduler refreshes, so a + // tick that finds many projects due still caps total concurrent `gh` + // subprocesses — this is what tames the focus-regain / launch herd. + let semaphore = Arc::new(tokio::sync::Semaphore::new( + crate::prs::PR_REFRESH_CONCURRENCY, + )); + + let mut interval = tokio::time::interval(Duration::from_secs(TICK_INTERVAL_SECS)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + // The first `interval.tick()` resolves immediately, so the loop runs an + // initial tick at startup (everything is due ⇒ initial poll). + tokio::select! { + _ = interval.tick() => {} + _ = scheduler.notify.notified() => {} + } + tick(&scheduler, &store, &app_handle, &semaphore).await; + } +} + +async fn tick( + scheduler: &Arc, + store: &Arc, + app_handle: &tauri::AppHandle, + semaphore: &Arc, +) { + // Re-derive the project list from the DB each tick (cheap indexed read) so + // the backend owns the set of projects to poll without a frontend hint — + // this is what replaces the old `prPollingService.setProjects`. + let project_ids: Vec = match store.list_projects() { + Ok(projects) => projects.into_iter().map(|p| p.id).collect(), + Err(e) => { + log::warn!("[pr_poll] failed to list projects: {e}"); + return; + } + }; + + let now = crate::store::now_timestamp(); + + // Decide what to poll while holding the locks (no `.await` inside), then + // mark those projects in flight. Lock order here (in_flight → state) is the + // only nesting site; completion handlers below take the two locks + // sequentially, never nested, so there is no ordering hazard. + let due = { + let known: HashSet<&str> = project_ids.iter().map(|s| s.as_str()).collect(); + let mut in_flight = scheduler.in_flight.lock().unwrap(); + let mut state = scheduler.state.lock().unwrap(); + state.prune(&known); + let due = state.due(&project_ids, now, &in_flight); + for id in &due { + in_flight.insert(id.clone()); + // Consume the explicit nudge now that we're acting on it. + state.forced.remove(id); + } + due + }; + + for project_id in due { + emit_refresh_state(app_handle, &project_id, true); + + let scheduler = Arc::clone(scheduler); + let store = Arc::clone(store); + let app_handle = app_handle.clone(); + let semaphore = Arc::clone(semaphore); + + tauri::async_runtime::spawn(async move { + let result = crate::prs::refresh_project_pr_statuses( + &store, + &app_handle, + &project_id, + semaphore, + ) + .await; + let now = crate::store::now_timestamp(); + + // Update poll-state, then clear the in-flight marker. The two locks + // are taken sequentially (never nested) to stay deadlock-free. + let stale_change = { + let mut state = scheduler.state.lock().unwrap(); + match &result { + Ok(_) => state.record_success(&project_id, now).then_some(false), + Err(e) => { + log::warn!("[pr_poll] refresh failed for project {project_id}: {e}"); + state.record_failure(&project_id, now).then_some(true) + } + } + }; + scheduler.in_flight.lock().unwrap().remove(&project_id); + + emit_refresh_state(&app_handle, &project_id, false); + if let Some(stale) = stale_change { + emit_stale(&app_handle, &project_id, stale); + } + + // A refresh just finished — wake the loop so any project forced + // while this one was in flight is picked up without waiting out the + // periodic tick. + scheduler.notify.notify_one(); + }); + } +} + +// --------------------------------------------------------------------------- +// Events +// --------------------------------------------------------------------------- + +/// Per-project refresh lifecycle, so the frontend can show "checking right now". +/// Replaces the refresh-state the frontend used to track around its own poll +/// loop, which now lives here. +fn emit_refresh_state(app_handle: &tauri::AppHandle, project_id: &str, refreshing: bool) { + #[derive(Clone, serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct PrRefreshState { + project_id: String, + refreshing: bool, + } + + crate::web_server::emit_to_all( + app_handle, + "pr-refresh-state", + PrRefreshState { + project_id: project_id.to_string(), + refreshing, + }, + ); +} + +/// Project crossed (or recovered from) the consecutive-failure threshold, so the +/// frontend can show a stale-data indicator. +fn emit_stale(app_handle: &tauri::AppHandle, project_id: &str, stale: bool) { + #[derive(Clone, serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct PrStale { + project_id: String, + stale: bool, + } + + crate::web_server::emit_to_all( + app_handle, + "pr-status-stale", + PrStale { + project_id: project_id.to_string(), + stale, + }, + ); +} + +// --------------------------------------------------------------------------- +// Interest / hint commands +// --------------------------------------------------------------------------- + +/// Set the foregrounded/selected project (→ selected tier). `None` clears it. +#[tauri::command(rename_all = "camelCase")] +pub fn set_foreground_project( + scheduler: tauri::State<'_, Arc>, + project_id: Option, +) { + scheduler.set_foreground(project_id); +} + +/// Report window focus. With no focused client, periodic polling pauses (an +/// explicit `refresh_now` still fetches). +#[tauri::command] +pub fn set_focus(scheduler: tauri::State<'_, Arc>, focused: bool) { + scheduler.set_focus(focused); +} + +/// Mark whether a branch has pending CI checks (→ pending tier for its project). +#[tauri::command(rename_all = "camelCase")] +pub fn set_branch_pending( + scheduler: tauri::State<'_, Arc>, + branch_id: String, + project_id: String, + pending: bool, +) { + scheduler.set_branch_pending(branch_id, project_id, pending); +} + +/// Explicitly nudge the scheduler to refresh a project now (e.g. just created or +/// pushed a PR). Folded into the scheduler's dedup rather than fetching directly. +#[tauri::command(rename_all = "camelCase")] +pub fn refresh_now(scheduler: tauri::State<'_, Arc>, project_id: String) { + scheduler.force(project_id); +} + +// --------------------------------------------------------------------------- +// Tests — pure due/dedup/backoff logic, no clock or `gh` +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + fn ids(v: &[&str]) -> Vec { + v.iter().map(|s| s.to_string()).collect() + } + + fn set(v: &[&str]) -> HashSet { + v.iter().map(|s| s.to_string()).collect() + } + + #[test] + fn due_respects_the_three_tiers() { + let mut st = PollState::new(); + st.set_foreground(Some("sel".into())); + st.set_branch_pending("b1".into(), "pend".into(), true); + for id in ["sel", "pend", "bg"] { + st.last_polled_at.insert(id.into(), 0); + } + let projects = ids(&["sel", "pend", "bg"]); + let none = HashSet::new(); + + // Just after the pending interval: only the pending project is due. + assert_eq!( + st.due(&projects, PENDING_INTERVAL_MS, &none), + ids(&["pend"]) + ); + + // Past the selected interval: pending + selected (order follows input). + assert_eq!( + st.due(&projects, SELECTED_INTERVAL_MS, &none), + ids(&["sel", "pend"]) + ); + + // Past the background interval: all three. + assert_eq!( + st.due(&projects, BACKGROUND_INTERVAL_MS, &none), + ids(&["sel", "pend", "bg"]) + ); + } + + #[test] + fn unfocused_pauses_periodic_polling_but_not_forced() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + st.set_focus(false); + let none = HashSet::new(); + + // Long past every interval, but no focused client ⇒ nothing due. + assert!(st + .due(&ids(&["p"]), BACKGROUND_INTERVAL_MS * 10, &none) + .is_empty()); + + // A forced project still polls while unfocused. + st.force("p".into()); + assert_eq!( + st.due(&ids(&["p"]), BACKGROUND_INTERVAL_MS * 10, &none), + ids(&["p"]) + ); + } + + #[test] + fn in_flight_projects_are_not_re_enqueued() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + let now = BACKGROUND_INTERVAL_MS * 10; + + // Due when nothing is in flight. + assert_eq!(st.due(&ids(&["p"]), now, &HashSet::new()), ids(&["p"])); + // Deduped when already in flight. + assert!(st.due(&ids(&["p"]), now, &set(&["p"])).is_empty()); + + // Even an explicit nudge does not double-fetch an in-flight project. + st.force("p".into()); + assert!(st.due(&ids(&["p"]), now, &set(&["p"])).is_empty()); + } + + #[test] + fn refresh_now_forces_a_poll_before_the_interval_elapses() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + + // Just polled ⇒ not yet interval-due. + assert!(st.due(&ids(&["p"]), 1_000, &HashSet::new()).is_empty()); + + // refresh_now folds in: due immediately regardless of interval. + st.force("p".into()); + assert_eq!(st.due(&ids(&["p"]), 1_000, &HashSet::new()), ids(&["p"])); + } + + #[test] + fn failures_trip_stale_after_threshold_then_clear_on_success() { + let mut st = PollState::new(); + assert!(!st.record_failure("p", 1)); + assert!(!st.record_failure("p", 2)); + // Third consecutive failure crosses the threshold ⇒ transition to stale. + assert!(st.record_failure("p", 3)); + // Further failures stay stale (no new transition). + assert!(!st.record_failure("p", 4)); + // A success transitions back out of stale exactly once. + assert!(st.record_success("p", 5)); + assert!(!st.record_success("p", 6)); + } + + #[test] + fn failure_advances_last_polled_so_retries_use_the_tier_cadence() { + let mut st = PollState::new(); + st.last_polled_at.insert("p".into(), 0); + // Fails at now=1000; last_polled advances to 1000. + st.record_failure("p", 1_000); + // Not due again until a full background interval after the failure. + assert!(st + .due( + &ids(&["p"]), + 1_000 + BACKGROUND_INTERVAL_MS - 1, + &HashSet::new() + ) + .is_empty()); + assert_eq!( + st.due( + &ids(&["p"]), + 1_000 + BACKGROUND_INTERVAL_MS, + &HashSet::new() + ), + ids(&["p"]) + ); + } + + #[test] + fn prune_drops_unknown_projects_and_their_interest() { + let mut st = PollState::new(); + st.last_polled_at.insert("gone".into(), 0); + st.failures.insert("gone".into(), 2); + st.stale.insert("gone".into()); + st.force("gone".into()); + st.set_foreground(Some("gone".into())); + st.set_branch_pending("b".into(), "gone".into(), true); + + let known: HashSet<&str> = ["alive"].into_iter().collect(); + st.prune(&known); + + assert!(st.last_polled_at.is_empty()); + assert!(st.failures.is_empty()); + assert!(st.stale.is_empty()); + assert!(st.forced.is_empty()); + assert!(st.pending_branches.is_empty()); + assert!(st.foreground_project.is_none()); + } +} diff --git a/apps/staged/src-tauri/src/prs.rs b/apps/staged/src-tauri/src/prs.rs index a8f7e4662..7ca970bb8 100644 --- a/apps/staged/src-tauri/src/prs.rs +++ b/apps/staged/src-tauri/src/prs.rs @@ -824,13 +824,20 @@ pub async fn refresh_pr_status( Ok(()) } -/// Max concurrent PR-status fetches inside a single `refresh_all_pr_statuses` -/// call. Each fetch spawns a `gh` subprocess + GitHub round-trip, so we cap the -/// fan-out to avoid a subprocess thundering herd while still resolving a -/// project's PRs in ~1 round-trip's wall-clock instead of N (fully serial). -const PR_REFRESH_CONCURRENCY: usize = 6; +/// Max concurrent PR-status fetches inside a single project refresh. Each fetch +/// spawns a `gh` subprocess + GitHub round-trip, so we cap the fan-out to avoid +/// a subprocess thundering herd while still resolving a project's PRs in ~1 +/// round-trip's wall-clock instead of N (fully serial). The backend PR-poll +/// scheduler reuses this cap for a single pool shared across all the projects +/// it refreshes (see `pr_poll_scheduler`). +pub(crate) const PR_REFRESH_CONCURRENCY: usize = 6; /// Refresh PR status for all branches in a project. +/// +/// Thin command wrapper around [`refresh_project_pr_statuses`]; the same core is +/// also driven on a cadence by the backend PR-poll scheduler. Each command call +/// gets its own bounded pool (the scheduler instead shares one pool across +/// projects). #[tauri::command(rename_all = "camelCase")] pub async fn refresh_all_pr_statuses( store: tauri::State<'_, Mutex>>>, @@ -838,30 +845,46 @@ pub async fn refresh_all_pr_statuses( project_id: String, ) -> Result { let store = get_store(&store)?; + let semaphore = Arc::new(tokio::sync::Semaphore::new(PR_REFRESH_CONCURRENCY)); + refresh_project_pr_statuses(&store, &app_handle, &project_id, semaphore).await +} + +/// Core implementation shared by the `refresh_all_pr_statuses` command and the +/// backend PR-poll scheduler. +/// +/// Fans the per-branch fetches out across the bounded `semaphore` pool instead +/// of awaiting them one at a time. Repo resolution is a cheap local DB read so +/// it stays on this task; only the network fetch + DB write + per-branch +/// `pr-status-changed` emit move into the spawned tasks, gated by the semaphore. +/// A final `pr-statuses-refreshed` event is emitted and the number of branches +/// refreshed is returned. +/// +/// The semaphore is passed in (rather than created per call) so the scheduler +/// can share a single pool across every project it refreshes — a tick that +/// finds many projects due still caps total concurrent `gh` subprocesses. +pub(crate) async fn refresh_project_pr_statuses( + store: &Arc, + app_handle: &tauri::AppHandle, + project_id: &str, + semaphore: Arc, +) -> Result { let project = store - .get_project(&project_id) + .get_project(project_id) .map_err(|e| e.to_string())? .ok_or_else(|| format!("Project not found: {project_id}"))?; let branches = store - .list_branches_for_project(&project_id) + .list_branches_for_project(project_id) .map_err(|e| e.to_string())?; let branches_with_prs: Vec<_> = branches .into_iter() .filter(|b| b.pr_number.is_some()) .collect(); - // Fan the per-branch fetches out across a bounded pool instead of awaiting - // them one at a time. Repo resolution is a cheap local DB read so it stays - // on this task; only the network fetch + DB write + per-branch emit move - // into the spawned tasks, gated by the semaphore. The per-branch - // `pr-status-changed` emissions and final `pr-statuses-refreshed` event are - // unchanged — only the scheduling is now parallel rather than serial. - let semaphore = Arc::new(tokio::sync::Semaphore::new(PR_REFRESH_CONCURRENCY)); let mut tasks = Vec::new(); for branch in branches_with_prs { let pr_number = branch.pr_number.unwrap(); - let github_repo = match resolve_branch_repo_and_subpath(&store, &project, &branch) { + let github_repo = match resolve_branch_repo_and_subpath(store, &project, &branch) { Ok((repo, _)) => repo, Err(e) => { log::warn!( @@ -874,7 +897,7 @@ pub async fn refresh_all_pr_statuses( } }; - let store = Arc::clone(&store); + let store = Arc::clone(store); let app_handle = app_handle.clone(); let semaphore = Arc::clone(&semaphore); let branch_id = branch.id.clone(); @@ -885,13 +908,13 @@ pub async fn refresh_all_pr_statuses( let _permit = semaphore .acquire_owned() .await - .map_err(|e| format!("refresh_all_pr_statuses semaphore closed: {e}"))?; + .map_err(|e| format!("refresh_project_pr_statuses semaphore closed: {e}"))?; let pr_result = tauri::async_runtime::spawn_blocking(move || { git::fetch_pr_status_for_repo(&github_repo, pr_number) }) .await - .map_err(|e| format!("refresh_all_pr_statuses task failed: {e}"))?; + .map_err(|e| format!("refresh_project_pr_statuses task failed: {e}"))?; match pr_result { Ok(pr_status) => { @@ -948,13 +971,13 @@ pub async fn refresh_all_pr_statuses( for task in tasks { let refreshed = task .await - .map_err(|e| format!("refresh_all_pr_statuses join failed: {e}"))??; + .map_err(|e| format!("refresh_project_pr_statuses join failed: {e}"))??; if refreshed { refreshed_count += 1; } } - crate::web_server::emit_to_all(&app_handle, "pr-statuses-refreshed", &project_id); + crate::web_server::emit_to_all(app_handle, "pr-statuses-refreshed", project_id); Ok(refreshed_count) } diff --git a/apps/staged/src/App.svelte b/apps/staged/src/App.svelte index d5c237946..7b756e918 100644 --- a/apps/staged/src/App.svelte +++ b/apps/staged/src/App.svelte @@ -45,7 +45,6 @@ import { listenForSessionStatus } from './lib/listeners/sessionStatusListener'; import { darkMode } from './lib/stores/isDark.svelte'; import * as prPollingService from './lib/services/prPollingService'; - import { projectsList } from './lib/features/projects/projectsSidebarState.svelte'; import { reposUiEnabled } from './lib/featureFlags'; import type { StoreIncompatibility } from './lib/types'; @@ -68,12 +67,11 @@ let storeError = $state(null); // ========================================================================= - // App-wide PR polling — sync project list and selected project reactively + // App-wide PR polling — the backend scheduler owns cadence/concurrency and + // derives the project list from the DB. The frontend only forwards the + // selected project as an interest hint (focus + lifecycle wiring is in + // prPollingService.init(), called from onMount). // ========================================================================= - $effect(() => { - prPollingService.setProjects(projectsList.current.map((p) => p.id)); - }); - $effect(() => { prPollingService.setSelectedProject(navigation.selectedProjectId); }); @@ -226,6 +224,8 @@ onMount(async () => { darkMode.init(); + // Wire up PR-polling interest hints (window focus + backend lifecycle events). + prPollingService.init(); document.addEventListener('keydown', handleKonamiKey); // Listen for the app menu Preferences item. @@ -412,6 +412,7 @@ }); onDestroy(() => { + prPollingService.dispose(); document.removeEventListener('keydown', handleKonamiKey); unregisterShortcuts?.(); unlistenSettings?.(); diff --git a/apps/staged/src/lib/commands.ts b/apps/staged/src/lib/commands.ts index ad973231b..681012c42 100644 --- a/apps/staged/src/lib/commands.ts +++ b/apps/staged/src/lib/commands.ts @@ -1071,6 +1071,38 @@ export function refreshAllPrStatuses(projectId: string): Promise { return invokeCommand('refresh_all_pr_statuses', { projectId }); } +// --------------------------------------------------------------------------- +// PR poll scheduler — interest/hint commands +// +// The backend owns PR-polling cadence and concurrency. These commands feed it +// interest hints; the cadence, dedup, and failure backoff all live in the +// backend scheduler (see src-tauri/src/pr_poll_scheduler.rs). +// --------------------------------------------------------------------------- + +/** Tell the backend which project is foregrounded/selected (→ selected tier). */ +export function setForegroundProject(projectId: string | null): Promise { + return invokeCommand('set_foreground_project', { projectId }); +} + +/** Report window focus to the backend. No focused client ⇒ polling pauses. */ +export function setPrPollFocus(focused: boolean): Promise { + return invokeCommand('set_focus', { focused }); +} + +/** Mark whether a branch has pending CI checks (→ pending tier for its project). */ +export function setBranchPending( + branchId: string, + projectId: string, + pending: boolean +): Promise { + return invokeCommand('set_branch_pending', { branchId, projectId, pending }); +} + +/** Nudge the backend to refresh a project's PR statuses now (folded into dedup). */ +export function refreshPrStatusesNow(projectId: string): Promise { + return invokeCommand('refresh_now', { projectId }); +} + // ============================================================================= // Images // ============================================================================= diff --git a/apps/staged/src/lib/services/prPollingService.ts b/apps/staged/src/lib/services/prPollingService.ts index eeeb07b94..7b7e1462d 100644 --- a/apps/staged/src/lib/services/prPollingService.ts +++ b/apps/staged/src/lib/services/prPollingService.ts @@ -1,14 +1,29 @@ /** - * Centralized PR status polling service. + * PR status polling — frontend interest/hint layer. * - * Polls all projects app-wide. The selected project polls more frequently - * than background projects, and projects with pending CI checks poll fastest. + * The backend owns PR-polling cadence and concurrency (see + * `src-tauri/src/pr_poll_scheduler.rs`): it derives the project list from the + * DB, decides what is due across the pending/selected/background tiers, dedups + * in-flight work, and backs off failures — all on a single bounded pool. * - * The backend's `refreshAllPrStatuses` already emits per-branch - * `pr-status-changed` events, so components only need to listen for those. + * This module is now a thin shim that: + * - forwards UI interest to the backend as hints (selected project, pending + * checks, window focus, explicit refresh nudges); and + * - re-broadcasts the backend's per-project refresh/stale lifecycle events to + * local subscribers, preserving the `onRefreshing` / `onStale` / + * `isRefreshing` API that `BranchCardPrButton` relies on. + * + * The backend already emits per-branch `pr-status-changed` and a final + * `pr-statuses-refreshed`; components subscribe to those directly. */ -import { refreshAllPrStatuses } from '../commands'; +import { isTauri, listenToEvent, type UnlistenFn } from '../transport'; +import { + setForegroundProject, + setPrPollFocus, + setBranchPending, + refreshPrStatusesNow, +} from '../commands'; // --------------------------------------------------------------------------- // Types @@ -17,170 +32,37 @@ import { refreshAllPrStatuses } from '../commands'; type StaleCallback = (projectId: string, isStale: boolean) => void; type RefreshingCallback = (projectId: string, isRefreshing: boolean) => void; -// --------------------------------------------------------------------------- -// Intervals -// --------------------------------------------------------------------------- +interface PrRefreshStateEvent { + projectId: string; + refreshing: boolean; +} -const PENDING_INTERVAL = 15_000; // any project with pending CI checks -const SELECTED_INTERVAL = 60_000; // selected project, no pending checks -const BACKGROUND_INTERVAL = 5 * 60_000; // non-selected, no pending checks -const MAX_CONSECUTIVE_FAILURES = 3; -// After a project switch, hold background-tier refreshes for a beat so the -// switch's reactive work isn't competing with a background poll cycle for the -// main thread. Selected/pending tiers still poll during the cooldown. -const SWITCH_COOLDOWN = 1_500; +interface PrStaleEvent { + projectId: string; + stale: boolean; +} // --------------------------------------------------------------------------- // State // --------------------------------------------------------------------------- -/** All project IDs to poll. */ -const allProjectIds = new Set(); - -/** Currently selected (viewed) project. */ -let selectedProjectId: string | null = null; - -/** Branches with pending checks, keyed by branchId → projectId. */ -const pendingBranches = new Map(); - -/** When each project was last successfully polled. */ -const lastPolledAt = new Map(); - -/** Consecutive failure count per projectId. */ -const failures = new Map(); - /** Registered stale-data callbacks. */ const staleCallbacks = new Set(); /** Registered refresh-state callbacks. */ const refreshingCallbacks = new Set(); -/** Projects currently being refreshed. */ +/** Projects the backend currently reports as refreshing. */ const refreshingProjects = new Set(); -let timerId: ReturnType | null = null; -let refreshInFlight = false; -let windowFocused = true; -let listenersAttached = false; - -/** Background-tier polling is deprioritized until this timestamp (see SWITCH_COOLDOWN). */ -let switchCooldownUntil = 0; - -/** Project IDs queued for immediate refresh while another refresh is in-flight. */ -const pendingRefreshProjectIds = new Set(); +let initialized = false; +let unlistenRefreshState: UnlistenFn | null = null; +let unlistenStale: UnlistenFn | null = null; // --------------------------------------------------------------------------- // Internal helpers // --------------------------------------------------------------------------- -function projectHasPendingChecks(projectId: string): boolean { - for (const pId of pendingBranches.values()) { - if (pId === projectId) return true; - } - return false; -} - -function getProjectInterval(projectId: string): number { - if (projectHasPendingChecks(projectId)) return PENDING_INTERVAL; - if (projectId === selectedProjectId) return SELECTED_INTERVAL; - return BACKGROUND_INTERVAL; -} - -/** Yield to the macrotask queue so foreground work can interleave between projects. */ -function yieldToEventLoop(): Promise { - return new Promise((resolve) => setTimeout(resolve, 0)); -} - -/** Return project IDs whose polling interval has elapsed. */ -function getProjectsDue(): string[] { - const now = Date.now(); - const due: string[] = []; - for (const projectId of allProjectIds) { - const interval = getProjectInterval(projectId); - const last = lastPolledAt.get(projectId) ?? 0; - if (now - last >= interval) { - due.push(projectId); - } - } - return due; -} - -async function poll() { - if (refreshInFlight || !windowFocused || allProjectIds.size === 0) { - // Don't reschedule here — the in-flight operation's `finally` block - // already calls scheduleNext(), and the other two cases (unfocused / - // empty) intentionally have no timer running. - return; - } - - refreshInFlight = true; - const due = getProjectsDue(); - // Right after a switch, hold background-tier projects so the switch's - // reactive work isn't competing with a background poll cycle. They stay due - // and poll on the next cycle once the cooldown elapses. - const inSwitchCooldown = Date.now() < switchCooldownUntil; - - for (const projectId of due) { - if (inSwitchCooldown && getProjectInterval(projectId) === BACKGROUND_INTERVAL) { - continue; - } - setProjectRefreshing(projectId, true); - try { - await refreshAllPrStatuses(projectId); - lastPolledAt.set(projectId, Date.now()); - // Reset failure counter on success - const prev = failures.get(projectId) ?? 0; - if (prev > 0) { - failures.set(projectId, 0); - notifyStale(projectId, false); - } - } catch (e) { - const count = (failures.get(projectId) ?? 0) + 1; - failures.set(projectId, count); - console.error( - `[PrPollingService] refreshAllPrStatuses failed for project=${projectId} (attempt ${count}):`, - e - ); - if (count === MAX_CONSECUTIVE_FAILURES) { - notifyStale(projectId, true); - } - } finally { - setProjectRefreshing(projectId, false); - } - // Yield between projects so a project switch's reactive flush can interleave - // instead of waiting out the whole serial chain of IPC round-trips. - await yieldToEventLoop(); - } - - refreshInFlight = false; - scheduleNext(); -} - -function scheduleNext() { - stopTimer(); - if (allProjectIds.size === 0 || !windowFocused) return; - - const now = Date.now(); - let minDelay = Infinity; - for (const projectId of allProjectIds) { - const interval = getProjectInterval(projectId); - const last = lastPolledAt.get(projectId) ?? 0; - const remaining = Math.max(0, interval - (now - last)); - minDelay = Math.min(minDelay, remaining); - } - - if (!Number.isFinite(minDelay)) return; - // Floor at 1s to avoid tight loops - timerId = setTimeout(poll, Math.max(1_000, minDelay)); -} - -function stopTimer() { - if (timerId !== null) { - clearTimeout(timerId); - timerId = null; - } -} - function notifyStale(projectId: string, isStale: boolean) { for (const cb of staleCallbacks) { try { @@ -218,91 +100,67 @@ function setProjectRefreshing(projectId: string, isRefreshing: boolean) { // --------------------------------------------------------------------------- function handleFocus() { - windowFocused = true; - poll(); + void setPrPollFocus(true).catch(() => {}); } function handleBlur() { - windowFocused = false; - stopTimer(); -} - -function ensureWindowListeners() { - if (listenersAttached) return; - window.addEventListener('focus', handleFocus); - window.addEventListener('blur', handleBlur); - listenersAttached = true; -} - -function removeWindowListeners() { - if (!listenersAttached) return; - window.removeEventListener('focus', handleFocus); - window.removeEventListener('blur', handleBlur); - listenersAttached = false; + void setPrPollFocus(false).catch(() => {}); } // --------------------------------------------------------------------------- -// Public API +// Lifecycle // --------------------------------------------------------------------------- -/** Set the full list of project IDs to poll. Starts/stops polling as needed. */ -export function setProjects(projectIds: string[]): void { - const newIds = new Set(projectIds); +/** + * Wire up the interest/hint layer: forward window focus to the backend and + * subscribe to its refresh/stale lifecycle events. Idempotent; call once at app + * start. No-op in web mode (the transport is stubbed in this build). + */ +export function init(): void { + if (initialized || !isTauri) return; + initialized = true; - // Short-circuit if the set of project IDs hasn't changed - if (newIds.size === allProjectIds.size && projectIds.every((id) => allProjectIds.has(id))) { - return; - } + window.addEventListener('focus', handleFocus); + window.addEventListener('blur', handleBlur); + // Seed the backend with the current focus state (it defaults to focused, so + // the initial poll already ran; this corrects it if we launched unfocused). + void setPrPollFocus(document.hasFocus()).catch(() => {}); - // Remove projects no longer in the list - for (const id of allProjectIds) { - if (!newIds.has(id)) { - allProjectIds.delete(id); - lastPolledAt.delete(id); - failures.delete(id); - setProjectRefreshing(id, false); - } - } + unlistenRefreshState = listenToEvent('pr-refresh-state', (payload) => { + setProjectRefreshing(payload.projectId, payload.refreshing); + }); + unlistenStale = listenToEvent('pr-status-stale', (payload) => { + notifyStale(payload.projectId, payload.stale); + }); +} - // Clean up pending branches for removed projects - for (const [branchId, projectId] of pendingBranches) { - if (!newIds.has(projectId)) { - pendingBranches.delete(branchId); - } - } +/** Tear down listeners and subscriptions. */ +export function dispose(): void { + if (!initialized) return; + initialized = false; - // Add new projects - for (const id of newIds) { - allProjectIds.add(id); - } + window.removeEventListener('focus', handleFocus); + window.removeEventListener('blur', handleBlur); + unlistenRefreshState?.(); + unlistenStale?.(); + unlistenRefreshState = null; + unlistenStale = null; - if (allProjectIds.size > 0) { - ensureWindowListeners(); - // Trigger poll — new projects have no lastPolledAt so they'll be due - poll(); - } else { - stopTimer(); - removeWindowListeners(); - failures.clear(); - for (const projectId of [...refreshingProjects]) { - setProjectRefreshing(projectId, false); - } + for (const projectId of [...refreshingProjects]) { + setProjectRefreshing(projectId, false); } } +// --------------------------------------------------------------------------- +// Public API — interest hints (forwarded to the backend scheduler) +// --------------------------------------------------------------------------- + /** Set the currently selected project (polls more frequently). */ export function setSelectedProject(projectId: string | null): void { - if (selectedProjectId === projectId) return; - selectedProjectId = projectId; - // Give the switch's reactive work room to flush before background polling - // resumes competing for the main thread. - switchCooldownUntil = Date.now() + SWITCH_COOLDOWN; - if (projectId && allProjectIds.has(projectId)) { - // Selected project's interval just changed — trigger a poll if it's due - poll(); - } else { - scheduleNext(); - } + if (!isTauri) return; + void setForegroundProject(projectId).catch((e) => + console.error('[PrPollingService] set_foreground_project failed:', e) + ); } /** Update whether a branch has pending CI checks (affects its project's poll interval). */ @@ -311,17 +169,24 @@ export function updateChecksStatus( projectId: string, hasPendingChecks: boolean ): void { - const hadPending = pendingBranches.has(branchId); - if (hasPendingChecks) { - pendingBranches.set(branchId, projectId); - } else { - pendingBranches.delete(branchId); - } - if (hadPending !== hasPendingChecks) { - scheduleNext(); - } + if (!isTauri) return; + void setBranchPending(branchId, projectId, hasPendingChecks).catch((e) => + console.error('[PrPollingService] set_branch_pending failed:', e) + ); +} + +/** Trigger an immediate refresh for a specific project (e.g. after PR creation or push). */ +export function refreshNow(projectId: string): void { + if (!isTauri) return; + void refreshPrStatusesNow(projectId).catch((e) => + console.error(`[PrPollingService] refresh_now failed for project=${projectId}:`, e) + ); } +// --------------------------------------------------------------------------- +// Public API — UI state subscriptions (driven by backend lifecycle events) +// --------------------------------------------------------------------------- + /** Register a callback for stale-data notifications. Returns an unsubscribe function. */ export function onStale(callback: StaleCallback): () => void { staleCallbacks.add(callback); @@ -342,7 +207,7 @@ export function isRefreshing(projectId: string): boolean { } // --------------------------------------------------------------------------- -// PR recovery coordination +// PR recovery coordination (frontend-only dedup, unchanged) // --------------------------------------------------------------------------- /** Branch IDs for which recovery has already been attempted (or is in progress). */ @@ -368,43 +233,3 @@ export function shouldAttemptRecovery(branchId: string): boolean { export function clearRecoveryAttempt(branchId: string): void { recoveryAttempted.delete(branchId); } - -/** Trigger an immediate refresh for a specific project (e.g. after PR creation or push). */ -export function refreshNow(projectId: string): void { - if (refreshInFlight) { - // Queue so the project is refreshed as soon as the current operation finishes. - pendingRefreshProjectIds.add(projectId); - return; - } - refreshInFlight = true; - setProjectRefreshing(projectId, true); - refreshAllPrStatuses(projectId) - .then(() => { - lastPolledAt.set(projectId, Date.now()); - // Reset failure counter on success - const prev = failures.get(projectId) ?? 0; - if (prev > 0) { - failures.set(projectId, 0); - notifyStale(projectId, false); - } - }) - .catch((e) => - console.error(`[PrPollingService] immediate refresh failed for project=${projectId}:`, e) - ) - .finally(() => { - setProjectRefreshing(projectId, false); - refreshInFlight = false; - // Drain queued immediate-refresh requests one at a time. - if (pendingRefreshProjectIds.size > 0) { - const queued = [...pendingRefreshProjectIds]; - pendingRefreshProjectIds.clear(); - // Re-queue all but the first; they'll drain on the next finally cycle. - for (let i = 1; i < queued.length; i++) { - pendingRefreshProjectIds.add(queued[i]); - } - refreshNow(queued[0]); - } else { - scheduleNext(); - } - }); -} From fa22aa611e681c30a01f6ec5c81a0c839348dcb8 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 11 Jun 2026 09:15:34 +1000 Subject: [PATCH 03/11] refactor(staged): per-client PR-poll interest with union cadence and disconnect eviction (plan phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of the PR-poll-scheduler rearchitecture. Turns the scheduler's process-global, single-valued interest into per-connected-client interest, with the effective cadence computed as the union across all clients and per-client eviction on disconnect. Clients are the native Tauri window plus each WebSocket browser session (web path lands in parallel). Per-client data model + union cadence (pr_poll_scheduler.rs): - New `ClientInterest { foreground_project, focused, pending_branches, last_seen }`. `PollState` drops the scalar `foreground_project`/`focused`/ `pending_branches` and gains `clients: HashMap`. The per-project *work* bookkeeping (`last_polled_at`/`failures`/`stale`/ `forced`) stays project-keyed and shared — it is about the work, not the observer, so N clients still trigger only one poll per project per tier. - Union helpers keep `PollState` clock-free (all `now` passed in): `any_focused()`, `is_foreground(p)`, and `project_has_pending(p)` rewritten over `clients`. `interval_for` keeps its precedence (any pending => 15s; else foregrounded => 60s; else 5m); `due` swaps `!self.focused` for `!self.any_focused()`. `forced`/dedup/interval logic is untouched. - Client-keyed setters bump `last_seen`: `set_foreground`/`set_focus`/ `set_branch_pending(client_id, .., now)`. `force(project_id)` stays global. Lifecycle + TTL: - New `touch(client_id, now)` (create-or-bump heartbeat), `disconnect_client` (clean WS close), and `evict_stale_clients(now, ttl_ms)` (dirty-drop fallback). `CLIENT_TTL_MS = 90_000` (~3x an expected <=30s WS keepalive); the tick loop calls `evict_stale_clients` inside the existing locked block before `prune`. `TAURI_CLIENT_ID = "tauri-main"` is exempt from TTL eviction (process death is the native window's teardown). - `prune` keeps project-keyed pruning and now also clears each client's foreground/pending interest in dead projects; client lifecycle eviction stays separate from project prune. Phase-1 equivalence: `PollState::new()` pre-seeds `clients` with the Tauri id as `focused: true, last_seen: 0`, so the first immediate tick still polls at launch and single-client behavior is byte-for-byte equivalent to Phase 1 (the Tauri frontend's later hints update that same entry). Commands (pr_poll_scheduler.rs + lib.rs): all four hint commands gain a `client_id` arg and `rename_all = "camelCase"` (added to `set_focus`, which lacked it); `refresh_now` carries `client_id` only to `touch` that client. New `disconnect_client` command, registered in the lib.rs invoke handler. The public `PrPollScheduler` methods (`set_foreground`/`set_focus`/ `set_branch_pending`/`force`/`touch`/`disconnect_client`) read the clock and delegate to the pure `PollState`, and are `pub` so the parallel web `handle_ws`/`dispatch` can drive them via `app_handle.state::>()`. Frontend (prPollingService.ts + commands.ts): the service owns a module-level client id — the fixed `"tauri-main"` under Tauri (via `isTauri`), else a `crypto.randomUUID()` per page load — threaded through all four hint wrappers (now carrying `clientId`). New `disconnectPrPollClient(clientId)` wrapper, called from `dispose()`. The id is exposed via `getPrPollClientId()` so the web transport can append the same value as `?clientId=` on the WS connect. Backend<->web contract the parallel work must honor: the same `clientId` flows on both channels — every interest invoke and the events WS connect query. The web work owns adding the four hint arms + `disconnect_client` to `dispatch` (reading `clientId` from args) and the `handle_ws` `touch` (connect/ping) / `disconnect_client` (close) calls, plus ensuring a keepalive actually flows so the TTL fallback works. Intentionally out of scope: `web_server.rs` is left to the parallel web work (touching it here would conflict); the two open code-review bugs — `record_success`/`record_failure` clearing `forced` (mid-flight re-force drop) and pruning `pending_branches` by branch id — are left for their own commits. `forced` stays exactly as today (global, `record_*` behavior unchanged) so the bug-(a) fix applies cleanly on top. Tests: the 7 existing PollState tests are ported to the per-client structure (seeding the single Tauri client); 9 new pure-`PollState` tests cover foreground/pending/focus union across two clients, disconnect recomputing the union and dropping pending, TTL eviction with the Tauri-id exemption, `touch` keeping a client alive past the boundary, `forced` surviving the forcing client's disconnect, and single-client equivalence to Phase 1. Verification: - `cargo check` (src-tauri) — clean, no warnings. - `cargo test pr_poll_scheduler` — 16/16 pass (7 ported + 9 new; 291 other tests unaffected). - `pnpm check` — 0 errors / 0 warnings. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Matt Toohey --- apps/staged/src-tauri/src/lib.rs | 1 + .../staged/src-tauri/src/pr_poll_scheduler.rs | 481 +++++++++++++++--- apps/staged/src/lib/commands.ts | 30 +- .../src/lib/services/prPollingService.ts | 47 +- 4 files changed, 483 insertions(+), 76 deletions(-) diff --git a/apps/staged/src-tauri/src/lib.rs b/apps/staged/src-tauri/src/lib.rs index 988bebf73..f849b8b91 100644 --- a/apps/staged/src-tauri/src/lib.rs +++ b/apps/staged/src-tauri/src/lib.rs @@ -2257,6 +2257,7 @@ pub fn run() { pr_poll_scheduler::set_focus, pr_poll_scheduler::set_branch_pending, pr_poll_scheduler::refresh_now, + pr_poll_scheduler::disconnect_client, // Utilities util_commands::open_url, util_commands::is_sq_available, diff --git a/apps/staged/src-tauri/src/pr_poll_scheduler.rs b/apps/staged/src-tauri/src/pr_poll_scheduler.rs index 7d12b7e19..cfafd8171 100644 --- a/apps/staged/src-tauri/src/pr_poll_scheduler.rs +++ b/apps/staged/src-tauri/src/pr_poll_scheduler.rs @@ -10,8 +10,23 @@ //! window is focused, via the [`set_foreground_project`], [`set_branch_pending`], //! and [`set_focus`] commands. The effective tier for a project is the union of //! interest (any foregrounding ⇒ selected; any pending ⇒ fast; nothing focused ⇒ -//! pause). For a single client this is just that client's state, but it is -//! structured as a union so Phase 2 can extend it across connected clients. +//! pause). +//! +//! ## Per-client interest (Phase 2) +//! +//! Interest is tracked **per connected client** — the native Tauri window plus +//! each WebSocket browser session — keyed by a frontend-supplied `client_id`. +//! The cadence for a project is the union across all clients ([`PollState::any_focused`], +//! [`PollState::is_foreground`], [`PollState::project_has_pending`]), so a project +//! that any client cares about is polled at the appropriate tier; the *work* +//! bookkeeping (`last_polled_at`/`failures`/`stale`/`forced`) stays project-keyed +//! and shared, so N clients still trigger only one poll per project per tier. +//! +//! Clients are evicted on disconnect (clean WS close ⇒ [`PrPollScheduler::disconnect_client`]) +//! and via a [`CLIENT_TTL_MS`] fallback for dirty drops ([`PollState::evict_stale_clients`], +//! swept each tick). The native window uses the fixed [`TAURI_CLIENT_ID`], which +//! is pre-seeded at launch and exempt from TTL eviction (process death is its +//! teardown), so single-client behaviour stays byte-for-byte equivalent to Phase 1. //! //! Poll-state (last-polled timestamps, failure counts) is intentionally **not //! persisted** — on restart everything is "due", matching the frontend's @@ -53,10 +68,40 @@ const MAX_CONSECUTIVE_FAILURES: u32 = 3; /// wake the loop immediately, so this only bounds the *periodic* re-poll delay. const TICK_INTERVAL_SECS: u64 = 5; +/// Well-known id for the native Tauri window. It has no WS heartbeat (the +/// process dying is its teardown), so it is pre-seeded at launch and exempt from +/// TTL eviction. Must match `TAURI_CLIENT_ID` in `prPollingService.ts`. +const TAURI_CLIENT_ID: &str = "tauri-main"; + +/// How long a client's interest survives without a heartbeat before the tick +/// loop evicts it — the dirty-drop fallback for WS clients that vanish without a +/// clean close. Set to ≈3× the expected WS keepalive (≤~30s), so it tolerates +/// transient lag while bounding spurious pending-tier polls from a dead-but- +/// counted client to ≲6. The Tauri id is exempt. +const CLIENT_TTL_MS: i64 = 90_000; + // --------------------------------------------------------------------------- // Poll-state — pure decision logic, no clock / store / Tauri handles // --------------------------------------------------------------------------- +/// One connected client's interest. The effective cadence for a project is the +/// *union* of these across all clients (see [`PollState::any_focused`], +/// [`PollState::is_foreground`], [`PollState::project_has_pending`]). +#[derive(Default)] +struct ClientInterest { + /// This client's foregrounded/selected project (→ selected tier). + foreground_project: Option, + /// Whether this client's window is focused. + focused: bool, + /// branch_id → project_id for branches this client sees as having pending CI + /// checks (→ pending tier). Mirrors the granularity the frontend tracks via + /// `updateChecksStatus`. + pending_branches: HashMap, + /// Last heartbeat (ms since epoch). Drives TTL eviction of dirty-dropped + /// clients; the Tauri id is exempt regardless of this value. + last_seen: i64, +} + /// The poll-state and interest the scheduler owns. Pure: every method that needs /// the current time takes `now` (ms since epoch) as a parameter, so the /// due/dedup/backoff logic can be unit-tested deterministically. @@ -69,45 +114,68 @@ struct PollState { /// Projects currently reported as stale (failures ≥ threshold). Tracked so a /// stale event is only emitted on transitions. stale: HashSet, - /// The foregrounded/selected project (→ selected tier). `Option` models a - /// single client; Phase 2 will union a set across connected clients. - foreground_project: Option, - /// Whether any client window is focused. No focus ⇒ polling pauses. - focused: bool, - /// branch_id → project_id for branches with pending CI checks (→ pending - /// tier). Mirrors the granularity the frontend tracked via - /// `updateChecksStatus`. - pending_branches: HashMap, /// Projects explicitly nudged via `refresh_now`; due on the next tick - /// regardless of interval or focus. + /// regardless of interval or focus. Project-keyed and global (a nudge is + /// about the *project*, not the observer), so it is shared across clients + /// and survives the forcing client disconnecting. forced: HashSet, + /// Per-connected-client interest, keyed by `client_id`. The cadence for a + /// project is the union across these (see the union helpers). Only the + /// *interest* is per-client; the work bookkeeping above stays project-keyed + /// so N clients still trigger only one poll per project per tier. + clients: HashMap, } impl PollState { fn new() -> Self { + let mut clients = HashMap::new(); + // Pre-seed the native client as focused so the very first immediate tick + // polls at launch, matching Phase 1's `focused: true` default. The Tauri + // frontend's later hints update this same entry, and `last_seen: 0` is + // fine because the Tauri id is exempt from TTL eviction. + clients.insert( + TAURI_CLIENT_ID.to_string(), + ClientInterest { + focused: true, + last_seen: 0, + ..Default::default() + }, + ); Self { last_polled_at: HashMap::new(), failures: HashMap::new(), stale: HashSet::new(), - foreground_project: None, - // Start focused so the very first tick polls at launch, matching the - // frontend which began with `windowFocused = true`. - focused: true, - pending_branches: HashMap::new(), forced: HashSet::new(), + clients, } } + /// Whether any connected client's window is focused. No focused client ⇒ + /// periodic polling pauses. + fn any_focused(&self) -> bool { + self.clients.values().any(|c| c.focused) + } + + /// Whether any client has this project foregrounded/selected. + fn is_foreground(&self, project_id: &str) -> bool { + self.clients + .values() + .any(|c| c.foreground_project.as_deref() == Some(project_id)) + } + + /// Whether any client sees a pending branch in this project. fn project_has_pending(&self, project_id: &str) -> bool { - self.pending_branches.values().any(|p| p == project_id) + self.clients + .values() + .any(|c| c.pending_branches.values().any(|p| p == project_id)) } - /// The polling interval for a project, as the union of current interest. - /// Mirrors the frontend `getProjectInterval`. + /// The polling interval for a project, as the union of current interest + /// across all clients. Mirrors the frontend `getProjectInterval`. fn interval_for(&self, project_id: &str) -> i64 { if self.project_has_pending(project_id) { PENDING_INTERVAL_MS - } else if self.foreground_project.as_deref() == Some(project_id) { + } else if self.is_foreground(project_id) { SELECTED_INTERVAL_MS } else { BACKGROUND_INTERVAL_MS @@ -129,7 +197,7 @@ impl PollState { due.push(id.clone()); continue; } - if !self.focused { + if !self.any_focused() { continue; // no focused client ⇒ pause periodic polling } let last = self.last_polled_at.get(id).copied().unwrap_or(0); @@ -140,18 +208,25 @@ impl PollState { due } - /// Drop tracking for projects (and pending branches) that no longer exist. + /// Drop tracking for projects (and per-client interest in them) that no + /// longer exist. Project-keyed work bookkeeping is pruned by membership; + /// each client's interest is pruned in place. Client *lifecycle* eviction + /// (disconnect / TTL) is separate — that drops whole clients, this drops + /// dead projects from surviving clients. fn prune(&mut self, known: &HashSet<&str>) { self.last_polled_at .retain(|k, _| known.contains(k.as_str())); self.failures.retain(|k, _| known.contains(k.as_str())); self.stale.retain(|k| known.contains(k.as_str())); self.forced.retain(|k| known.contains(k.as_str())); - self.pending_branches - .retain(|_, p| known.contains(p.as_str())); - if let Some(fg) = &self.foreground_project { - if !known.contains(fg.as_str()) { - self.foreground_project = None; + for client in self.clients.values_mut() { + client + .pending_branches + .retain(|_, p| known.contains(p.as_str())); + if let Some(fg) = &client.foreground_project { + if !known.contains(fg.as_str()) { + client.foreground_project = None; + } } } } @@ -184,25 +259,64 @@ impl PollState { } } - fn set_foreground(&mut self, project_id: Option) { - self.foreground_project = project_id; + fn set_foreground(&mut self, client_id: &str, project_id: Option, now: i64) { + let client = self.clients.entry(client_id.to_string()).or_default(); + client.foreground_project = project_id; + client.last_seen = now; } - fn set_focus(&mut self, focused: bool) { - self.focused = focused; + fn set_focus(&mut self, client_id: &str, focused: bool, now: i64) { + let client = self.clients.entry(client_id.to_string()).or_default(); + client.focused = focused; + client.last_seen = now; } - fn set_branch_pending(&mut self, branch_id: String, project_id: String, pending: bool) { + fn set_branch_pending( + &mut self, + client_id: &str, + branch_id: String, + project_id: String, + pending: bool, + now: i64, + ) { + let client = self.clients.entry(client_id.to_string()).or_default(); if pending { - self.pending_branches.insert(branch_id, project_id); + client.pending_branches.insert(branch_id, project_id); } else { - self.pending_branches.remove(&branch_id); + client.pending_branches.remove(&branch_id); } + client.last_seen = now; } + /// `refresh_now` nudge. Project-keyed and global — independent of which + /// client asked, so it survives that client disconnecting. fn force(&mut self, project_id: String) { self.forced.insert(project_id); } + + // -- Client lifecycle ------------------------------------------------- + + /// Heartbeat: create the client entry if absent and bump its `last_seen` so + /// it survives the next TTL sweep. Called on WS connect and each WS ping. + fn touch(&mut self, client_id: &str, now: i64) { + self.clients + .entry(client_id.to_string()) + .or_default() + .last_seen = now; + } + + /// Clean disconnect: drop the client and all its interest. + fn disconnect_client(&mut self, client_id: &str) { + self.clients.remove(client_id); + } + + /// Dirty-drop fallback: evict clients not heard from within `ttl_ms`. The + /// Tauri id is exempt (the native window has no WS heartbeat; the process + /// dying tears it down). + fn evict_stale_clients(&mut self, now: i64, ttl_ms: i64) { + self.clients + .retain(|id, c| id == TAURI_CLIENT_ID || now.saturating_sub(c.last_seen) <= ttl_ms); + } } // --------------------------------------------------------------------------- @@ -231,28 +345,64 @@ impl PrPollScheduler { } } - fn set_foreground(&self, project_id: Option) { - self.state.lock().unwrap().set_foreground(project_id); + // These wrappers are the only place that reads the real clock for interest + // updates: they stamp `now` and delegate to the pure [`PollState`] methods, + // then wake the loop so the union is recomputed promptly. `pub` so the web + // server's `dispatch` / `handle_ws` can drive them via the managed + // `Arc` for WebSocket clients. + + pub fn set_foreground(&self, client_id: String, project_id: Option) { + let now = crate::store::now_timestamp(); + self.state + .lock() + .unwrap() + .set_foreground(&client_id, project_id, now); self.notify.notify_one(); } - fn set_focus(&self, focused: bool) { - self.state.lock().unwrap().set_focus(focused); + pub fn set_focus(&self, client_id: String, focused: bool) { + let now = crate::store::now_timestamp(); + self.state + .lock() + .unwrap() + .set_focus(&client_id, focused, now); self.notify.notify_one(); } - fn set_branch_pending(&self, branch_id: String, project_id: String, pending: bool) { + pub fn set_branch_pending( + &self, + client_id: String, + branch_id: String, + project_id: String, + pending: bool, + ) { + let now = crate::store::now_timestamp(); self.state .lock() .unwrap() - .set_branch_pending(branch_id, project_id, pending); + .set_branch_pending(&client_id, branch_id, project_id, pending, now); self.notify.notify_one(); } - fn force(&self, project_id: String) { + pub fn force(&self, project_id: String) { self.state.lock().unwrap().force(project_id); self.notify.notify_one(); } + + /// Heartbeat for a client (WS connect / ping). Keeps it alive past the TTL. + pub fn touch(&self, client_id: String) { + let now = crate::store::now_timestamp(); + self.state.lock().unwrap().touch(&client_id, now); + self.notify.notify_one(); + } + + /// Clean disconnect for a client (WS close). Wakes the loop so a vanished + /// focus/foreground/pending recomputes the union promptly (it may now pause + /// or slow polling). + pub fn disconnect_client(&self, client_id: String) { + self.state.lock().unwrap().disconnect_client(&client_id); + self.notify.notify_one(); + } } impl Default for PrPollScheduler { @@ -328,6 +478,9 @@ async fn tick( let known: HashSet<&str> = project_ids.iter().map(|s| s.as_str()).collect(); let mut in_flight = scheduler.in_flight.lock().unwrap(); let mut state = scheduler.state.lock().unwrap(); + // Evict clients that dropped without a clean WS close before deciding + // what is due, so their stale interest stops inflating the union. + state.evict_stale_clients(now, CLIENT_TTL_MS); state.prune(&known); let due = state.due(&project_ids, now, &in_flight); for id in &due { @@ -432,40 +585,62 @@ fn emit_stale(app_handle: &tauri::AppHandle, project_id: &str, stale: bool) { // Interest / hint commands // --------------------------------------------------------------------------- -/// Set the foregrounded/selected project (→ selected tier). `None` clears it. +/// Set a client's foregrounded/selected project (→ selected tier). `None` +/// clears it. The effective tier unions this across all connected clients. #[tauri::command(rename_all = "camelCase")] pub fn set_foreground_project( scheduler: tauri::State<'_, Arc>, + client_id: String, project_id: Option, ) { - scheduler.set_foreground(project_id); + scheduler.set_foreground(client_id, project_id); } -/// Report window focus. With no focused client, periodic polling pauses (an -/// explicit `refresh_now` still fetches). -#[tauri::command] -pub fn set_focus(scheduler: tauri::State<'_, Arc>, focused: bool) { - scheduler.set_focus(focused); +/// Report a client's window focus. With no client focused, periodic polling +/// pauses (an explicit `refresh_now` still fetches). +#[tauri::command(rename_all = "camelCase")] +pub fn set_focus( + scheduler: tauri::State<'_, Arc>, + client_id: String, + focused: bool, +) { + scheduler.set_focus(client_id, focused); } -/// Mark whether a branch has pending CI checks (→ pending tier for its project). +/// Mark whether a branch has pending CI checks for a client (→ pending tier for +/// its project, unioned across clients). #[tauri::command(rename_all = "camelCase")] pub fn set_branch_pending( scheduler: tauri::State<'_, Arc>, + client_id: String, branch_id: String, project_id: String, pending: bool, ) { - scheduler.set_branch_pending(branch_id, project_id, pending); + scheduler.set_branch_pending(client_id, branch_id, project_id, pending); } /// Explicitly nudge the scheduler to refresh a project now (e.g. just created or /// pushed a PR). Folded into the scheduler's dedup rather than fetching directly. +/// The `client_id` is carried only to keep that client's heartbeat fresh; the +/// force itself is project-keyed and global. #[tauri::command(rename_all = "camelCase")] -pub fn refresh_now(scheduler: tauri::State<'_, Arc>, project_id: String) { +pub fn refresh_now( + scheduler: tauri::State<'_, Arc>, + client_id: String, + project_id: String, +) { + scheduler.touch(client_id); scheduler.force(project_id); } +/// Drop a client's interest on clean disconnect. For the native app this fires +/// from `prPollingService.dispose()`; for web it fires on WS close. +#[tauri::command(rename_all = "camelCase")] +pub fn disconnect_client(scheduler: tauri::State<'_, Arc>, client_id: String) { + scheduler.disconnect_client(client_id); +} + // --------------------------------------------------------------------------- // Tests — pure due/dedup/backoff logic, no clock or `gh` // --------------------------------------------------------------------------- @@ -482,11 +657,19 @@ mod tests { v.iter().map(|s| s.to_string()).collect() } + /// A `PollState` with the launch-seeded Tauri client removed, for + /// multi-client tests that want a clean slate of explicitly-added clients. + fn empty_state() -> PollState { + let mut st = PollState::new(); + st.disconnect_client(TAURI_CLIENT_ID); + st + } + #[test] fn due_respects_the_three_tiers() { let mut st = PollState::new(); - st.set_foreground(Some("sel".into())); - st.set_branch_pending("b1".into(), "pend".into(), true); + st.set_foreground(TAURI_CLIENT_ID, Some("sel".into()), 0); + st.set_branch_pending(TAURI_CLIENT_ID, "b1".into(), "pend".into(), true, 0); for id in ["sel", "pend", "bg"] { st.last_polled_at.insert(id.into(), 0); } @@ -516,7 +699,7 @@ mod tests { fn unfocused_pauses_periodic_polling_but_not_forced() { let mut st = PollState::new(); st.last_polled_at.insert("p".into(), 0); - st.set_focus(false); + st.set_focus(TAURI_CLIENT_ID, false, 0); let none = HashSet::new(); // Long past every interval, but no focused client ⇒ nothing due. @@ -600,14 +783,14 @@ mod tests { } #[test] - fn prune_drops_unknown_projects_and_their_interest() { + fn prune_clears_per_client_foreground_and_pending() { let mut st = PollState::new(); st.last_polled_at.insert("gone".into(), 0); st.failures.insert("gone".into(), 2); st.stale.insert("gone".into()); st.force("gone".into()); - st.set_foreground(Some("gone".into())); - st.set_branch_pending("b".into(), "gone".into(), true); + st.set_foreground(TAURI_CLIENT_ID, Some("gone".into()), 0); + st.set_branch_pending(TAURI_CLIENT_ID, "b".into(), "gone".into(), true, 0); let known: HashSet<&str> = ["alive"].into_iter().collect(); st.prune(&known); @@ -616,7 +799,185 @@ mod tests { assert!(st.failures.is_empty()); assert!(st.stale.is_empty()); assert!(st.forced.is_empty()); - assert!(st.pending_branches.is_empty()); - assert!(st.foreground_project.is_none()); + // The client itself survives prune (lifecycle is separate); only its + // interest in the now-gone project is cleared. + let client = &st.clients[TAURI_CLIENT_ID]; + assert!(client.pending_branches.is_empty()); + assert!(client.foreground_project.is_none()); + } + + // -- Phase 2: per-client interest / union / lifecycle ------------------ + + #[test] + fn union_foreground_across_two_clients() { + let mut st = empty_state(); + st.set_focus("a", true, 0); + st.set_focus("b", true, 0); + st.set_foreground("a", Some("p1".into()), 0); + st.set_foreground("b", Some("p2".into()), 0); + for id in ["p1", "p2", "bg"] { + st.last_polled_at.insert(id.into(), 0); + } + // Each client's foreground reaches the selected tier; bg stays slow. + assert_eq!(st.interval_for("p1"), SELECTED_INTERVAL_MS); + assert_eq!(st.interval_for("p2"), SELECTED_INTERVAL_MS); + assert_eq!(st.interval_for("bg"), BACKGROUND_INTERVAL_MS); + // Both foregrounded projects are due at the selected interval; bg isn't. + assert_eq!( + st.due( + &ids(&["p1", "p2", "bg"]), + SELECTED_INTERVAL_MS, + &HashSet::new() + ), + ids(&["p1", "p2"]) + ); + } + + #[test] + fn union_pending_beats_foreground() { + let mut st = empty_state(); + // Client A sees a pending branch in p1; client B merely foregrounds p1. + st.set_branch_pending("a", "b1".into(), "p1".into(), true, 0); + st.set_foreground("b", Some("p1".into()), 0); + // Pending wins the union precedence. + assert_eq!(st.interval_for("p1"), PENDING_INTERVAL_MS); + } + + #[test] + fn union_focus() { + let mut st = empty_state(); + st.set_focus("a", false, 0); + st.set_focus("b", true, 0); + st.last_polled_at.insert("p".into(), 0); + let none = HashSet::new(); + + // Any client focused ⇒ active. + assert!(st.any_focused()); + assert_eq!( + st.due(&ids(&["p"]), BACKGROUND_INTERVAL_MS, &none), + ids(&["p"]) + ); + + // All clients unfocused ⇒ paused. + st.set_focus("b", false, 0); + assert!(!st.any_focused()); + assert!(st + .due(&ids(&["p"]), BACKGROUND_INTERVAL_MS * 10, &none) + .is_empty()); + } + + #[test] + fn disconnect_recomputes_union() { + let mut st = empty_state(); + st.set_foreground("a", Some("p1".into()), 0); + st.set_foreground("b", Some("p1".into()), 0); + assert_eq!(st.interval_for("p1"), SELECTED_INTERVAL_MS); + + // One client leaves: still selected (B still holds it). + st.disconnect_client("a"); + assert!(st.is_foreground("p1")); + assert_eq!(st.interval_for("p1"), SELECTED_INTERVAL_MS); + + // Last client holding it leaves: falls back to the background tier. + st.disconnect_client("b"); + assert!(!st.is_foreground("p1")); + assert_eq!(st.interval_for("p1"), BACKGROUND_INTERVAL_MS); + } + + #[test] + fn disconnect_drops_pending() { + let mut st = empty_state(); + st.set_branch_pending("a", "b1".into(), "p".into(), true, 0); + assert!(st.project_has_pending("p")); + assert_eq!(st.interval_for("p"), PENDING_INTERVAL_MS); + + st.disconnect_client("a"); + assert!(!st.project_has_pending("p")); + assert_eq!(st.interval_for("p"), BACKGROUND_INTERVAL_MS); + } + + #[test] + fn ttl_evicts_stale_clients_but_exempts_tauri() { + // Keep the launch-seeded Tauri client (last_seen = 0). + let mut st = PollState::new(); + st.set_focus("web", true, 0); + st.set_foreground("web", Some("p".into()), 0); + assert!(st.is_foreground("p")); + + // Sweep well past the TTL relative to last_seen = 0. + st.evict_stale_clients(CLIENT_TTL_MS + 1, CLIENT_TTL_MS); + + // The stale web client is gone; its interest no longer counts. + assert!(!st.clients.contains_key("web")); + assert!(!st.is_foreground("p")); + // The Tauri client is exempt despite last_seen = 0, and stays focused. + assert!(st.clients.contains_key(TAURI_CLIENT_ID)); + assert!(st.any_focused()); + } + + #[test] + fn touch_keeps_client_alive() { + let mut st = empty_state(); + st.set_focus("web", true, 0); + + // A heartbeat at the TTL boundary keeps the client alive through a sweep + // at the same instant. + let now = CLIENT_TTL_MS + 10; + st.touch("web", now); + st.evict_stale_clients(now, CLIENT_TTL_MS); + assert!(st.clients.contains_key("web")); + assert!(st.any_focused()); + + // Without a further touch, it is evicted once the TTL elapses past the + // last heartbeat. + st.evict_stale_clients(now + CLIENT_TTL_MS + 1, CLIENT_TTL_MS); + assert!(!st.clients.contains_key("web")); + } + + #[test] + fn forced_is_global_and_survives_disconnect() { + let mut st = empty_state(); + st.last_polled_at.insert("p".into(), 0); + + // Client A nudges a refresh, then disconnects. + st.force("p".into()); + st.disconnect_client("a"); + + // `forced` is project-keyed and global, so it outlives the forcing + // client and still bypasses the focus pause. + assert!(!st.any_focused()); + assert_eq!(st.due(&ids(&["p"]), 1_000, &HashSet::new()), ids(&["p"])); + } + + #[test] + fn single_client_equivalence_to_phase1() { + // One seeded Tauri client reproduces Phase 1's tier and pause behaviour. + let mut st = PollState::new(); + st.set_foreground(TAURI_CLIENT_ID, Some("sel".into()), 0); + st.set_branch_pending(TAURI_CLIENT_ID, "b1".into(), "pend".into(), true, 0); + for id in ["sel", "pend", "bg"] { + st.last_polled_at.insert(id.into(), 0); + } + let projects = ids(&["sel", "pend", "bg"]); + let none = HashSet::new(); + + assert_eq!( + st.due(&projects, PENDING_INTERVAL_MS, &none), + ids(&["pend"]) + ); + assert_eq!( + st.due(&projects, SELECTED_INTERVAL_MS, &none), + ids(&["sel", "pend"]) + ); + assert_eq!( + st.due(&projects, BACKGROUND_INTERVAL_MS, &none), + ids(&["sel", "pend", "bg"]) + ); + + // Unfocusing the lone client pauses periodic polling. + st.set_focus(TAURI_CLIENT_ID, false, 0); + assert!(st + .due(&projects, BACKGROUND_INTERVAL_MS * 10, &none) + .is_empty()); } } diff --git a/apps/staged/src/lib/commands.ts b/apps/staged/src/lib/commands.ts index 681012c42..99b1b8a85 100644 --- a/apps/staged/src/lib/commands.ts +++ b/apps/staged/src/lib/commands.ts @@ -1077,30 +1077,40 @@ export function refreshAllPrStatuses(projectId: string): Promise { // The backend owns PR-polling cadence and concurrency. These commands feed it // interest hints; the cadence, dedup, and failure backoff all live in the // backend scheduler (see src-tauri/src/pr_poll_scheduler.rs). +// +// Interest is tracked per connected client, so every hint carries a `clientId` +// (see prPollingService.ts). The same id must also be sent on the WS connect +// query so the backend can correlate interest with disconnect. // --------------------------------------------------------------------------- -/** Tell the backend which project is foregrounded/selected (→ selected tier). */ -export function setForegroundProject(projectId: string | null): Promise { - return invokeCommand('set_foreground_project', { projectId }); +/** Tell the backend which project this client has foregrounded/selected (→ selected tier). */ +export function setForegroundProject(clientId: string, projectId: string | null): Promise { + return invokeCommand('set_foreground_project', { clientId, projectId }); } -/** Report window focus to the backend. No focused client ⇒ polling pauses. */ -export function setPrPollFocus(focused: boolean): Promise { - return invokeCommand('set_focus', { focused }); +/** Report this client's window focus to the backend. No focused client ⇒ polling pauses. */ +export function setPrPollFocus(clientId: string, focused: boolean): Promise { + return invokeCommand('set_focus', { clientId, focused }); } -/** Mark whether a branch has pending CI checks (→ pending tier for its project). */ +/** Mark whether a branch has pending CI checks for this client (→ pending tier for its project). */ export function setBranchPending( + clientId: string, branchId: string, projectId: string, pending: boolean ): Promise { - return invokeCommand('set_branch_pending', { branchId, projectId, pending }); + return invokeCommand('set_branch_pending', { clientId, branchId, projectId, pending }); } /** Nudge the backend to refresh a project's PR statuses now (folded into dedup). */ -export function refreshPrStatusesNow(projectId: string): Promise { - return invokeCommand('refresh_now', { projectId }); +export function refreshPrStatusesNow(clientId: string, projectId: string): Promise { + return invokeCommand('refresh_now', { clientId, projectId }); +} + +/** Tell the backend this client has disconnected so its interest is dropped. */ +export function disconnectPrPollClient(clientId: string): Promise { + return invokeCommand('disconnect_client', { clientId }); } // ============================================================================= diff --git a/apps/staged/src/lib/services/prPollingService.ts b/apps/staged/src/lib/services/prPollingService.ts index 7b7e1462d..6e2200d02 100644 --- a/apps/staged/src/lib/services/prPollingService.ts +++ b/apps/staged/src/lib/services/prPollingService.ts @@ -23,8 +23,40 @@ import { setPrPollFocus, setBranchPending, refreshPrStatusesNow, + disconnectPrPollClient, } from '../commands'; +// --------------------------------------------------------------------------- +// Client identity +// --------------------------------------------------------------------------- +// +// The backend tracks PR-poll interest per connected client and unions the +// cadence across them (see src-tauri/src/pr_poll_scheduler.rs). This module +// owns a stable id for the lifetime of the page that is threaded through every +// interest hint. +// +// - Native (Tauri): the fixed well-known id `tauri-main` (must match +// `TAURI_CLIENT_ID` in pr_poll_scheduler.rs), so single-client behaviour is +// identical to before per-client interest existed. +// - Web (browser): a fresh UUID per page load (per tab). The same value must +// be appended to the WS connect URL (`?clientId=`) by the web transport +// so the backend correlates this client's interest (invoke channel) with +// its disconnect (WS close). See `getPrPollClientId`. + +const TAURI_CLIENT_ID = 'tauri-main'; + +const clientId: string = isTauri ? TAURI_CLIENT_ID : crypto.randomUUID(); + +/** + * This client's PR-poll id, stable for the page's lifetime. Exposed so the web + * transport can append the *same* id to the events WebSocket connect query + * (`?clientId=`) — the backend↔frontend contract requires the same id on + * both the invoke and WS channels. + */ +export function getPrPollClientId(): string { + return clientId; +} + // --------------------------------------------------------------------------- // Types // --------------------------------------------------------------------------- @@ -100,11 +132,11 @@ function setProjectRefreshing(projectId: string, isRefreshing: boolean) { // --------------------------------------------------------------------------- function handleFocus() { - void setPrPollFocus(true).catch(() => {}); + void setPrPollFocus(clientId, true).catch(() => {}); } function handleBlur() { - void setPrPollFocus(false).catch(() => {}); + void setPrPollFocus(clientId, false).catch(() => {}); } // --------------------------------------------------------------------------- @@ -124,7 +156,7 @@ export function init(): void { window.addEventListener('blur', handleBlur); // Seed the backend with the current focus state (it defaults to focused, so // the initial poll already ran; this corrects it if we launched unfocused). - void setPrPollFocus(document.hasFocus()).catch(() => {}); + void setPrPollFocus(clientId, document.hasFocus()).catch(() => {}); unlistenRefreshState = listenToEvent('pr-refresh-state', (payload) => { setProjectRefreshing(payload.projectId, payload.refreshing); @@ -146,6 +178,9 @@ export function dispose(): void { unlistenRefreshState = null; unlistenStale = null; + // Drop this client's interest so the backend recomputes the union without it. + void disconnectPrPollClient(clientId).catch(() => {}); + for (const projectId of [...refreshingProjects]) { setProjectRefreshing(projectId, false); } @@ -158,7 +193,7 @@ export function dispose(): void { /** Set the currently selected project (polls more frequently). */ export function setSelectedProject(projectId: string | null): void { if (!isTauri) return; - void setForegroundProject(projectId).catch((e) => + void setForegroundProject(clientId, projectId).catch((e) => console.error('[PrPollingService] set_foreground_project failed:', e) ); } @@ -170,7 +205,7 @@ export function updateChecksStatus( hasPendingChecks: boolean ): void { if (!isTauri) return; - void setBranchPending(branchId, projectId, hasPendingChecks).catch((e) => + void setBranchPending(clientId, branchId, projectId, hasPendingChecks).catch((e) => console.error('[PrPollingService] set_branch_pending failed:', e) ); } @@ -178,7 +213,7 @@ export function updateChecksStatus( /** Trigger an immediate refresh for a specific project (e.g. after PR creation or push). */ export function refreshNow(projectId: string): void { if (!isTauri) return; - void refreshPrStatusesNow(projectId).catch((e) => + void refreshPrStatusesNow(clientId, projectId).catch((e) => console.error(`[PrPollingService] refresh_now failed for project=${projectId}:`, e) ); } From 65ec3adc05257d8d5aa102790cae600f2521a959 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 11 Jun 2026 13:49:36 +1000 Subject: [PATCH 04/11] chore(staged): add project-switch latency instrumentation (switchTracer + slow-path logs) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port the project-switch diagnostics from the upstream working tree so the PR-poll-scheduler rework on this branch can be measured against the original freeze. Console-only; no behavior change. - switchTracer.ts (new): traces a project→project switch — synchronous milestones (selectProject, ProjectSection mount/destroy, the App selectedProjectId effect), component mount/unmount counts, and requestAnimationFrame gaps as main-thread stalls — so genuine render cost is distinguishable from event-loop starvation. Settles on quiet (300ms) or a 3s cap; frame gaps >700ms are treated as background throttling and not counted as stalls. Lifecycle lines use `[switch #N ...]`; one-off slow-path warnings from other modules use the bare `[switch]` prefix. - navigation.svelte.ts: beginSwitch() at the top of selectProject and a "sync complete" mark at the end, bracketing the synchronous switch work. - App.svelte: mark the selectedProjectId effect that hints prPolling. - ProjectSection.svelte / BranchCard.svelte: count mounts/unmounts and mark ProjectSection mount/destroy; BranchCard also records its synchronous timeline-cache hydration cost (recordSync). - DiffModal.svelte: time selected-file → painted via a double rAF (`[diff] render painted in Nms`). - persistentStore.ts: warn when a store.set disk write blocks >=50ms. Skipped from the upstream tree: the prPollingService.ts slow-`refreshAllPrStatuses` log. That diagnostic instrumented the frontend poll loop, which this branch already moved into the backend scheduler (phases 0–2) — the instrumented code no longer exists here, so there is no landing site for it. Verification: `pnpm check` — 0 errors / 0 warnings. Signed-off-by: Matt Toohey --- apps/staged/src/App.svelte | 2 + .../lib/features/branches/BranchCard.svelte | 8 +- .../src/lib/features/diff/DiffModal.svelte | 16 ++ .../lib/features/layout/navigation.svelte.ts | 3 + .../features/projects/ProjectSection.svelte | 5 + apps/staged/src/lib/shared/persistentStore.ts | 8 + apps/staged/src/lib/shared/switchTracer.ts | 200 ++++++++++++++++++ 7 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 apps/staged/src/lib/shared/switchTracer.ts diff --git a/apps/staged/src/App.svelte b/apps/staged/src/App.svelte index 7b756e918..81f922737 100644 --- a/apps/staged/src/App.svelte +++ b/apps/staged/src/App.svelte @@ -45,6 +45,7 @@ import { listenForSessionStatus } from './lib/listeners/sessionStatusListener'; import { darkMode } from './lib/stores/isDark.svelte'; import * as prPollingService from './lib/services/prPollingService'; + import * as switchTracer from './lib/shared/switchTracer'; import { reposUiEnabled } from './lib/featureFlags'; import type { StoreIncompatibility } from './lib/types'; @@ -73,6 +74,7 @@ // prPollingService.init(), called from onMount). // ========================================================================= $effect(() => { + switchTracer.mark('App: selectedProjectId effect → prPolling.setSelectedProject'); prPollingService.setSelectedProject(navigation.selectedProjectId); }); diff --git a/apps/staged/src/lib/features/branches/BranchCard.svelte b/apps/staged/src/lib/features/branches/BranchCard.svelte index 98fcb5651..9e15a0b9c 100644 --- a/apps/staged/src/lib/features/branches/BranchCard.svelte +++ b/apps/staged/src/lib/features/branches/BranchCard.svelte @@ -11,7 +11,8 @@ - Each item shows session + delete actions on hover -->