diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 796504a13e..b9d0912ad9 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -477,6 +477,7 @@ jobs: "card-source-endpoints-test.ts", "definition-lookup-test.ts", "file-watcher-events-test.ts", + "indexing-event-sink-test.ts", "indexing-test.ts", "runtime-dependency-tracker-test.ts", "transpile-test.ts", diff --git a/README.md b/README.md index 6bb9132222..2efab92c64 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,12 @@ Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` You can also use `start:development` if you want the functionality of `start:all`, but without running the test realms. `start:development` will enable you to open http://localhost:4201 and allow to select between the cards in the /base and /experiments realm. In order to use `start:development` you must also make sure to run `start:worker-development` in order to start the workers which are normally started in `start:all`. +#### Indexing dashboard + +In development, you can visit an indexing dashboard at [`http://localhost:4210/_indexing-dashboard`](http://localhost:4210/_indexing-dashboard) to view the status of all active indexing jobs across all workers. + +In environment mode, this is at `http://worker.environment-name.localhost/_indexing-dashboard`. + ### Card Pre-rendering Boxel supports server-side rendering of cards via a lightweight prerender service and an optional manager that coordinates multiple services. diff --git a/packages/realm-server/handlers/handle-indexing-dashboard.ts b/packages/realm-server/handlers/handle-indexing-dashboard.ts new file mode 100644 index 0000000000..f9756b8995 --- /dev/null +++ b/packages/realm-server/handlers/handle-indexing-dashboard.ts @@ -0,0 +1,440 @@ +import type { RealmIndexingState } from '../indexing-event-sink'; + +export interface PendingJob { + jobId: number; + jobType: string; + realmURL: string; + createdAt: string; + priority: number; +} + +function escapeHtml(str: string): string { + return str + .replace(/&/g, '&') + .replace(//g, '>') + .replace(/"/g, '"'); +} + +function timeSince(ms: number): string { + let seconds = Math.floor((Date.now() - ms) / 1000); + if (seconds < 60) { + return `${seconds}s ago`; + } + let minutes = Math.floor(seconds / 60); + if (minutes < 60) { + return `${minutes}m ${seconds % 60}s ago`; + } + let hours = Math.floor(minutes / 60); + return `${hours}h ${minutes % 60}m ago`; +} + +function durationMs(startMs: number, endMs?: number): string { + let ms = (endMs ?? Date.now()) - startMs; + if (ms < 1000) { + return `${ms}ms`; + } + let seconds = Math.floor(ms / 1000); + if (seconds < 60) { + return `${seconds}s`; + } + let minutes = Math.floor(seconds / 60); + return `${minutes}m ${seconds % 60}s`; +} + +function renderActiveCard(state: RealmIndexingState): string { + let remaining = state.totalFiles - state.filesCompleted; + let pct = + state.totalFiles > 0 + ? Math.round((state.filesCompleted / state.totalFiles) * 100) + : 0; + + const completedSet = new Set(state.completedFiles); + let remainingFiles = state.files.filter((f) => !completedSet.has(f)); + let remainingList = remainingFiles + .map((f) => `
  • ${escapeHtml(f)}
  • `) + .join(''); + let completedList = state.completedFiles + .map((f) => `
  • ${escapeHtml(f)}
  • `) + .join(''); + + return ` +
    +
    + +

    ${escapeHtml(state.realmURL)}

    +
    +
    + ${escapeHtml(state.jobType)} index + job #${state.jobId} · started ${timeSince(state.startedAt)} · ${durationMs(state.startedAt)} elapsed +
    +
    +
    + ${state.filesCompleted} / ${state.totalFiles} files (${pct}%) +
    +
    ${remaining} file${remaining !== 1 ? 's' : ''} remaining
    + ${ + remainingFiles.length > 0 + ? `
    + ${remaining} file${remaining !== 1 ? 's' : ''} left to index + +
    ` + : '' + } + ${ + state.completedFiles.length > 0 + ? `
    + ${state.filesCompleted} file${state.filesCompleted !== 1 ? 's' : ''} completed + +
    ` + : '' + } +
    `; +} + +function renderHistoryRow(state: RealmIndexingState): string { + let statsHtml = state.stats + ? Object.entries(state.stats) + .map(([k, v]) => `${k}: ${v}`) + .join(', ') + : ''; + return ` + + ${state.jobId} + ${escapeHtml(state.jobType)} + ${escapeHtml(state.realmURL)} + ${state.totalFiles} + ${durationMs(state.startedAt, state.lastUpdatedAt)} + ${timeSince(state.lastUpdatedAt)} + ${escapeHtml(statsHtml)} + `; +} + +function renderPendingRow(job: PendingJob): string { + let createdAt = new Date(job.createdAt); + return ` + + ${job.jobId} + ${escapeHtml(job.jobType)} + ${escapeHtml(job.realmURL)} + ${job.priority} + ${timeSince(createdAt.getTime())} + `; +} + +export interface DashboardSnapshot { + active: RealmIndexingState[]; + pending: PendingJob[]; + history: RealmIndexingState[]; +} + +export function renderIndexingDashboard(snapshot: DashboardSnapshot): string { + let { active, pending, history } = snapshot; + + let activeCards = active.map(renderActiveCard).join(''); + let pendingRows = pending.map(renderPendingRow).join(''); + let historyRows = history.map(renderHistoryRow).join(''); + + return ` + + + + Indexing Dashboard + + + + +
    +

    Indexing Dashboard

    +
    + + +
    +
    + +
    +
    +
    ${active.length}
    +
    Active Jobs
    +
    +
    +
    ${pending.length}
    +
    Pending Jobs
    +
    +
    +
    ${active.reduce((s, a) => s + (a.totalFiles - a.filesCompleted), 0)}
    +
    Files Remaining
    +
    +
    +
    ${history.length}
    +
    Completed
    +
    +
    + +

    Active Indexing

    + ${activeCards.length > 0 ? `
    ${activeCards}
    ` : '
    No active indexing jobs
    '} + +

    Pending Jobs

    + ${ + pending.length > 0 + ? `
    + + + + + + + + + + + ${pendingRows} +
    JobTypeRealmPriorityQueued
    +
    ` + : '
    No pending jobs
    ' + } + +

    Recent Completed

    + ${ + history.length > 0 + ? `
    + + + + + + + + + + + + + ${historyRows} +
    JobTypeRealmFilesDurationFinishedStats
    +
    ` + : '
    No completed jobs yet (history is populated from events received since the worker manager started)
    ' + } + + + +`; +} diff --git a/packages/realm-server/indexing-event-sink.ts b/packages/realm-server/indexing-event-sink.ts new file mode 100644 index 0000000000..c19d6f8df3 --- /dev/null +++ b/packages/realm-server/indexing-event-sink.ts @@ -0,0 +1,117 @@ +import type { IndexingProgressEvent, Stats } from '@cardstack/runtime-common'; + +export interface RealmIndexingState { + realmURL: string; + jobId: number; + jobType: string; + status: 'indexing' | 'finished'; + totalFiles: number; + filesCompleted: number; + /** All files that need to be indexed */ + files: string[]; + /** Files that have been indexed so far */ + completedFiles: string[]; + stats?: Stats; + startedAt: number; + lastUpdatedAt: number; +} + +export class IndexingEventSink { + /** Active indexing state keyed by jobId */ + #active = new Map(); + + /** Recently completed indexing runs (most recent first) */ + #history: RealmIndexingState[] = []; + + /** Max history entries to keep */ + #maxHistory = 50; + + /** Tracks unique completed files per job to avoid duplicates */ + #completedFilesSets = new Map>(); + + /** Max completed files to keep per job for UI/display purposes */ + #maxCompletedFilesPerJob = 1000; + + handleEvent(event: IndexingProgressEvent): void { + switch (event.type) { + case 'indexing-started': { + this.#active.set(event.jobId, { + realmURL: event.realmURL, + jobId: event.jobId, + jobType: event.jobType ?? 'unknown', + status: 'indexing', + totalFiles: event.totalFiles ?? 0, + filesCompleted: 0, + files: event.files ?? [], + completedFiles: [], + startedAt: Date.now(), + lastUpdatedAt: Date.now(), + }); + this.#completedFilesSets.set(event.jobId, new Set()); + break; + } + case 'file-visited': { + let state = this.#active.get(event.jobId); + if (state) { + state.filesCompleted = + event.filesCompleted ?? state.filesCompleted + 1; + state.totalFiles = event.totalFiles ?? state.totalFiles; + if (event.url) { + let completedSet = this.#completedFilesSets.get(event.jobId); + if (!completedSet) { + completedSet = new Set(state.completedFiles); + this.#completedFilesSets.set(event.jobId, completedSet); + } + if (!completedSet.has(event.url)) { + completedSet.add(event.url); + state.completedFiles.push(event.url); + if (state.completedFiles.length > this.#maxCompletedFilesPerJob) { + const excess = + state.completedFiles.length - this.#maxCompletedFilesPerJob; + const removed = state.completedFiles.splice(0, excess); + for (let url of removed) { + completedSet.delete(url); + } + } + } + } + state.lastUpdatedAt = Date.now(); + } + break; + } + case 'indexing-finished': { + let state = this.#active.get(event.jobId); + if (state) { + state.status = 'finished'; + state.stats = event.stats; + state.lastUpdatedAt = Date.now(); + this.#history.unshift({ ...state }); + if (this.#history.length > this.#maxHistory) { + this.#history.length = this.#maxHistory; + } + } + this.#active.delete(event.jobId); + this.#completedFilesSets.delete(event.jobId); + break; + } + } + } + + getActiveIndexing(): RealmIndexingState[] { + return [...this.#active.values()]; + } + + getHistory(): RealmIndexingState[] { + return [...this.#history]; + } + + getSnapshot(): { + active: RealmIndexingState[]; + history: RealmIndexingState[]; + } { + return { + active: this.getActiveIndexing(), + history: this.getHistory(), + }; + } +} diff --git a/packages/realm-server/tests/index.ts b/packages/realm-server/tests/index.ts index 868183f2f9..72e9c16ace 100644 --- a/packages/realm-server/tests/index.ts +++ b/packages/realm-server/tests/index.ts @@ -181,3 +181,4 @@ import './runtime-dependency-tracker-test'; import './sanitize-head-html-test'; import './node-realm-test'; import './session-room-queries-test'; +import './indexing-event-sink-test'; diff --git a/packages/realm-server/tests/indexing-event-sink-test.ts b/packages/realm-server/tests/indexing-event-sink-test.ts new file mode 100644 index 0000000000..8602710676 --- /dev/null +++ b/packages/realm-server/tests/indexing-event-sink-test.ts @@ -0,0 +1,146 @@ +import { module, test } from 'qunit'; +import { basename } from 'path'; +import { IndexingEventSink } from '../indexing-event-sink'; + +module(basename(__filename), function () { + test('tracks active indexing from start through file visits to finish', function (assert) { + let sink = new IndexingEventSink(); + + assert.deepEqual(sink.getSnapshot(), { active: [], history: [] }); + + sink.handleEvent({ + type: 'indexing-started', + realmURL: 'http://example.com/realm/', + jobId: 1, + jobType: 'from-scratch', + totalFiles: 3, + files: [ + 'http://example.com/realm/a.gts', + 'http://example.com/realm/b.json', + 'http://example.com/realm/c.gts', + ], + }); + + let { active, history } = sink.getSnapshot(); + assert.strictEqual(active.length, 1); + assert.strictEqual(history.length, 0); + assert.strictEqual(active[0].realmURL, 'http://example.com/realm/'); + assert.strictEqual(active[0].totalFiles, 3); + assert.strictEqual(active[0].filesCompleted, 0); + assert.strictEqual(active[0].status, 'indexing'); + + sink.handleEvent({ + type: 'file-visited', + realmURL: 'http://example.com/realm/', + jobId: 1, + url: 'http://example.com/realm/a.gts', + filesCompleted: 1, + totalFiles: 3, + }); + + ({ active } = sink.getSnapshot()); + assert.strictEqual(active[0].filesCompleted, 1); + assert.deepEqual(active[0].completedFiles, [ + 'http://example.com/realm/a.gts', + ]); + + sink.handleEvent({ + type: 'file-visited', + realmURL: 'http://example.com/realm/', + jobId: 1, + url: 'http://example.com/realm/b.json', + filesCompleted: 2, + totalFiles: 3, + }); + + sink.handleEvent({ + type: 'file-visited', + realmURL: 'http://example.com/realm/', + jobId: 1, + url: 'http://example.com/realm/c.gts', + filesCompleted: 3, + totalFiles: 3, + }); + + ({ active } = sink.getSnapshot()); + assert.strictEqual(active[0].filesCompleted, 3); + + sink.handleEvent({ + type: 'indexing-finished', + realmURL: 'http://example.com/realm/', + jobId: 1, + stats: { + instancesIndexed: 1, + filesIndexed: 2, + instanceErrors: 0, + fileErrors: 0, + totalIndexEntries: 3, + }, + }); + + ({ active, history } = sink.getSnapshot()); + assert.strictEqual(active.length, 0, 'no longer active after finish'); + assert.strictEqual(history.length, 1); + assert.strictEqual(history[0].realmURL, 'http://example.com/realm/'); + assert.strictEqual(history[0].status, 'finished'); + assert.deepEqual(history[0].stats, { + instancesIndexed: 1, + filesIndexed: 2, + instanceErrors: 0, + fileErrors: 0, + totalIndexEntries: 3, + }); + }); + + test('tracks multiple realms concurrently', function (assert) { + let sink = new IndexingEventSink(); + + sink.handleEvent({ + type: 'indexing-started', + realmURL: 'http://example.com/realm-a/', + jobId: 1, + jobType: 'from-scratch', + totalFiles: 10, + files: [], + }); + + sink.handleEvent({ + type: 'indexing-started', + realmURL: 'http://example.com/realm-b/', + jobId: 2, + jobType: 'incremental', + totalFiles: 2, + files: [], + }); + + assert.strictEqual(sink.getActiveIndexing().length, 2); + + sink.handleEvent({ + type: 'indexing-finished', + realmURL: 'http://example.com/realm-b/', + jobId: 2, + }); + + assert.strictEqual(sink.getActiveIndexing().length, 1); + assert.strictEqual( + sink.getActiveIndexing()[0].realmURL, + 'http://example.com/realm-a/', + ); + assert.strictEqual(sink.getHistory().length, 1); + }); + + test('ignores file-visited for unknown realm', function (assert) { + let sink = new IndexingEventSink(); + + sink.handleEvent({ + type: 'file-visited', + realmURL: 'http://example.com/unknown/', + jobId: 99, + url: 'http://example.com/unknown/x.json', + filesCompleted: 1, + totalFiles: 1, + }); + + assert.strictEqual(sink.getActiveIndexing().length, 0); + }); +}); diff --git a/packages/realm-server/worker-manager.ts b/packages/realm-server/worker-manager.ts index 358e10791d..f3c9dbf5e0 100644 --- a/packages/realm-server/worker-manager.ts +++ b/packages/realm-server/worker-manager.ts @@ -11,6 +11,7 @@ import { isUrlLike, type Expression, type StatusArgs, + type IndexingProgressEvent, } from '@cardstack/runtime-common'; import yargs from 'yargs'; import * as Sentry from '@sentry/node'; @@ -28,6 +29,11 @@ import { registerService, deregisterService, } from './lib/dev-service-registry'; +import { IndexingEventSink } from './indexing-event-sink'; +import { + renderIndexingDashboard, + type PendingJob, +} from './handlers/handle-indexing-dashboard'; /* About the Worker Manager * @@ -116,6 +122,13 @@ let { let isReady = false; let isExiting = false; let workers: ChildProcess[] = []; +function isIndexingDashboardEnabled(): boolean { + return !ECS_CONTAINER_METADATA_URI; +} + +let eventSink: IndexingEventSink | undefined = isIndexingDashboardEnabled() + ? new IndexingEventSink() + : undefined; process.on('SIGINT', () => (isExiting = true)); process.on('SIGTERM', () => (isExiting = true)); @@ -142,6 +155,53 @@ if (port != null) { ctxt.body = JSON.stringify(result); ctxt.status = isReady ? 200 : 503; }); + if (eventSink) { + let getPendingJobs = async (): Promise => { + let rows = (await query([ + `SELECT j.id, j.job_type, j.args, j.priority, j.created_at`, + `FROM jobs j`, + `WHERE j.status = 'unfulfilled'`, + `AND j.job_type IN ('from-scratch-index', 'incremental-index')`, + `AND NOT EXISTS (`, + ` SELECT 1 FROM job_reservations jr`, + ` WHERE jr.job_id = j.id AND jr.completed_at IS NULL`, + `)`, + `ORDER BY j.created_at ASC`, + ])) as { + id: string; + job_type: string; + args: { realmURL?: string }; + priority: number; + created_at: string; + }[]; + return rows.map((r) => ({ + jobId: Number(r.id), + jobType: r.job_type, + realmURL: r.args?.realmURL ?? 'unknown', + priority: r.priority, + createdAt: r.created_at, + })); + }; + + router.get('/_indexing-dashboard', async (ctxt: Koa.Context) => { + ctxt.set('Content-Type', 'text/html; charset=utf-8'); + let pending = await getPendingJobs(); + ctxt.body = renderIndexingDashboard({ + ...eventSink.getSnapshot(), + pending, + }); + ctxt.status = 200; + }); + router.get('/_indexing-status', async (ctxt: Koa.Context) => { + ctxt.set('Content-Type', 'application/json'); + let pending = await getPendingJobs(); + ctxt.body = JSON.stringify({ + ...eventSink.getSnapshot(), + pending, + }); + ctxt.status = 200; + }); + } webServer .use(router.routes()) @@ -529,6 +589,18 @@ async function startWorker( currentState = undefined; (worker as any).__boxelIndexState = undefined; } + } else if ( + eventSink && + typeof message === 'string' && + message.startsWith('progress|') + ) { + try { + let payload = message.substring('progress|'.length); + let progressEvent = JSON.parse(payload) as IndexingProgressEvent; + eventSink.handleEvent(progressEvent); + } catch (e) { + log.error(`Failed to parse progress event: ${e}`); + } } }); }), diff --git a/packages/realm-server/worker.ts b/packages/realm-server/worker.ts index 8e3f5cedb8..93b064e6a8 100644 --- a/packages/realm-server/worker.ts +++ b/packages/realm-server/worker.ts @@ -8,6 +8,7 @@ import { IndexWriter, registerCardReferencePrefix, type StatusArgs, + type IndexingProgressEvent, } from '@cardstack/runtime-common'; import yargs from 'yargs'; import * as Sentry from '@sentry/node'; @@ -121,6 +122,12 @@ let autoMigrate = migrateDB || undefined; } } + function reportProgress(event: IndexingProgressEvent) { + if (!ECS_CONTAINER_METADATA_URI && process.send) { + process.send(`progress|${JSON.stringify(event)}`); + } + } + let dbAdapter = new PgAdapter({ autoMigrate }); let queue = new PgQueueRunner({ adapter: dbAdapter, workerId, priority }); let worker = new Worker({ @@ -130,6 +137,7 @@ let autoMigrate = migrateDB || undefined; matrixURL: new URL(matrixURL), secretSeed: REALM_SECRET_SEED, reportStatus, + reportProgress, realmServerMatrixUsername: REALM_SERVER_MATRIX_USERNAME, dbAdapter, queuePublisher: new PgQueuePublisher(dbAdapter), diff --git a/packages/runtime-common/index-runner.ts b/packages/runtime-common/index-runner.ts index 4fc6084946..6423bd8095 100644 --- a/packages/runtime-common/index-runner.ts +++ b/packages/runtime-common/index-runner.ts @@ -28,6 +28,7 @@ import { moduleFrom } from './code-ref'; import type { CacheScope, DefinitionLookup } from './definition-lookup'; import { resolveCardReference } from './card-reference-resolver'; import { isCardError } from './error'; +import type { IndexingProgressEvent } from './worker'; import { IndexRunnerDependencyManager } from './index-runner/dependency-resolver'; import { discoverInvalidations } from './index-runner/discover-invalidations'; import { visitFileForIndexing } from './index-runner/visit-file'; @@ -61,6 +62,7 @@ export class IndexRunner { jobInfo: JobInfo | undefined, status: 'start' | 'finish', ) => void; + #onProgress?: (event: IndexingProgressEvent) => void; readonly stats: Stats = { instancesIndexed: 0, filesIndexed: 0, @@ -78,6 +80,7 @@ export class IndexRunner { ignoreData = {}, jobInfo, reportStatus, + onProgress, prerenderer, auth, fetch, @@ -97,6 +100,7 @@ export class IndexRunner { jobInfo: JobInfo | undefined, status: 'start' | 'finish', ): void; + onProgress?(event: IndexingProgressEvent): void; }) { this.#indexWriter = indexWriter; this.#realmPaths = new RealmPaths(realmURL); @@ -105,6 +109,7 @@ export class IndexRunner { this.#ignoreData = ignoreData; this.#jobInfo = jobInfo ?? { jobId: -1, reservationId: -1 }; this.#reportStatus = reportStatus; + this.#onProgress = onProgress; this.#prerenderer = prerenderer; this.#auth = auth; this.#fetch = fetch; @@ -164,18 +169,45 @@ export class IndexRunner { await current.#dependencyResolver.orderInvalidationsByDependencies( invalidations, ); - for (let invalidation of invalidations) { - await current.tryToVisit(invalidation); + current.#onProgress?.({ + type: 'indexing-started', + realmURL: current.realmURL.href, + jobId: current.#jobInfo.jobId, + jobType: 'from-scratch', + totalFiles: invalidations.length, + files: invalidations.map((u) => u.href), + }); + try { + let filesCompleted = 0; + for (let invalidation of invalidations) { + await current.tryToVisit(invalidation); + filesCompleted++; + current.#onProgress?.({ + type: 'file-visited', + realmURL: current.realmURL.href, + jobId: current.#jobInfo.jobId, + url: invalidation.href, + filesCompleted, + totalFiles: invalidations.length, + }); + } + current.#perfLog.debug( + `${jobIdentity(current.#jobInfo)} completed index visit in ${Date.now() - visitStart} ms`, + ); + let finalizeStart = Date.now(); + let { totalIndexEntries } = await current.batch.done(); + current.#perfLog.debug( + `${jobIdentity(current.#jobInfo)} completed index finalization in ${Date.now() - finalizeStart} ms`, + ); + current.stats.totalIndexEntries = totalIndexEntries; + } finally { + current.#onProgress?.({ + type: 'indexing-finished', + realmURL: current.realmURL.href, + jobId: current.#jobInfo.jobId, + stats: current.stats, + }); } - current.#perfLog.debug( - `${jobIdentity(current.#jobInfo)} completed index visit in ${Date.now() - visitStart} ms`, - ); - let finalizeStart = Date.now(); - let { totalIndexEntries } = await current.batch.done(); - current.#perfLog.debug( - `${jobIdentity(current.#jobInfo)} completed index finalization in ${Date.now() - finalizeStart} ms`, - ); - current.stats.totalIndexEntries = totalIndexEntries; current.#log.debug( `${jobIdentity(current.#jobInfo)} completed from scratch indexing in ${Date.now() - start}ms`, ); @@ -242,19 +274,46 @@ export class IndexRunner { } let hrefs = urls.map((u) => u.href); - for (let invalidation of invalidations) { - if ( - operations.get(invalidation.href) === 'delete' && - hrefs.includes(invalidation.href) - ) { - // file is deleted, there is nothing to visit - } else { - await current.tryToVisit(invalidation); + current.#onProgress?.({ + type: 'indexing-started', + realmURL: current.realmURL.href, + jobId: current.#jobInfo.jobId, + jobType: 'incremental', + totalFiles: invalidations.length, + files: invalidations.map((u) => u.href), + }); + try { + let filesCompleted = 0; + for (let invalidation of invalidations) { + if ( + operations.get(invalidation.href) === 'delete' && + hrefs.includes(invalidation.href) + ) { + // file is deleted, there is nothing to visit + } else { + await current.tryToVisit(invalidation); + } + filesCompleted++; + current.#onProgress?.({ + type: 'file-visited', + realmURL: current.realmURL.href, + jobId: current.#jobInfo.jobId, + url: invalidation.href, + filesCompleted, + totalFiles: invalidations.length, + }); } - } - let { totalIndexEntries } = await current.batch.done(); - current.stats.totalIndexEntries = totalIndexEntries; + let { totalIndexEntries } = await current.batch.done(); + current.stats.totalIndexEntries = totalIndexEntries; + } finally { + current.#onProgress?.({ + type: 'indexing-finished', + realmURL: current.realmURL.href, + jobId: current.#jobInfo.jobId, + stats: current.stats, + }); + } current.#log.debug( `${jobIdentity(current.#jobInfo)} completed incremental indexing for ${urls.map((u) => u.href).join()} in ${ diff --git a/packages/runtime-common/tasks/index.ts b/packages/runtime-common/tasks/index.ts index 00e1a14324..f491875dbc 100644 --- a/packages/runtime-common/tasks/index.ts +++ b/packages/runtime-common/tasks/index.ts @@ -8,7 +8,7 @@ import type { RealmPermissions, DefinitionLookup, } from '../index'; -import type { JobInfo } from '../worker'; +import type { JobInfo, IndexingProgressEvent } from '../worker'; export * from './lint'; export * from './full-reindex'; export * from './daily-credit-grant'; @@ -30,6 +30,7 @@ export interface TaskArgs { getAuthedFetch(args: WorkerArgs): Promise; createPrerenderAuth(userId: string, permissions: RealmPermissions): string; reportStatus(jobInfo: JobInfo | undefined, status: 'start' | 'finish'): void; + reportProgress?(event: IndexingProgressEvent): void; } export type Task = ( diff --git a/packages/runtime-common/tasks/indexer.ts b/packages/runtime-common/tasks/indexer.ts index db7f81c0cd..9113289921 100644 --- a/packages/runtime-common/tasks/indexer.ts +++ b/packages/runtime-common/tasks/indexer.ts @@ -236,6 +236,7 @@ registerQueueJobDefinition({ const fromScratchIndex: Task = ({ log, reportStatus, + reportProgress, dbAdapter, matrixURL, indexWriter, @@ -268,6 +269,7 @@ const fromScratchIndex: Task = ({ definitionLookup, jobInfo, reportStatus, + onProgress: reportProgress, auth, fetch: _fetch, prerenderer, @@ -292,6 +294,7 @@ const fromScratchIndex: Task = ({ const incrementalIndex: Task = ({ log, reportStatus, + reportProgress, dbAdapter, matrixURL, indexWriter, @@ -325,6 +328,7 @@ const incrementalIndex: Task = ({ definitionLookup, jobInfo, reportStatus, + onProgress: reportProgress, auth, fetch: _fetch, prerenderer, diff --git a/packages/runtime-common/worker.ts b/packages/runtime-common/worker.ts index 84458b79dd..8af5e377f8 100644 --- a/packages/runtime-common/worker.ts +++ b/packages/runtime-common/worker.ts @@ -64,6 +64,18 @@ export interface StatusArgs { deps?: string[]; } +export interface IndexingProgressEvent { + type: 'indexing-started' | 'file-visited' | 'indexing-finished'; + realmURL: string; + jobId: number; + jobType?: string; + totalFiles?: number; + filesCompleted?: number; + files?: string[]; + url?: string; + stats?: Stats; +} + export class Worker { #log = logger('worker'); #indexWriter: IndexWriter; @@ -77,6 +89,7 @@ export class Worker { #realmAuthCache: Map = new Map(); #secretSeed: string; #reportStatus: ((args: StatusArgs) => void) | undefined; + #reportProgress: ((event: IndexingProgressEvent) => void) | undefined; #realmServerMatrixUsername; #createPrerenderAuth: ( userId: string, @@ -93,6 +106,7 @@ export class Worker { realmServerMatrixUsername, secretSeed, reportStatus, + reportProgress, prerenderer, createPrerenderAuth, }: { @@ -106,6 +120,7 @@ export class Worker { secretSeed: string; prerenderer: Prerenderer; reportStatus?: (args: StatusArgs) => void; + reportProgress?: (event: IndexingProgressEvent) => void; createPrerenderAuth: ( userId: string, permissions: RealmPermissions, @@ -117,6 +132,7 @@ export class Worker { this.#matrixURL = matrixURL; this.#secretSeed = secretSeed; this.#reportStatus = reportStatus; + this.#reportProgress = reportProgress; this.#realmServerMatrixUsername = realmServerMatrixUsername; this.#dbAdapter = dbAdapter; this.#queuePublisher = queuePublisher; @@ -142,6 +158,7 @@ export class Worker { queuePublisher: this.#queuePublisher, getAuthedFetch: this.makeAuthedFetch.bind(this), reportStatus: this.reportStatus.bind(this), + reportProgress: this.reportProgress.bind(this), createPrerenderAuth: this.#createPrerenderAuth, }; @@ -220,6 +237,10 @@ export class Worker { this.#reportStatus?.({ ...args, jobId: String(args.jobId), status }); } } + + private reportProgress(event: IndexingProgressEvent) { + this.#reportProgress?.(event); + } } export function getReader(