diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0bef403..2434e16 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 All notable changes to this project will be documented in this file.
 
+## 0.6.0-beta.5 - 2026-02-25
+
+- Merges main into split_db_adapter (per-batch polling, coordinator/worker architecture, destroyPayload cleanup).
+
 ## 0.6.0-beta.4 - 2026-02-20
 
 - Merges main with should embed changes.
@@ -87,6 +91,26 @@ const score = result.similarity
 const score = result.score
 ```
 
+## 0.5.5 - 2026-02-24
+
+### Added
+
+- **`batchLimit` option on `CollectionVectorizeOption`** – limits the number of documents fetched per bulk-embed worker job. When set, each page of results queues a continuation job for the next page, preventing serverless time-limit issues on large collections. Defaults to 1000.
+
+### Changed
+
+- **Coordinator / worker architecture for `prepare-bulk-embedding`** – the initial job now acts as a coordinator that fans out one worker job per collection. Each worker processes a single page of documents, making bulk embedding parallelizable and more resilient to timeouts.
+- **Per-batch polling via `poll-or-complete-single-batch`** – replaced the monolithic `poll-or-complete-bulk-embedding` task. Each provider batch now has its own polling job, improving observability and reducing memory usage.
+- **Memory-efficient incremental aggregation** – `finalizeRunIfComplete` now scans batch records page-by-page instead of loading all batches into memory at once.
+
+### Removed
+
+- `poll-or-complete-bulk-embedding` task (replaced by `poll-or-complete-single-batch`).
+
+### Upgrade Notes
+
+- **Ensure no bulk embedding run is in progress when upgrading.** The `poll-or-complete-bulk-embedding` task has been removed and replaced by `poll-or-complete-single-batch`. Any in-flight bulk run that still has pending `poll-or-complete-bulk-embedding` jobs will fail because the task slug no longer exists. Wait for all active runs to complete (or cancel them) before deploying this version.
+
 ## 0.5.4 - 2026-02-20
 
 ### Added
diff --git a/README.md b/README.md
index e311896..7b621b0 100644
--- a/README.md
+++ b/README.md
@@ -426,8 +426,8 @@ type OnBulkErrorArgs = {
 
 The plugin uses separate Payload jobs for reliability with long-running providers:
 
-- **`prepare-bulk-embedding`**: Streams through documents, calls your `addChunk` for each chunk, creates batch records.
-- **`poll-or-complete-bulk-embedding`**: Polls all batches, requeues itself until done, then writes all successful embeddings (partial chunk failures are allowed).
+- **`prepare-bulk-embedding`**: A coordinator job fans out one worker per collection. Each worker streams through documents, calls your `addChunk` for each chunk, and creates batch records. When `batchLimit` is set on a collection, workers paginate and queue continuation jobs.
+- **`poll-or-complete-single-batch`**: Polls a single batch, requeues itself until done, then writes successful embeddings. When all batches for a run are terminal, the run is finalized (partial chunk failures are allowed).
 
 ### Queue Configuration
 
@@ -561,6 +561,7 @@ curl -X POST http://localhost:3000/api/vector-retry-failed-batch \
 
 - `shouldEmbedFn? (doc, payload)` – optional filter that runs **before** the document is queued for embedding. Return `false` to skip the document entirely (no job is created and `toKnowledgePool` is never called). Works for both real-time and bulk embedding. Defaults to embedding all documents when omitted.
 - `toKnowledgePool (doc, payload)` – return an array of `{ chunk, ...extensionFieldValues }`. Each object becomes one embedding row and the index in the array determines `chunkIndex`.
+- `batchLimit? (number)` – max documents to fetch per bulk-embed worker job. When set, each page of results becomes a separate job that queues a continuation for the next page. Useful for large collections that would exceed serverless time limits in a single job. Defaults to 1000.
 
 Reserved column names: `sourceCollection`, `docId`, `chunkIndex`, `chunkText`, `embeddingVersion`. Avoid reusing them in `extensionFields`.
diff --git a/adapters/pg/dev/specs/extensionFields.spec.ts b/adapters/pg/dev/specs/extensionFields.spec.ts
index bc553ba..43ebf2c 100644
--- a/adapters/pg/dev/specs/extensionFields.spec.ts
+++ b/adapters/pg/dev/specs/extensionFields.spec.ts
@@ -1,8 +1,8 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { postgresAdapter } from '@payloadcms/db-postgres'
 import { buildDummyConfig, integration, plugin } from './constants.js'
-import { createTestDb, waitForVectorizationJobs } from './utils.js'
+import { createTestDb, destroyPayload, waitForVectorizationJobs } from './utils.js'
 import { getPayload } from 'payload'
 import { PostgresPayload } from '../../src/types.js'
 import { chunkText, chunkRichTextSimple as chunkRichText } from '@shared-test/helpers/chunkers'
@@ -113,6 +113,10 @@ describe('Extension fields integration tests', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('extension fields are added to the embeddings table schema', async () => {
     const db = (payload as PostgresPayload).db
     const sql = `
diff --git a/adapters/pg/dev/specs/integration.spec.ts b/adapters/pg/dev/specs/integration.spec.ts
index a809273..0090c3f 100644
--- a/adapters/pg/dev/specs/integration.spec.ts
+++ b/adapters/pg/dev/specs/integration.spec.ts
@@ -4,13 +4,14 @@
  * These tests verify Postgres-specific functionality like
  * vector column creation, schema modifications, etc.
  */
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import type { Payload, SanitizedConfig } from 'payload'
 import { buildConfig, getPayload } from 'payload'
 import { postgresAdapter } from '@payloadcms/db-postgres'
 import { lexicalEditor } from '@payloadcms/richtext-lexical'
 import { Client } from 'pg'
 import { createPostgresVectorIntegration } from '../../src/index.js'
+import { destroyPayload } from './utils.js'
 import payloadcmsVectorize from 'payloadcms-vectorize'
 
 const DIMS = 8
@@ -88,6 +89,10 @@ describe('Postgres-specific integration tests', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('adds embeddings collection with vector column', async () => {
     // Check schema for embeddings collection
     const collections = payload.collections
diff --git a/adapters/pg/dev/specs/migrationCli.spec.ts b/adapters/pg/dev/specs/migrationCli.spec.ts
index fd6004a..221f84f 100644
--- a/adapters/pg/dev/specs/migrationCli.spec.ts
+++ b/adapters/pg/dev/specs/migrationCli.spec.ts
@@ -4,7 +4,7 @@
 import { postgresAdapter } from '@payloadcms/db-postgres'
 import { buildConfig, getPayload } from 'payload'
 import { createPostgresVectorIntegration } from '../../src/index.js'
 import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from '@shared-test/helpers/embed'
-import { createTestDb } from './utils.js'
+import { createTestDb, destroyPayload } from './utils.js'
 import { DIMS } from './constants.js'
 
 const createVectorizeIntegration = createPostgresVectorIntegration
@@ -77,6 +77,10 @@ describe('Migration CLI integration tests', () => {
     payload = await getPayload({ config, cron: true })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('VectorizedPayload has _staticConfigs via getDbAdapterCustom', async () => {
     const { getVectorizedPayload } = await import('payloadcms-vectorize')
     const vectorizedPayload = getVectorizedPayload(payload)
@@ -159,6 +163,10 @@ describe('Migration CLI integration tests', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('vector search fails with descriptive error when embedding column missing', async () => {
     const { getVectorizedPayload } = await import('payloadcms-vectorize')
     const vectorizedPayload = getVectorizedPayload(payload)
@@ -207,7 +215,7 @@ describe('Migration CLI integration tests', () => {
   })
 
   afterAll(async () => {
-    // Cleanup: remove test migrations directory
+    await destroyPayload(autoPayload)
     if (existsSync(migrationsDir)) {
       rmSync(migrationsDir, { recursive: true, force: true })
     }
@@ -429,6 +437,7 @@ describe('Migration CLI integration tests', () => {
   })
 
   afterAll(async () => {
+    await destroyPayload(dimsPayload)
     if (existsSync(migrationsDir)) {
       rmSync(migrationsDir, { recursive: true, force: true })
     }
@@ -729,6 +738,7 @@ describe('Migration CLI integration tests', () => {
   })
 
   afterAll(async () => {
+    await destroyPayload(multiPayload)
     if (existsSync(migrationsDir)) {
       rmSync(migrationsDir, { recursive: true, force: true })
     }
diff --git a/adapters/pg/dev/specs/multipools.spec.ts b/adapters/pg/dev/specs/multipools.spec.ts
index 497db69..04bef3c 100644
--- a/adapters/pg/dev/specs/multipools.spec.ts
+++ b/adapters/pg/dev/specs/multipools.spec.ts
@@ -1,10 +1,10 @@
 import type { Payload, SanitizedConfig } from 'payload'
 import { buildConfig } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { lexicalEditor } from '@payloadcms/richtext-lexical'
 import { postgresAdapter } from '@payloadcms/db-postgres'
-import { createTestDb } from './utils.js'
+import { createTestDb, destroyPayload } from './utils.js'
 import { getPayload } from 'payload'
 import { createPostgresVectorIntegration } from '../../src/index.js'
 import payloadcmsVectorize from 'payloadcms-vectorize'
@@ -79,6 +79,10 @@ describe('Multiple knowledge pools', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('creates two embeddings collections with vector columns', async () => {
     const collections = payload.collections
     expect(collections).toHaveProperty('pool1')
diff --git a/adapters/pg/dev/specs/schemaName.spec.ts b/adapters/pg/dev/specs/schemaName.spec.ts
index 23cd366..438a634 100644
--- a/adapters/pg/dev/specs/schemaName.spec.ts
+++ b/adapters/pg/dev/specs/schemaName.spec.ts
@@ -3,12 +3,16 @@
 import type { Payload } from 'payload'
 import { postgresAdapter } from '@payloadcms/db-postgres'
 import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from '@shared-test/helpers/embed'
 import { Client } from 'pg'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 
 import type { PostgresPayload } from '../../src/types.js'
 import { buildDummyConfig, DIMS, integration, plugin } from './constants.js'
-import { createTestDb, waitForVectorizationJobs } from './utils.js'
+import {
+  createTestDb,
+  destroyPayload,
+  waitForVectorizationJobs,
+} from './utils.js'
 import { getPayload } from 'payload'
 import { getVectorizedPayload } from 'payloadcms-vectorize'
 
 const CUSTOM_SCHEMA = 'custom'
@@ -91,6 +95,10 @@ describe('Custom schemaName support', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('embeddings table is created in custom schema', async () => {
     const db = (payload as PostgresPayload).db
     const tablesRes = await db.pool?.query(
diff --git a/adapters/pg/dev/specs/utils.ts b/adapters/pg/dev/specs/utils.ts
index 28ad9fc..d7b465b 100644
--- a/adapters/pg/dev/specs/utils.ts
+++ b/adapters/pg/dev/specs/utils.ts
@@ -1,6 +1,6 @@
 import { Client } from 'pg'
 
-export { waitForVectorizationJobs } from '@shared-test/utils'
+export { waitForVectorizationJobs, destroyPayload } from '@shared-test/utils'
 
 export const createTestDb = async ({ dbName }: { dbName: string }) => {
   const adminUri =
diff --git a/dev/specs/bulkEmbed/basic.spec.ts b/dev/specs/bulkEmbed/basic.spec.ts
index cca0566..174f3c1 100644
--- a/dev/specs/bulkEmbed/basic.spec.ts
+++ b/dev/specs/bulkEmbed/basic.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload, SanitizedConfig } from 'payload'
-import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
+import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
 import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js'
@@ -10,6 +10,7 @@ import {
   clearAllCollections,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -56,6 +57,10 @@ describe('Bulk embed - basic tests', () => {
     vectorizedPayload = getVectorizedPayload(payload)
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   beforeEach(async () => {
     await clearAllCollections(payload)
   })
diff --git a/dev/specs/bulkEmbed/batchLimit.spec.ts b/dev/specs/bulkEmbed/batchLimit.spec.ts
new file mode 100644
index 0000000..d9170a8
--- /dev/null
+++ b/dev/specs/bulkEmbed/batchLimit.spec.ts
@@ -0,0 +1,117 @@
+import type { Payload } from 'payload'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
+import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
+import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
+import {
+  BULK_QUEUE_NAMES,
+  DEFAULT_DIMS,
+  buildPayloadWithIntegration,
+  createMockBulkEmbeddings,
+  createTestDb,
+  destroyPayload,
+  waitForBulkJobs,
+} from '../utils.js'
+import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
+import { getVectorizedPayload, VectorizedPayload } from 'payloadcms-vectorize'
+import { expectGoodResult } from '../utils.vitest.js'
+import { createMockAdapter } from 'helpers/mockAdapter.js'
+
+const DIMS = DEFAULT_DIMS
+const dbName = `bulk_batchlimit_${Date.now()}`
+
+describe('Bulk embed - batchLimit', () => {
+  let payload: Payload
+  let vectorizedPayload: VectorizedPayload | null = null
+
+  beforeAll(async () => {
+    await createTestDb({ dbName })
+    const built = await buildPayloadWithIntegration({
+      dbName,
+      pluginOpts: {
+        dbAdapter: createMockAdapter(),
+        knowledgePools: {
+          default: {
+            collections: {
+              posts: {
+                toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
+                batchLimit: 2,
+              },
+            },
+            embeddingConfig: {
+              version: testEmbeddingVersion,
+              queryFn: makeDummyEmbedQuery(DIMS),
+              bulkEmbeddingsFns: createMockBulkEmbeddings({
+                statusSequence: ['succeeded'],
+              }),
+            },
+          },
+        },
+        bulkQueueNames: BULK_QUEUE_NAMES,
+      },
+      key: `batchlimit-${Date.now()}`,
+    })
+    payload = built.payload
+    vectorizedPayload = getVectorizedPayload(payload)
+  })
+
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
+  test('batchLimit splits docs across continuation jobs and all get embedded', async () => {
+    // Create 5 posts with batchLimit: 2
+    // Should result in 3 prepare jobs (2 docs, 2 docs, 1 doc)
+    for (let i = 0; i < 5; i++) {
+      await payload.create({ collection: 'posts', data: { title: `BatchLimit Post ${i}` } as any })
+    }
+
+    const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
+    expectGoodResult(result)
+
+    await waitForBulkJobs(payload, 30000)
+
+    // All 5 posts should have embeddings
+    const embeds = await payload.find({ collection: 'default' })
+    expect(embeds.totalDocs).toBe(5)
+
+    // Run should be succeeded
+    const runDoc = (
+      await (payload as any).find({
+        collection: BULK_EMBEDDINGS_RUNS_SLUG,
+        where: { id: { equals: result!.runId } },
+      })
+    ).docs[0]
+    expect(runDoc.status).toBe('succeeded')
+    expect(runDoc.inputs).toBe(5)
+  })
+
+  test('batchLimit equal to doc count does not create extra continuations', async () => {
+    // Clean up from prior test: delete all posts and embeddings
+    await payload.delete({ collection: 'posts', where: {} })
+    await payload.delete({ collection: 'default' as any, where: {} })
+
+    // Create exactly 2 posts (matching batchLimit: 2)
+    for (let i = 0; i < 2; i++) {
+      await payload.create({
+        collection: 'posts',
+        data: { title: `Exact Post ${i}` } as any,
+      })
+    }
+
+    const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
+    expectGoodResult(result)
+
+    await waitForBulkJobs(payload, 20000)
+
+    const embeds = await payload.find({ collection: 'default' })
+    expect(embeds.totalDocs).toBe(2)
+
+    const runDoc = (
+      await (payload as any).find({
+        collection: BULK_EMBEDDINGS_RUNS_SLUG,
+        where: { id: { equals: result!.runId } },
+      })
+    ).docs[0]
+    expect(runDoc.status).toBe('succeeded')
+  })
+})
diff --git a/dev/specs/bulkEmbed/canceledBatch.spec.ts b/dev/specs/bulkEmbed/canceledBatch.spec.ts
index 1e3f0b3..ade1df2 100644
--- a/dev/specs/bulkEmbed/canceledBatch.spec.ts
+++ b/dev/specs/bulkEmbed/canceledBatch.spec.ts
@@ -1,11 +1,12 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import {
   BULK_QUEUE_NAMES,
   DEFAULT_DIMS,
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -51,6 +52,10 @@ describe('Bulk embed - canceled batch', () => {
     vectorizedPayload = getVectorizedPayload(payload)
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('canceled batch marks entire run as failed', async () => {
     const post = await payload.create({ collection: 'posts', data: { title: 'Cancel' } as any })
     const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
diff --git a/dev/specs/bulkEmbed/concurrentRuns.spec.ts b/dev/specs/bulkEmbed/concurrentRuns.spec.ts
index 52c6b7e..27023bd 100644
--- a/dev/specs/bulkEmbed/concurrentRuns.spec.ts
+++ b/dev/specs/bulkEmbed/concurrentRuns.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import { getVectorizedPayload } from '../../../src/types.js'
 import {
@@ -8,6 +8,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
 import { createMockAdapter } from 'helpers/mockAdapter.js'
@@ -47,6 +48,10 @@ describe('Bulk embed - concurrent runs prevention', () => {
     payload = built.payload
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('cannot start concurrent bulk embed runs for the same pool', async () => {
     const vectorizedPayload = getVectorizedPayload<'default'>(payload)!
 
    // Create a test post first
diff --git a/dev/specs/bulkEmbed/extensionFields.spec.ts b/dev/specs/bulkEmbed/extensionFields.spec.ts
index 61d488d..865aed5 100644
--- a/dev/specs/bulkEmbed/extensionFields.spec.ts
+++ b/dev/specs/bulkEmbed/extensionFields.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import {
   BULK_QUEUE_NAMES,
@@ -7,6 +7,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -55,6 +56,10 @@ describe('Bulk embed - extension fields', () => {
     vectorizedPayload = getVectorizedPayload(payload)
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('extension fields are merged when writing embeddings', async () => {
     const post = await payload.create({
       collection: 'posts',
diff --git a/dev/specs/bulkEmbed/failedBatch.spec.ts b/dev/specs/bulkEmbed/failedBatch.spec.ts
index cc863dd..8313167 100644
--- a/dev/specs/bulkEmbed/failedBatch.spec.ts
+++ b/dev/specs/bulkEmbed/failedBatch.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
 import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js'
@@ -10,6 +10,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -51,6 +52,10 @@ describe('Bulk embed - failed batch', () => {
     vectorizedPayload = getVectorizedPayload(payload)
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('failed batch marks entire run as failed', async () => {
     const post = await payload.create({ collection: 'posts', data: { title: 'Fail' } as any })
diff --git a/dev/specs/bulkEmbed/ingestionFailure.spec.ts b/dev/specs/bulkEmbed/ingestionFailure.spec.ts
index 40ab448..20fa09e 100644
--- a/dev/specs/bulkEmbed/ingestionFailure.spec.ts
+++ b/dev/specs/bulkEmbed/ingestionFailure.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
 import {
@@ -8,6 +8,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -25,6 +26,10 @@ describe('Bulk embed - ingestion validation failures', () => {
     await createTestDb({ dbName })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('malformed chunk entry fails the bulk embedding run', async () => {
     // Use unique version to ensure this test only processes its own data
     const testVersion = `${testEmbeddingVersion}-ingestion-fail-${Date.now()}`
diff --git a/dev/specs/bulkEmbed/multipleBatches.spec.ts b/dev/specs/bulkEmbed/multipleBatches.spec.ts
index fda0e17..b038e1d 100644
--- a/dev/specs/bulkEmbed/multipleBatches.spec.ts
+++ b/dev/specs/bulkEmbed/multipleBatches.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
 import {
@@ -8,6 +8,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -53,6 +54,10 @@ describe('Bulk embed - multiple batches', () => {
     vectorizedPayload = getVectorizedPayload(payload)
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('multiple batches are created when flushing after N chunks', async () => {
     // Create 5 posts (should result in 3 batches: 2, 2, 1)
     for (let i = 0; i < 5; i++) {
diff --git a/dev/specs/bulkEmbed/multipleChunks.spec.ts b/dev/specs/bulkEmbed/multipleChunks.spec.ts
index a19c521..8d1cd64 100644
--- a/dev/specs/bulkEmbed/multipleChunks.spec.ts
+++ b/dev/specs/bulkEmbed/multipleChunks.spec.ts
@@ -1,11 +1,12 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import {
   BULK_QUEUE_NAMES,
   DEFAULT_DIMS,
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -53,6 +54,10 @@ describe('Bulk embed - multiple chunks with extension fields', () => {
     payload = built.payload
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('multiple chunks keep their respective extension fields', async () => {
     const post = await payload.create({
       collection: 'posts',
diff --git a/dev/specs/bulkEmbed/onError.spec.ts b/dev/specs/bulkEmbed/onError.spec.ts
index 6cc258c..c6da88c 100644
--- a/dev/specs/bulkEmbed/onError.spec.ts
+++ b/dev/specs/bulkEmbed/onError.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import {
   BULK_QUEUE_NAMES,
@@ -7,6 +7,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -58,6 +59,10 @@ describe('Bulk embed - onError callback', () => {
     payload = built.payload
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('onError callback is called when batch fails', async () => {
     await payload.create({ collection: 'posts', data: { title: 'Error Test' } as any })
diff --git a/dev/specs/bulkEmbed/partialFailure.spec.ts b/dev/specs/bulkEmbed/partialFailure.spec.ts
index daed418..499dba8 100644
--- a/dev/specs/bulkEmbed/partialFailure.spec.ts
+++ b/dev/specs/bulkEmbed/partialFailure.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import {
   BULK_QUEUE_NAMES,
@@ -7,6 +7,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -31,6 +32,10 @@ describe('Bulk embed - partial chunk failures', () => {
     await createTestDb({ dbName })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('partial chunk failures are tracked and passed to onError', async () => {
     // Reset state
     onErrorCalled = false
diff --git a/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts b/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts
index a83367d..7ff483a 100644
--- a/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts
+++ b/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import {
   BULK_QUEUE_NAMES,
@@ -7,6 +7,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -31,6 +32,10 @@ describe('Bulk embed - partial failures', () => {
     await createTestDb({ dbName })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('run with no partial failures does not call onError', async () => {
     // Reset state
     onErrorCalled = false
diff --git a/dev/specs/bulkEmbed/polling.spec.ts b/dev/specs/bulkEmbed/polling.spec.ts
index f07db3e..50e283b 100644
--- a/dev/specs/bulkEmbed/polling.spec.ts
+++ b/dev/specs/bulkEmbed/polling.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test, vi } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test, vi } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import {
   BULK_QUEUE_NAMES,
@@ -7,6 +7,7 @@ import {
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { getVectorizedPayload } from 'payloadcms-vectorize'
@@ -49,6 +50,10 @@ describe('Bulk embed - polling requeue', () => {
     payload = built.payload
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('polling requeues when non-terminal then succeeds', async () => {
     const post = await payload.create({ collection: 'posts', data: { title: 'Loop' } as any })
 
     const queueSpy = vi.spyOn(payload.jobs, 'queue')
@@ -59,12 +64,12 @@ describe('Bulk embed - polling requeue', () => {
     await waitForBulkJobs(payload, 15000)
 
     expect(queueSpy).toHaveBeenNthCalledWith(
-      2, // 2nd call
-      expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-bulk-embedding' }),
+      3, // 3rd call - per-batch task queued from worker (1=coordinator, 2=worker)
+      expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-single-batch' }),
     )
     expect(queueSpy).toHaveBeenNthCalledWith(
-      3, // 3rd call
-      expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-bulk-embedding' }),
+      4, // 4th call - per-batch task re-queued after 'running' status
+      expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-single-batch' }),
     )
 
     const embeds = await payload.find({
diff --git a/dev/specs/bulkEmbed/realtimeMode.spec.ts b/dev/specs/bulkEmbed/realtimeMode.spec.ts
index d73a63c..9f0ecd6 100644
--- a/dev/specs/bulkEmbed/realtimeMode.spec.ts
+++ b/dev/specs/bulkEmbed/realtimeMode.spec.ts
@@ -1,11 +1,12 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import {
   BULK_QUEUE_NAMES,
   DEFAULT_DIMS,
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForVectorizationJobs,
 } from '../utils.js'
 import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -48,6 +49,10 @@ describe('Bulk embed - realtime mode', () => {
     payload = built.payload
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('realtime mode queues vectorize jobs when realTimeIngestionFn is provided', async () => {
     const post = await payload.create({
       collection: 'posts',
diff --git a/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts b/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts
index ffe2382..c7ee29b 100644
--- a/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts
+++ b/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts
@@ -1,5 +1,5 @@
 import type { Payload, SanitizedConfig } from 'payload'
-import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
+import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
 import {
   BULK_QUEUE_NAMES,
   DEFAULT_DIMS,
@@ -7,6 +7,7 @@ import {
   clearAllCollections,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -54,6 +55,10 @@ describe('Bulk embed - shouldEmbedFn', () => {
     vectorizedPayload = getVectorizedPayload(payload)
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   beforeEach(async () => {
     await clearAllCollections(payload)
   })
diff --git a/dev/specs/bulkEmbed/versionBump.spec.ts b/dev/specs/bulkEmbed/versionBump.spec.ts
index d2a21b3..b60afbe 100644
--- a/dev/specs/bulkEmbed/versionBump.spec.ts
+++ b/dev/specs/bulkEmbed/versionBump.spec.ts
@@ -1,114 +1,93 @@
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import {
   BULK_QUEUE_NAMES,
   DEFAULT_DIMS,
   buildPayloadWithIntegration,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery } from 'helpers/embed.js'
 import { getVectorizedPayload } from '../../../src/types.js'
 import { expectGoodResult } from '../utils.vitest.js'
 import { createMockAdapter } from 'helpers/mockAdapter.js'
+import type { Payload } from 'payload'
 
 const DIMS = DEFAULT_DIMS
 const dbName = `bulk_version_${Date.now()}`
 
-// Use distinct bulk queue names per payload instance so that
-// the second payload's cron worker handles its own bulk runs,
-// instead of the first payload instance continuing to process them.
-const BULK_QUEUE_NAMES_0 = BULK_QUEUE_NAMES
-const BULK_QUEUE_NAMES_1 = {
-  prepareBulkEmbedQueueName: `${BULK_QUEUE_NAMES.prepareBulkEmbedQueueName}-v2`,
-  pollOrCompleteQueueName: `${BULK_QUEUE_NAMES.pollOrCompleteQueueName}-v2`,
-}
-
 describe('Bulk embed - version bump', () => {
-  let post: any
+  let payload: Payload
+  let knowledgePools: any
+
   beforeAll(async () => {
     await createTestDb({ dbName })
-  })
 
-  test('version bump re-embeds all even without updates', async () => {
-    const payload0 = (
+    knowledgePools = {
+      default: {
+        collections: {
+          posts: {
+            toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
+          },
+        },
+        embeddingConfig: {
+          version: 'old-version',
+          queryFn: makeDummyEmbedQuery(DIMS),
+          bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }),
+        },
+      },
+    }
+
+    payload = (
       await buildPayloadWithIntegration({
         dbName,
         pluginOpts: {
           dbAdapter: createMockAdapter(),
-          knowledgePools: {
-            default: {
-              collections: {
-                posts: {
-                  toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
-                },
-              },
-              embeddingConfig: {
-                version: 'old-version',
-                queryFn: makeDummyEmbedQuery(DIMS),
-                bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }),
-              },
-            },
-          },
-          bulkQueueNames: BULK_QUEUE_NAMES_0,
+          knowledgePools,
+          bulkQueueNames: BULK_QUEUE_NAMES,
         },
-        key: `payload0`,
+        key: `version-bump-${Date.now()}`,
       })
     ).payload
+  })
 
-    post = await payload0.create({ collection: 'posts', data: { title: 'Old' } as any })
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
 
-    const vectorizedPayload0 = getVectorizedPayload(payload0)
-    const result0 = await vectorizedPayload0?.bulkEmbed({ knowledgePool: 'default' })
+  test('version bump re-embeds all even without updates', async () => {
+    // Phase 1: Bulk embed with old-version
+    const post = await payload.create({ collection: 'posts', data: { title: 'Old' } as any })
+
+    const vp = getVectorizedPayload(payload)
+    const result0 = await vp?.bulkEmbed({ knowledgePool: 'default' })
     expectGoodResult(result0)
 
-    await waitForBulkJobs(payload0)
+    await waitForBulkJobs(payload, 30000)
 
-    // Debug: log embeddings after first run
-    const embeds0 = await payload0.find({
+    const embeds0 = await payload.find({
       collection: 'default',
       where: { docId: { equals: String(post.id) } },
     })
     expect(embeds0.totalDocs).toBe(1)
     expect(embeds0.docs[0].embeddingVersion).toBe('old-version')
 
-    const payload1 = (
-      await buildPayloadWithIntegration({
-        dbName,
-        pluginOpts: {
-          dbAdapter: createMockAdapter(),
-          knowledgePools: {
-            default: {
-              collections: {
-                posts: {
-                  toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
-                },
-              },
-              embeddingConfig: {
-                version: 'new-version',
-                queryFn: makeDummyEmbedQuery(DIMS),
-                bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }),
-              },
-            },
-          },
-          bulkQueueNames: BULK_QUEUE_NAMES_1,
-        },
-        key: `payload1`,
-        skipMigrations: true,
-      })
-    ).payload
+    // Phase 2: Mutate config to new-version and re-embed
+    knowledgePools.default.embeddingConfig.version = 'new-version'
+    knowledgePools.default.embeddingConfig.bulkEmbeddingsFns = createMockBulkEmbeddings({
+      statusSequence: ['succeeded'],
+    })
 
-    const vectorizedPayload1 = getVectorizedPayload(payload1)
-    const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' })
+    const result1 = await vp?.bulkEmbed({ knowledgePool: 'default' })
     expectGoodResult(result1)
 
-    await waitForBulkJobs(payload1)
+    await waitForBulkJobs(payload, 30000)
 
-    const embeds1 = await payload1.find({
+    const embeds1 = await payload.find({
       collection: 'default',
       where: { docId: { equals: String(post.id) } },
     })
-
     expect(embeds1.totalDocs).toBe(1)
     expect(embeds1.docs[0].embeddingVersion).toBe('new-version')
   })
diff --git a/dev/specs/chunkers.spec.ts b/dev/specs/chunkers.spec.ts
index d5905b5..cbf7848 100644
--- a/dev/specs/chunkers.spec.ts
+++ b/dev/specs/chunkers.spec.ts
@@ -19,6 +19,8 @@ describe('Chunkers', () => {
   })
 
   const 
markdownContent = await getInitialMarkdownContent(cfg) + // chunkRichText only needs the SanitizedConfig for Lexical editor setup, + // no real db required const chunks = await chunkRichText(markdownContent, cfg) expect(chunks.length).toBe(3) diff --git a/dev/specs/config.spec.ts b/dev/specs/config.spec.ts index 3ec578d..7818cc4 100644 --- a/dev/specs/config.spec.ts +++ b/dev/specs/config.spec.ts @@ -17,7 +17,7 @@ describe('jobs.tasks merging', () => { { slug: 'payloadcms-vectorize:vectorize', handler: expect.any(Function) }, { slug: 'payloadcms-vectorize:prepare-bulk-embedding', handler: expect.any(Function) }, { - slug: 'payloadcms-vectorize:poll-or-complete-bulk-embedding', + slug: 'payloadcms-vectorize:poll-or-complete-single-batch', handler: expect.any(Function), }, ]), diff --git a/dev/specs/e2e.spec.ts b/dev/specs/e2e.spec.ts index 5b8bf85..7d81b5e 100644 --- a/dev/specs/e2e.spec.ts +++ b/dev/specs/e2e.spec.ts @@ -3,7 +3,7 @@ import type { Payload, SanitizedConfig } from 'payload' import config from '@payload-config' import { getPayload } from 'payload' import { getInitialMarkdownContent } from './constants.js' -import { waitForVectorizationJobs, waitForBulkJobs } from './utils.js' +import { destroyPayload, waitForVectorizationJobs, waitForBulkJobs } from './utils.js' import { testEmbeddingVersion } from 'helpers/embed.js' import { devUser } from 'helpers/credentials.js' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../src/collections/bulkEmbeddingsRuns.js' @@ -66,6 +66,10 @@ test.describe('Vector embedding e2e tests', () => { payload = await getPayload({ config: _config, key: `e2e-test-${Date.now()}` }) }) + test.afterAll(async () => { + await destroyPayload(payload) + }) + test('querying the endpoint should return the title with testEmbeddingVersion', async ({ request, }) => { diff --git a/dev/specs/extensionFields.spec.ts b/dev/specs/extensionFields.spec.ts index f8bd3b4..911056c 100644 --- a/dev/specs/extensionFields.spec.ts +++ 
b/dev/specs/extensionFields.spec.ts @@ -1,7 +1,11 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { postgresAdapter } from '@payloadcms/db-postgres' -import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { + createTestDb, + destroyPayload, + waitForVectorizationJobs, +} from './utils.js' import { getPayload, buildConfig } from 'payload' import { chunkText, chunkRichText } from 'helpers/chunkers.js' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -118,6 +122,10 @@ describe('Extension fields integration tests', () => { }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('extension field values are stored with embeddings', async () => { const post = await payload.create({ collection: 'posts', diff --git a/dev/specs/extensionFieldsVectorSearch.spec.ts b/dev/specs/extensionFieldsVectorSearch.spec.ts index ee55b4b..0eada79 100644 --- a/dev/specs/extensionFieldsVectorSearch.spec.ts +++ b/dev/specs/extensionFieldsVectorSearch.spec.ts @@ -1,7 +1,11 @@ import { describe, expect, test } from 'vitest' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { buildDummyConfig, DIMS } from './constants.js' -import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { + createTestDb, + destroyPayload, + waitForVectorizationJobs, +} from './utils.js' import { getPayload } from 'payload' import { postgresAdapter } from '@payloadcms/db-postgres' import { chunkRichText, chunkText } from 'helpers/chunkers.js' @@ -111,55 +115,59 @@ describe('extensionFields', () => { cron: true, }) - // Create a post with extension field values - const testQuery = 'Extension fields test content' - const post = await payloadWithExtensions.create({ - collection: 'posts', - data: { - title: testQuery, - content: null, 
- category: 'tech', - priorityLevel: 42, - } as unknown as any, - }) + try { + // Create a post with extension field values + const testQuery = 'Extension fields test content' + const post = await payloadWithExtensions.create({ + collection: 'posts', + data: { + title: testQuery, + content: null, + category: 'tech', + priorityLevel: 42, + } as unknown as any, + }) - // Wait for vectorization jobs to complete - await waitForVectorizationJobs(payloadWithExtensions) + // Wait for vectorization jobs to complete + await waitForVectorizationJobs(payloadWithExtensions) - // Perform vector search - const knowledgePools: Record = { - default: defaultKnowledgePool, - } - const searchHandler = createVectorSearchHandlers(knowledgePools, adapter).requestHandler - const mockRequest = { - json: async () => ({ - query: testQuery, - knowledgePool: 'default', - }), - payload: payloadWithExtensions, - } as any - const response = await searchHandler(mockRequest) - const json = await response.json() + // Perform vector search + const knowledgePools: Record = { + default: defaultKnowledgePool, + } + const searchHandler = createVectorSearchHandlers(knowledgePools, adapter).requestHandler + const mockRequest = { + json: async () => ({ + query: testQuery, + knowledgePool: 'default', + }), + payload: payloadWithExtensions, + } as any + const response = await searchHandler(mockRequest) + const json = await response.json() - // Verify results contain extensionFields - expect(json).toHaveProperty('results') - expect(Array.isArray(json.results)).toBe(true) - expect(json.results.length).toBeGreaterThan(0) + // Verify results contain extensionFields + expect(json).toHaveProperty('results') + expect(Array.isArray(json.results)).toBe(true) + expect(json.results.length).toBeGreaterThan(0) - // Find a result that matches our post - const matchingResult = json.results.find( - (r: any) => r.docId === String(post.id) && r.chunkText === testQuery, - ) - expect(matchingResult).toBeDefined() + // Find a 
result that matches our post + const matchingResult = json.results.find( + (r: any) => r.docId === String(post.id) && r.chunkText === testQuery, + ) + expect(matchingResult).toBeDefined() - // Verify extensionFields are present - expect(matchingResult).toHaveProperty('category') - expect(matchingResult).toHaveProperty('priorityLevel') + // Verify extensionFields are present + expect(matchingResult).toHaveProperty('category') + expect(matchingResult).toHaveProperty('priorityLevel') - // Verify types are correct - expect(typeof matchingResult.category).toBe('string') - expect(matchingResult.category).toBe('tech') - expect(typeof matchingResult.priorityLevel).toBe('number') - expect(matchingResult.priorityLevel).toBe(42) + // Verify types are correct + expect(typeof matchingResult.category).toBe('string') + expect(matchingResult.category).toBe('tech') + expect(typeof matchingResult.priorityLevel).toBe('number') + expect(matchingResult.priorityLevel).toBe(42) + } finally { + await destroyPayload(payloadWithExtensions) + } }) }) diff --git a/dev/specs/failedValidation.spec.ts b/dev/specs/failedValidation.spec.ts index 933e046..9dd44c4 100644 --- a/dev/specs/failedValidation.spec.ts +++ b/dev/specs/failedValidation.spec.ts @@ -3,7 +3,11 @@ import { buildConfig } from 'payload' import { describe, expect, test } from 'vitest' import payloadcmsVectorize from '../../src/index.js' -import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { + createTestDb, + destroyPayload, + waitForVectorizationJobs, +} from './utils.js' import { getPayload } from 'payload' import { createMockAdapter } from 'helpers/mockAdapter.js' @@ -71,31 +75,35 @@ describe('Validation failures mark jobs as errored', () => { cron: true, }) - await payload.create({ - collection: 'posts', - data: { title: 'bad chunks' }, - }) + try { + await payload.create({ + collection: 'posts', + data: { title: 'bad chunks' }, + }) - // Wait for the queued job to finish (success or failure) - await 
waitForVectorizationJobs(payload, 30000) + // Wait for the queued job to finish (success or failure) + await waitForVectorizationJobs(payload, 30000) - // Then assert failure - const res = await payload.find({ - collection: 'payload-jobs', - where: { - and: [{ taskSlug: { equals: 'payloadcms-vectorize:vectorize' } }], - }, - limit: 1, - sort: '-createdAt', - }) - const failedJob = (res as any)?.docs?.[0] - expect(failedJob.hasError).toBe(true) - const errMsg = failedJob.error.message - expect(errMsg).toMatch(/chunk/i) - expect(errMsg).toMatch(/Invalid indices: 1/) + // Then assert failure + const res = await payload.find({ + collection: 'payload-jobs', + where: { + and: [{ taskSlug: { equals: 'payloadcms-vectorize:vectorize' } }], + }, + limit: 1, + sort: '-createdAt', + }) + const failedJob = (res as any)?.docs?.[0] + expect(failedJob.hasError).toBe(true) + const errMsg = failedJob.error.message + expect(errMsg).toMatch(/chunk/i) + expect(errMsg).toMatch(/Invalid indices: 1/) - // Ensure no embeddings were created (all-or-nothing validation) - const embeddingsCount = await payload.count({ collection: 'default' }) - expect(embeddingsCount.totalDocs).toBe(0) + // Ensure no embeddings were created (all-or-nothing validation) + const embeddingsCount = await payload.count({ collection: 'default' }) + expect(embeddingsCount.totalDocs).toBe(0) + } finally { + await destroyPayload(payload) + } }, 60000) }) diff --git a/dev/specs/int.spec.ts b/dev/specs/int.spec.ts index 792dbab..9fede10 100644 --- a/dev/specs/int.spec.ts +++ b/dev/specs/int.spec.ts @@ -1,6 +1,6 @@ import type { Payload, SanitizedConfig } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { chunkRichText, chunkText } from 'helpers/chunkers.js' import { createHeadlessEditor } from 
'@payloadcms/richtext-lexical/lexical/headless' @@ -13,7 +13,11 @@ import { import { $createHeadingNode } from '@payloadcms/richtext-lexical/lexical/rich-text' import { editorConfigFactory, getEnabledNodes, lexicalEditor } from '@payloadcms/richtext-lexical' import { DIMS, getInitialMarkdownContent } from './constants.js' -import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { + createTestDb, + destroyPayload, + waitForVectorizationJobs, +} from './utils.js' import { getPayload } from 'payload' import { postgresAdapter } from '@payloadcms/db-postgres' import { buildConfig } from 'payload' @@ -98,6 +102,10 @@ describe('Plugin integration tests', () => { markdownContent = await getInitialMarkdownContent(config) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('creates embeddings on create', async () => { const title = 'Hello world' const post = await payload.create({ diff --git a/dev/specs/queueName.spec.ts b/dev/specs/queueName.spec.ts index 1e15d22..57615aa 100644 --- a/dev/specs/queueName.spec.ts +++ b/dev/specs/queueName.spec.ts @@ -1,10 +1,10 @@ import type { Payload, SanitizedConfig } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { chunkText, chunkRichText } from 'helpers/chunkers.js' import type { SerializedEditorState } from '@payloadcms/richtext-lexical/lexical' import { postgresAdapter } from '@payloadcms/db-postgres' import { buildDummyConfig, getInitialMarkdownContent } from './constants.js' -import { createTestDb } from './utils.js' +import { createTestDb, destroyPayload } from './utils.js' import { getPayload } from 'payload' import payloadcmsVectorize from 'payloadcms-vectorize' import { createMockAdapter } from 'helpers/mockAdapter.js' @@ -76,6 +76,11 @@ describe('Queue tests', () => { }) markdownContent = await getInitialMarkdownContent(config) }) + + afterAll(async () => { + await 
destroyPayload(payload) + }) + test('vectorization jobs are queued using the queueName', async () => { // There is no autoRun so previous jobs are queued and never removed between tests const prevJobs = await payload.find({ diff --git a/dev/specs/shouldEmbedFn.spec.ts b/dev/specs/shouldEmbedFn.spec.ts index 539e0d0..27ed84e 100644 --- a/dev/specs/shouldEmbedFn.spec.ts +++ b/dev/specs/shouldEmbedFn.spec.ts @@ -1,8 +1,8 @@ import type { Payload, SanitizedConfig } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { DIMS } from './constants.js' -import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { createTestDb, destroyPayload, waitForVectorizationJobs } from './utils.js' import { getPayload } from 'payload' import { postgresAdapter } from '@payloadcms/db-postgres' import { buildConfig } from 'payload' @@ -73,6 +73,10 @@ describe('shouldEmbedFn - real-time', () => { }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('shouldEmbedFn filters documents on real-time create', async () => { const skippedPost = await payload.create({ collection: 'posts', diff --git a/dev/specs/utils.ts b/dev/specs/utils.ts index b45ee28..8739b10 100644 --- a/dev/specs/utils.ts +++ b/dev/specs/utils.ts @@ -75,12 +75,39 @@ export async function waitForVectorizationJobs(payload: Payload, maxWaitMs = 100 await waitForTasks(payload, [TASK_SLUG_VECTORIZE], maxWaitMs) } -export async function waitForBulkJobs(payload: Payload, maxWaitMs = 10000) { - await waitForTasks( - payload, - [TASK_SLUG_PREPARE_BULK_EMBEDDING, TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING], - maxWaitMs, +export async function waitForBulkJobs(payload: Payload, maxWaitMs = 10000, intervalMs = 250) { + const hasJobsCollection = (payload as any)?.config?.collections?.some( + (c: any) => c.slug 
=== 'payload-jobs', ) + if (!hasJobsCollection) return + + const taskSlugs = [TASK_SLUG_PREPARE_BULK_EMBEDDING, TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING] + const startTime = Date.now() + + while (Date.now() - startTime < maxWaitMs) { + const pending = await payload.find({ + collection: 'payload-jobs', + where: { + and: [{ taskSlug: { in: taskSlugs } }, { completedAt: { exists: false } }], + }, + }) + + if (pending.totalDocs === 0) { + // No pending jobs — but with coordinator/worker fan-out, new jobs may + // appear between the coordinator completing and the worker being queued. + // Double-check: if any bulk run is still non-terminal, keep waiting. + const activeRuns = await payload.find({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + where: { status: { in: ['queued', 'running'] } }, + limit: 1, + }) + if (activeRuns.totalDocs === 0) return + } + + await new Promise((resolve) => setTimeout(resolve, intervalMs)) + } + // One last grace wait + await new Promise((resolve) => setTimeout(resolve, 500)) } export const DEFAULT_DIMS = 8 @@ -277,3 +304,25 @@ export const clearAllCollections = async (pl: Payload) => { await safeDelete('payload-jobs') } +/** Safely destroy a Payload instance (stops crons, closes DB pool). */ +export async function destroyPayload(payload: Payload | null | undefined) { + if (payload) await payload.destroy() +} + +export async function createSucceededBaselineRun( + payload: Payload, + { + version, + completedAt = new Date().toISOString(), + }: { version?: string; completedAt?: string } = {}, +) { + return (payload as any).create({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + pool: 'default', + embeddingVersion: version ?? 
'', + status: 'succeeded', + completedAt, + }, + }) +} diff --git a/dev/specs/vectorSearch.spec.ts b/dev/specs/vectorSearch.spec.ts index d6b6688..091d49b 100644 --- a/dev/specs/vectorSearch.spec.ts +++ b/dev/specs/vectorSearch.spec.ts @@ -1,6 +1,6 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { type SerializedEditorState } from '@payloadcms/richtext-lexical/lexical' import { buildDummyConfig, DIMS, getInitialMarkdownContent } from './constants.js' @@ -8,6 +8,7 @@ import { BULK_QUEUE_NAMES, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -190,6 +191,10 @@ describe('Search endpoint integration tests', () => { markdownContent = await getInitialMarkdownContent(config) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('querying a title should return the title', async () => { // This should create multiple embeddings for the title and content const post = await payload.create({ diff --git a/dev/specs/vectorizedPayload.spec.ts b/dev/specs/vectorizedPayload.spec.ts index 9883e3c..65c40ba 100644 --- a/dev/specs/vectorizedPayload.spec.ts +++ b/dev/specs/vectorizedPayload.spec.ts @@ -1,9 +1,13 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { getVectorizedPayload, VectorizedPayload } from '../../src/types.js' import { buildDummyConfig, DIMS, getInitialMarkdownContent } from './constants.js' -import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { + createTestDb, + destroyPayload, + waitForVectorizationJobs, +} from './utils.js' import { getPayload } from 'payload' import { 
postgresAdapter } from '@payloadcms/db-postgres' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -93,6 +97,10 @@ describe('VectorizedPayload', () => { markdownContent = await getInitialMarkdownContent(config) }) + afterAll(async () => { + await destroyPayload(payload) + }) + describe('getVectorizedPayload', () => { test('returns vectorized payload object for a payload instance with vectorize extensions', () => { const vectorizedPayload = getVectorizedPayload(payload) diff --git a/package.json b/package.json index 556ce39..7641980 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "payloadcms-vectorize", - "version": "0.6.0-beta.4", + "version": "0.6.0-beta.5", "description": "A plugin to vectorize collections for RAG in Payload 3.0", "license": "MIT", "type": "module", @@ -47,9 +47,9 @@ "test:teardown": "docker-compose -f dev/docker-compose.test.yml down", "test": "pnpm test:int && pnpm test:e2e", "test:e2e": "playwright test", - "test:int": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx' vitest", - "test:adapters:pg": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx' vitest --config adapters/pg/vitest.config.js", - "test:adapters:cf": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx' vitest --config adapters/cf/vitest.config.js" + "test:int": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx --max-old-space-size=8192' vitest", + "test:adapters:pg": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx --max-old-space-size=8192' vitest --config adapters/pg/vitest.config.js", + "test:adapters:cf": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx --max-old-space-size=8192' vitest --config adapters/cf/vitest.config.js" }, 
"devDependencies": { "@changesets/changelog-github": "^0.5.2", diff --git a/src/collections/bulkEmbeddingsBatches.ts b/src/collections/bulkEmbeddingsBatches.ts index e47c18a..43aa02a 100644 --- a/src/collections/bulkEmbeddingsBatches.ts +++ b/src/collections/bulkEmbeddingsBatches.ts @@ -115,6 +115,14 @@ export const createBulkEmbeddingsBatchesCollection = (): CollectionConfig => ({ description: 'Error message if the batch failed', }, }, + { + name: 'failedChunkData', + type: 'json', + admin: { + description: + 'Collection, documentId and chunkIndex for each chunk that failed in this batch', + }, + }, { name: 'retriedBatch', type: 'relationship', diff --git a/src/constants.ts b/src/constants.ts index 21f55cf..aabe244 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -2,4 +2,4 @@ export const TASK_SLUG_VECTORIZE = 'payloadcms-vectorize:vectorize' as const export const TASK_SLUG_PREPARE_BULK_EMBEDDING = 'payloadcms-vectorize:prepare-bulk-embedding' as const export const TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING = - 'payloadcms-vectorize:poll-or-complete-bulk-embedding' as const + 'payloadcms-vectorize:poll-or-complete-single-batch' as const diff --git a/src/endpoints/retryFailedBatch.ts b/src/endpoints/retryFailedBatch.ts index 2361785..a5a2975 100644 --- a/src/endpoints/retryFailedBatch.ts +++ b/src/endpoints/retryFailedBatch.ts @@ -210,10 +210,10 @@ export async function retryBatch({ task: TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING, - input: { runId: String(runId) }, + input: { runId: String(runId), batchId: String(newBatch.id) }, ...(queueName ? 
{ queue: queueName } : {}), }) diff --git a/src/index.ts b/src/index.ts index 5c7ed9e..44aca25 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,7 +29,7 @@ import { } from './collections/bulkEmbeddingsBatches.js' import { createPrepareBulkEmbeddingTask, - createPollOrCompleteBulkEmbeddingTask, + createPollOrCompleteSingleBatchTask, } from './tasks/bulkEmbedAll.js' import { createBulkEmbedHandler, startBulkEmbed } from './endpoints/bulkEmbed.js' import { createRetryFailedBatchHandler, retryBatch } from './endpoints/retryFailedBatch.js' @@ -177,7 +177,7 @@ export default (pluginOptions: PayloadcmsVectorizeConfig) => }) tasks.push(prepareBulkEmbedTask) - const pollOrCompleteBulkEmbedTask = createPollOrCompleteBulkEmbeddingTask({ + const pollOrCompleteBulkEmbedTask = createPollOrCompleteSingleBatchTask({ knowledgePools: pluginOptions.knowledgePools, pollOrCompleteQueueName: pluginOptions.bulkQueueNames?.pollOrCompleteQueueName, adapter: pluginOptions.dbAdapter, diff --git a/src/tasks/bulkEmbedAll.ts b/src/tasks/bulkEmbedAll.ts index f3471d9..4f77883 100644 --- a/src/tasks/bulkEmbedAll.ts +++ b/src/tasks/bulkEmbedAll.ts @@ -1,7 +1,6 @@ import { CollectionSlug, JsonObject, - PaginatedDocs, Payload, TaskConfig, TaskHandlerResult, @@ -14,6 +13,7 @@ import { BulkEmbeddingBatchDoc, BulkEmbeddingInputMetadataDoc, CollectedEmbeddingInput, + CollectionVectorizeOption, KnowledgePoolDynamicConfig, KnowledgePoolName, BulkEmbeddingInput, @@ -32,6 +32,10 @@ import { deleteDocumentEmbeddings } from '../utils/deleteDocumentEmbeddings.js' type PrepareBulkEmbeddingTaskInput = { runId: string + /** If set, this is a per-collection worker job */ + collectionSlug?: string + /** Page within the collection (default: 1) */ + page?: number } type PrepareBulkEmbeddingTaskOutput = { @@ -45,18 +49,20 @@ type PrepareBulkEmbeddingTaskInputOutput = { output: PrepareBulkEmbeddingTaskOutput } -type PollOrCompleteBulkEmbeddingTaskInput = { +type PollOrCompleteSingleBatchTaskInput = { runId: string + 
batchId: string } -type PollOrCompleteBulkEmbeddingTaskOutput = { +type PollOrCompleteSingleBatchTaskOutput = { runId: string + batchId: string status: string } -type PollOrCompleteBulkEmbeddingTaskInputOutput = { - input: PollOrCompleteBulkEmbeddingTaskInput - output: PollOrCompleteBulkEmbeddingTaskOutput +type PollOrCompleteSingleBatchTaskInputOutput = { + input: PollOrCompleteSingleBatchTaskInput + output: PollOrCompleteSingleBatchTaskOutput } const TERMINAL_STATUSES = new Set(['succeeded', 'failed', 'canceled', 'retried']) @@ -93,12 +99,156 @@ async function loadRunAndConfig({ return { run, poolName, dynamicConfig } } +/** + * Check if all batches for a run are terminal, and if so finalize the run. + * This function is idempotent - safe to call concurrently from multiple per-batch tasks. + */ +async function finalizeRunIfComplete(args: { + payload: Payload + runId: string + poolName: KnowledgePoolName + callbacks: { + onError?: (args: { + providerBatchIds: string[] + error: Error + failedChunkData?: FailedChunkData[] + failedChunkCount?: number + }) => Promise + } +}): Promise<{ finalized: boolean; status?: string }> { + const { payload, runId, poolName, callbacks } = args + + // Check if run is already terminal (prevents double-finalization race) + const currentRun = await payload.findByID({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: runId, + }) + if (TERMINAL_STATUSES.has((currentRun as any).status)) { + return { finalized: true, status: (currentRun as any).status } + } + + // Stream through batches page-by-page, aggregating without storing them all in memory + const runIdNum = parseInt(runId, 10) + const PAGE_SIZE = 100 + let page = 1 + let totalBatchCount = 0 + let allTerminal = true + let hasAnySucceeded = false + let allCanceled = true + let totalSucceeded = 0 + let totalFailed = 0 + const allFailedChunkData: FailedChunkData[] = [] + const succeededBatchIds: number[] = [] + const providerBatchIds: string[] = [] + + while (true) { + const result = 
await payload.find({ + collection: BULK_EMBEDDINGS_BATCHES_SLUG, + where: { run: { equals: runIdNum } }, + limit: PAGE_SIZE, + page, + sort: 'batchIndex', + }) + const docs = (result as any)?.docs || [] + + for (const batch of docs) { + totalBatchCount++ + const status = batch.status as string + providerBatchIds.push(batch.providerBatchId as string) + + if (!TERMINAL_STATUSES.has(status)) allTerminal = false + if (status === 'succeeded') hasAnySucceeded = true + if (status !== 'canceled') allCanceled = false + + if (status === 'succeeded') { + totalSucceeded += batch.succeededCount || 0 + totalFailed += batch.failedCount || 0 + succeededBatchIds.push(parseInt(String(batch.id), 10)) + if (Array.isArray(batch.failedChunkData)) { + allFailedChunkData.push(...batch.failedChunkData) + } + } + } + + const totalPages = (result as any)?.totalPages ?? page + if (page >= totalPages || docs.length === 0) break + page++ + } + + if (totalBatchCount === 0) { + await payload.update({ + id: runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: 'succeeded', + inputs: 0, + succeeded: 0, + failed: 0, + completedAt: new Date().toISOString(), + }, + }) + return { finalized: true, status: 'succeeded' } + } + + if (!allTerminal) { + return { finalized: false } + } + + // All batches are terminal — finalize the run + if (allCanceled) { + await payload.update({ + id: runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { status: 'canceled', completedAt: new Date().toISOString() }, + }) + return { finalized: true, status: 'canceled' } + } + + const runStatus = hasAnySucceeded ? 'succeeded' : 'failed' + + await payload.update({ + id: runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: runStatus, + succeeded: totalSucceeded, + failed: totalFailed, + failedChunkData: allFailedChunkData.length > 0 ? 
allFailedChunkData : undefined, + completedAt: new Date().toISOString(), + }, + }) + + // Cleanup metadata for succeeded batches only + if (succeededBatchIds.length > 0) { + await payload.delete({ + collection: BULK_EMBEDDINGS_INPUT_METADATA_SLUG, + where: { batch: { in: succeededBatchIds } }, + }) + } + + // Call onError if there were any failures + if (callbacks.onError && (totalFailed > 0 || !hasAnySucceeded)) { + await callbacks.onError({ + providerBatchIds, + error: new Error( + totalFailed > 0 ? `${totalFailed} chunk(s) failed during completion` : 'All batches failed', + ), + failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, + failedChunkCount: totalFailed > 0 ? totalFailed : undefined, + }) + } + + return { finalized: true, status: runStatus } +} + export const createPrepareBulkEmbeddingTask = ({ knowledgePools, pollOrCompleteQueueName, + prepareBulkEmbedQueueName, }: { knowledgePools: Record pollOrCompleteQueueName?: string + prepareBulkEmbedQueueName?: string }): TaskConfig => { const task: TaskConfig = { slug: TASK_SLUG_PREPARE_BULK_EMBEDDING, @@ -110,7 +260,7 @@ export const createPrepareBulkEmbeddingTask = ({ throw new Error('[payloadcms-vectorize] bulk embed runId is required') } const payload = req.payload - const { poolName, dynamicConfig } = await loadRunAndConfig({ + const { run, poolName, dynamicConfig } = await loadRunAndConfig({ payload, runId: input.runId, knowledgePools, @@ -119,7 +269,76 @@ export const createPrepareBulkEmbeddingTask = ({ const callbacks = dynamicConfig.embeddingConfig.bulkEmbeddingsFns! 
const embeddingVersion = dynamicConfig.embeddingConfig.version - // Find baseline run information + // ============================================= + // COORDINATOR MODE: no collectionSlug in input + // ============================================= + if (!input.collectionSlug) { + // Queue one worker per collection + const collectionSlugs = Object.keys(dynamicConfig.collections) + if (collectionSlugs.length === 0) { + // No collections configured - mark run as succeeded + await payload.update({ + id: input.runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: 'succeeded', + totalBatches: 0, + inputs: 0, + succeeded: 0, + failed: 0, + completedAt: new Date().toISOString(), + }, + }) + return { output: { runId: input.runId, status: 'succeeded', batchCount: 0 } } + } + + for (const collectionSlug of collectionSlugs) { + await payload.jobs.queue<'payloadcms-vectorize:prepare-bulk-embedding'>({ + task: 'payloadcms-vectorize:prepare-bulk-embedding', + input: { runId: input.runId, collectionSlug, page: 1 }, + req, + ...(prepareBulkEmbedQueueName ? 
{ queue: prepareBulkEmbedQueueName } : {}), + }) + } + + // Update run status + await payload.update({ + id: input.runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: 'running', + submittedAt: new Date().toISOString(), + }, + }) + + return { output: { runId: input.runId, status: 'coordinated' } } + } + + // ============================================= + // WORKER MODE: collectionSlug is set + // ============================================= + + // Early exit if run is already terminal + if (TERMINAL_STATUSES.has((run as any).status)) { + return { output: { runId: input.runId, status: (run as any).status } } + } + + const collectionSlug = input.collectionSlug + const collectionConfig = dynamicConfig.collections[collectionSlug] + if (!collectionConfig) { + throw new Error( + `[payloadcms-vectorize] collection "${collectionSlug}" not found in pool "${poolName}"`, + ) + } + + const DEFAULT_BATCH_LIMIT = 1000 + const batchLimit = + collectionConfig.batchLimit && collectionConfig.batchLimit > 0 + ? collectionConfig.batchLimit + : DEFAULT_BATCH_LIMIT + const page = input.page ?? 1 + + // Compute baseline/version for filtering const latestSucceededRun = await payload.find({ collection: BULK_EMBEDDINGS_RUNS_SLUG, where: { @@ -137,23 +356,52 @@ export const createPrepareBulkEmbeddingTask = ({ const baselineVersion: string | undefined = baselineRun?.embeddingVersion const lastBulkCompletedAt: string | undefined = baselineRun?.completedAt const versionMismatch = baselineVersion !== undefined && baselineVersion !== embeddingVersion + const includeAll = versionMismatch || !baselineRun + const lastCompletedAtDate = lastBulkCompletedAt ? new Date(lastBulkCompletedAt) : undefined - // Stream missing embeddings and create batches - let result + // Build where clause for this collection + const where = includeAll + ? undefined + : lastCompletedAtDate + ? 
{ updatedAt: { greater_than: lastCompletedAtDate.toISOString() } } + : undefined + + // STEP 1: Query the page + const queryResult = await payload.find({ + collection: collectionSlug, + where, + limit: batchLimit, + page, + sort: 'id', + }) + + // STEP 2: If there's a next page, queue continuation BEFORE processing + if (queryResult.nextPage) { + await payload.jobs.queue<'payloadcms-vectorize:prepare-bulk-embedding'>({ + task: 'payloadcms-vectorize:prepare-bulk-embedding', + input: { runId: input.runId, collectionSlug, page: queryResult.nextPage }, + req, + ...(prepareBulkEmbedQueueName ? { queue: prepareBulkEmbedQueueName } : {}), + }) + } + + // STEP 3: Process this page's docs + let totalResult: { batchCount: number; totalInputs: number; batchIds: (string | number)[] } try { - result = await streamAndBatchMissingEmbeddings({ + totalResult = await streamAndBatchDocs({ payload, runId: input.runId, poolName, - dynamicConfig, + collectionSlug, + collectionConfig, + docs: (queryResult.docs || []) as Array, embeddingVersion, - lastBulkCompletedAt, - versionMismatch, - hasBaseline: Boolean(baselineRun), + includeAll, + lastCompletedAtDate, addChunk: callbacks.addChunk, }) } catch (error) { - // Ingestion failed (e.g., validation error) - mark run as failed + // Ingestion failed - mark run as failed const errorMessage = (error as Error).message || String(error) await payload.update({ id: input.runId, @@ -164,49 +412,47 @@ export const createPrepareBulkEmbeddingTask = ({ completedAt: new Date().toISOString(), }, }) - // Re-throw so Payload's job system marks the job as failed throw error } - if (result.totalInputs === 0) { - // No inputs to process - mark run as succeeded + // STEP 4: Accumulate counts on run record + if (totalResult.totalInputs > 0) { + const currentRun = await payload.findByID({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: input.runId, + }) + const existingInputs = (currentRun as any).inputs ?? 
0 + const existingBatches = (currentRun as any).totalBatches ?? 0 await payload.update({ id: input.runId, collection: BULK_EMBEDDINGS_RUNS_SLUG, data: { - status: 'succeeded', - totalBatches: 0, - inputs: 0, - succeeded: 0, - failed: 0, - completedAt: new Date().toISOString(), + totalBatches: existingBatches + totalResult.batchCount, + inputs: existingInputs + totalResult.totalInputs, }, }) - return { output: { runId: input.runId, status: 'succeeded', batchCount: 0 } } } - // Update run with batch count and total inputs - await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, - data: { - status: 'running', - totalBatches: result.batchCount, - inputs: result.totalInputs, - submittedAt: new Date().toISOString(), - }, - }) + // STEP 5: Queue per-batch polling tasks + for (const batchId of totalResult.batchIds) { + await payload.jobs.queue({ + task: TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING, + input: { runId: input.runId, batchId: String(batchId) }, + req, + ...(pollOrCompleteQueueName ? { queue: pollOrCompleteQueueName } : {}), + }) + } - // Queue the poll task to monitor all batches - await payload.jobs.queue({ - task: TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING, - input: { runId: input.runId }, - req, - ...(pollOrCompleteQueueName ? { queue: pollOrCompleteQueueName } : {}), - }) + // If this worker produced 0 batches and has no continuation, try to finalize. + // finalizeRunIfComplete is idempotent: if other workers created batches that + // aren't terminal yet, it returns { finalized: false } and the polling tasks + // will handle finalization later. 
+      if (totalResult.batchCount === 0 && !queryResult.nextPage) {
+        await finalizeRunIfComplete({ payload, runId: input.runId, poolName, callbacks })
+      }
 
       return {
-        output: { runId: input.runId, status: 'prepared', batchCount: result.batchCount },
+        output: { runId: input.runId, status: 'prepared', batchCount: totalResult.batchCount },
       }
     },
   }
@@ -214,7 +460,7 @@ export const createPrepareBulkEmbeddingTask = ({
   return task
 }
 
-export const createPollOrCompleteBulkEmbeddingTask = ({
+export const createPollOrCompleteSingleBatchTask = ({
   knowledgePools,
   pollOrCompleteQueueName,
   adapter,
@@ -222,268 +468,108 @@
   knowledgePools: Record<KnowledgePoolName, KnowledgePoolDynamicConfig>
   pollOrCompleteQueueName?: string
   adapter: DbAdapter
-}): TaskConfig => {
-  const task: TaskConfig = {
+}): TaskConfig => {
+  const task: TaskConfig = {
     slug: TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING,
     handler: async ({
       input,
       req,
-    }): Promise> => {
-      if (!input?.runId) {
-        throw new Error('[payloadcms-vectorize] bulk embed runId is required')
+    }): Promise> => {
+      if (!input?.runId || !input?.batchId) {
+        throw new Error('[payloadcms-vectorize] single batch task requires runId and batchId')
       }
+      const { runId, batchId } = input
       const payload = req.payload
       const { run, poolName, dynamicConfig } = await loadRunAndConfig({
         payload,
-        runId: input.runId,
+        runId,
         knowledgePools,
       })
       const callbacks = dynamicConfig.embeddingConfig.bulkEmbeddingsFns!
- // Check if run is already terminal - const currentStatus = run.status - if (TERMINAL_STATUSES.has(currentStatus)) { - return { output: { runId: input.runId, status: currentStatus } } + // Early exit if run is already terminal + if (TERMINAL_STATUSES.has(run.status)) { + return { output: { runId, batchId, status: run.status } } } - // Load all batches for this run with pagination to handle >1000 batches - // Convert runId to number for postgres relationship queries - const runIdNum = parseInt(input.runId, 10) - const batches: BulkEmbeddingBatchDoc[] = [] - let batchPage = 1 - const batchLimit = 100 // Smaller pages for better memory management - - while (true) { - const batchesResult = await payload.find({ - collection: BULK_EMBEDDINGS_BATCHES_SLUG, - where: { run: { equals: runIdNum } }, - limit: batchLimit, - page: batchPage, - sort: 'batchIndex', - }) - const pageDocs = (batchesResult.docs ?? []) as BulkEmbeddingBatchDoc[] - batches.push(...pageDocs) + // Load this specific batch + const batch = (await payload.findByID({ + collection: BULK_EMBEDDINGS_BATCHES_SLUG, + id: batchId, + })) as BulkEmbeddingBatchDoc - const totalPages = batchesResult.totalPages ?? 
batchPage - if (batchPage >= totalPages || pageDocs.length === 0) break - batchPage++ + // If batch is already terminal, just check if run can be finalized + if (TERMINAL_STATUSES.has((batch as any).status)) { + await finalizeRunIfComplete({ payload, runId, poolName, callbacks }) + return { output: { runId, batchId, status: (batch as any).status } } } - if (batches.length === 0) { - // No batches found - this shouldn't happen but handle gracefully - await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, - data: { - status: 'failed', - error: 'No batches found for run', - completedAt: new Date().toISOString(), - }, + // Poll and complete this single batch + try { + const completionResult = await pollAndCompleteSingleBatch({ + payload, + runId, + poolName, + batch, + callbacks, + adapter, }) - return { output: { runId: input.runId, status: 'failed' } } - } - - // Poll each non-terminal batch and complete succeeded ones incrementally - let anyRunning = false - let totalSucceeded = 0 - let totalFailed = 0 - const allFailedChunkData: FailedChunkData[] = [] - const batchStatuses = new Map() // Track batch statuses as we process - - // Initialize with current statuses - for (const batch of batches) { - batchStatuses.set(String(batch.id), batch.status) - // Accumulate counts from already completed batches - if (TERMINAL_STATUSES.has(batch.status)) { - if (batch.status === 'succeeded') { - totalSucceeded += batch.succeededCount || 0 - totalFailed += batch.failedCount || 0 - } - } - } - - for (const batch of batches) { - const batchStatus = batchStatuses.get(String(batch.id)) - if (!batchStatus) continue - - // Skip batches that are already completed - if (TERMINAL_STATUSES.has(batchStatus)) { - continue - } - - // Poll batch and complete if succeeded (streams embeddings via onChunk callback) - try { - const completionResult = await pollAndCompleteSingleBatch({ - payload, - runId: input.runId, - poolName, - batch, - callbacks, - adapter, - }) - - 
// Update batch status and counts - await payload.update({ - id: batch.id, - collection: BULK_EMBEDDINGS_BATCHES_SLUG, - data: { - status: completionResult.status, - error: completionResult.error, - ...(TERMINAL_STATUSES.has(completionResult.status) - ? { completedAt: new Date().toISOString() } - : {}), - ...(completionResult.status === 'succeeded' - ? { - succeededCount: completionResult.succeededCount, - failedCount: completionResult.failedCount, - } - : {}), - }, - }) - - // Track the new status - batchStatuses.set(String(batch.id), completionResult.status) - - // Accumulate counts from newly succeeded batches - if (completionResult.status === 'succeeded') { - totalSucceeded += completionResult.succeededCount - totalFailed += completionResult.failedCount - allFailedChunkData.push(...completionResult.failedChunkData) - } - - // Track if still running (queued or running) - if (completionResult.status === 'queued' || completionResult.status === 'running') { - anyRunning = true - } - // Failed/canceled batches - leave them, can be re-run later - } catch (error) { - // Completion failed - mark batch as failed - const errorMessage = (error as Error).message || String(error) - await payload.update({ - id: batch.id, - collection: BULK_EMBEDDINGS_BATCHES_SLUG, - data: { - status: 'failed', - error: `Completion failed: ${errorMessage}`, - completedAt: new Date().toISOString(), - }, - }) - batchStatuses.set(String(batch.id), 'failed') - } - } - - // Check if all batches are complete - const allBatchesComplete = Array.from(batchStatuses.values()).every((status) => - TERMINAL_STATUSES.has(status), - ) - - if (allBatchesComplete) { - // All batches are done - finalize the run - const hasAnySucceeded = Array.from(batchStatuses.values()).some( - (status) => status === 'succeeded', - ) + // Update batch status and counts await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: batchId, + collection: BULK_EMBEDDINGS_BATCHES_SLUG, data: { - status: 
hasAnySucceeded ? 'succeeded' : 'failed', - succeeded: totalSucceeded, - failed: totalFailed, - failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, - completedAt: new Date().toISOString(), + status: completionResult.status, + error: completionResult.error, + ...(TERMINAL_STATUSES.has(completionResult.status) + ? { completedAt: new Date().toISOString() } + : {}), + ...(completionResult.status === 'succeeded' + ? { + succeededCount: completionResult.succeededCount, + failedCount: completionResult.failedCount, + failedChunkData: + completionResult.failedChunkData.length > 0 + ? completionResult.failedChunkData + : undefined, + } + : {}), }, }) - // Cleanup metadata for succeeded batches only - // Keep metadata for failed batches to allow retry functionality - const succeededBatchIds = Array.from(batchStatuses.entries()) - .filter(([_, status]) => status === 'succeeded') - .map(([id, _]) => parseInt(id, 10)) - - if (succeededBatchIds.length > 0) { - await payload.delete({ - collection: BULK_EMBEDDINGS_INPUT_METADATA_SLUG, - where: { batch: { in: succeededBatchIds } }, - }) - } - - // Call onError if there were any failures - if (callbacks.onError && (totalFailed > 0 || !hasAnySucceeded)) { - const providerBatchIds = batches.map((b) => b.providerBatchId) - await callbacks.onError({ - providerBatchIds, - error: new Error( - totalFailed > 0 - ? `${totalFailed} chunk(s) failed during completion` - : 'All batches failed', - ), - failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, - failedChunkCount: totalFailed > 0 ? totalFailed : undefined, - }) + // If batch is now terminal, check if run should be finalized + if (TERMINAL_STATUSES.has(completionResult.status)) { + await finalizeRunIfComplete({ payload, runId, poolName, callbacks }) + return { output: { runId, batchId, status: completionResult.status } } } - return { - output: { - runId: input.runId, - status: hasAnySucceeded ? 
'succeeded' : 'failed', - }, - } - } - - // If still running, requeue this task - if (anyRunning) { + // Still running - re-queue self with polling delay await payload.jobs.queue({ task: TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING, - input: { runId: input.runId }, + input: { runId, batchId }, req, ...(pollOrCompleteQueueName ? { queue: pollOrCompleteQueueName } : {}), }) - return { output: { runId: input.runId, status: 'polling' } } - } - // Edge case: allBatchesComplete is false but anyRunning is false - // This happens when all batches are in 'canceled' or 'failed' status but we didn't detect it above - // Check if all batches are canceled - const allCanceled = Array.from(batchStatuses.values()).every( - (status) => status === 'canceled', - ) - - if (allCanceled) { + return { output: { runId, batchId, status: completionResult.status } } + } catch (error) { + // Batch processing failed - mark batch as failed + const errorMessage = (error as Error).message || String(error) await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: batchId, + collection: BULK_EMBEDDINGS_BATCHES_SLUG, data: { - status: 'canceled', + status: 'failed', + error: `Completion failed: ${errorMessage}`, completedAt: new Date().toISOString(), }, }) - return { output: { runId: input.runId, status: 'canceled' } } + // Check if this was the last batch to complete + await finalizeRunIfComplete({ payload, runId, poolName, callbacks }) + return { output: { runId, batchId, status: 'failed' } } } - - // Fallback: mark as failed with diagnostic info - const statusCounts = Array.from(batchStatuses.values()).reduce( - (acc, status) => { - acc[status] = (acc[status] || 0) + 1 - return acc - }, - {} as Record, - ) - payload.logger.warn( - `[payloadcms-vectorize] Run ${input.runId} reached unexpected state. 
Batch statuses: ${JSON.stringify(statusCounts)}`, - ) - - await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, - data: { - status: 'failed', - error: `Run reached unexpected state. Batch statuses: ${JSON.stringify(statusCounts)}`, - completedAt: new Date().toISOString(), - }, - }) - return { output: { runId: input.runId, status: 'failed' } } }, } @@ -491,133 +577,105 @@ export const createPollOrCompleteBulkEmbeddingTask = ({ } /** - * Stream through missing embeddings, calling addChunk for each. + * Process pre-fetched docs from a single collection, calling addChunk for each chunk. * User controls batching via addChunk return value. * * Single-pass approach using async generator to yield chunks sequentially. * This avoids the need for a pre-counting pass while correctly determining isLastChunk. */ -async function streamAndBatchMissingEmbeddings(args: { +async function streamAndBatchDocs(args: { payload: Payload runId: string poolName: KnowledgePoolName - dynamicConfig: KnowledgePoolDynamicConfig + collectionSlug: string + collectionConfig: CollectionVectorizeOption + docs: Array embeddingVersion: string - lastBulkCompletedAt?: string - versionMismatch: boolean - hasBaseline: boolean + includeAll: boolean + lastCompletedAtDate?: Date addChunk: (args: { chunk: BulkEmbeddingInput isLastChunk: boolean }) => Promise -}): Promise<{ batchCount: number; totalInputs: number }> { +}): Promise<{ batchCount: number; totalInputs: number; batchIds: (string | number)[] }> { const { payload, runId, poolName, - dynamicConfig, + collectionSlug, + collectionConfig, + docs, embeddingVersion, - lastBulkCompletedAt, - versionMismatch, - hasBaseline, + includeAll, + lastCompletedAtDate, addChunk, } = args - const includeAll = versionMismatch || !hasBaseline - const lastCompletedAtDate = lastBulkCompletedAt ? 
new Date(lastBulkCompletedAt) : undefined - const collectionSlugs = Object.keys(dynamicConfig.collections) - - // Async generator that yields chunks one at a time + // Async generator that yields chunks one at a time from pre-fetched docs async function* generateChunks(): AsyncGenerator { - for (const collectionSlug of collectionSlugs) { - const collectionConfig = dynamicConfig.collections[collectionSlug] - if (!collectionConfig) continue + const toKnowledgePool = collectionConfig.toKnowledgePool - const toKnowledgePool = collectionConfig.toKnowledgePool - const limit = 50 + for (const doc of docs) { + // If !includeAll, we still need to check if document has current embedding + // (can't filter this in the where clause since it's a cross-collection check) + if (!includeAll && !lastCompletedAtDate) { + const hasCurrentEmbedding = await docHasEmbeddingVersion({ + payload, + poolName, + sourceCollection: collectionSlug, + docId: String(doc.id), + embeddingVersion, + }) + if (hasCurrentEmbedding) continue + } - // Build where clause: filter by updatedAt if we have lastBulkCompletedAt and !includeAll - const where = includeAll - ? undefined - : lastCompletedAtDate - ? 
{ - updatedAt: { - greater_than: lastCompletedAtDate.toISOString(), - }, - } - : undefined + // Check if document should be embedded + if (collectionConfig.shouldEmbedFn) { + const shouldEmbed = await collectionConfig.shouldEmbedFn(doc, payload) + if (!shouldEmbed) continue + } - let res: PaginatedDocs | undefined = await payload.find({ - collection: collectionSlug, - where, - limit, - }) - do { - const docs = res?.docs || [] - if (!docs.length) break - - for (const doc of docs) { - // If !includeAll, we still need to check if document has current embedding - // (can't filter this in the where clause since it's a cross-collection check) - if (!includeAll && !lastCompletedAtDate) { - const hasCurrentEmbedding = await docHasEmbeddingVersion({ - payload, - poolName, - sourceCollection: collectionSlug, - docId: String(doc.id), - embeddingVersion, - }) - if (hasCurrentEmbedding) continue - } - - // Check if document should be embedded - if (collectionConfig.shouldEmbedFn) { - const shouldEmbed = await collectionConfig.shouldEmbedFn(doc, payload) - if (!shouldEmbed) continue - } - - const chunkData = await toKnowledgePool(doc, payload) - - validateChunkData(chunkData, String(doc.id), collectionSlug) - - // Yield valid chunks - for (let idx = 0; idx < chunkData.length; idx++) { - const chunkEntry = chunkData[idx] - const { chunk, ...extensionFields } = chunkEntry - - yield { - id: `${collectionSlug}:${doc.id}:${idx}`, - text: chunk, - metadata: { - sourceCollection: collectionSlug, - docId: String(doc.id), - chunkIndex: idx, - embeddingVersion, - extensionFields, - }, - } - } + const chunkData = await toKnowledgePool(doc, payload) + + validateChunkData(chunkData, String(doc.id), collectionSlug) + + // Yield valid chunks + for (let idx = 0; idx < chunkData.length; idx++) { + const chunkEntry = chunkData[idx] + const { chunk, ...extensionFields } = chunkEntry + + yield { + id: `${collectionSlug}:${doc.id}:${idx}`, + text: chunk, + metadata: { + sourceCollection: 
collectionSlug, + docId: String(doc.id), + chunkIndex: idx, + embeddingVersion, + extensionFields, + }, } - } while ( - (res = res.nextPage - ? await payload.find({ - collection: collectionSlug, - where, - limit, - page: res.nextPage, - }) - : undefined) - ) + } } } + // Determine starting batchIndex from existing batches for this run + const runIdNum = parseInt(runId, 10) + const maxBatchResult = await payload.find({ + collection: BULK_EMBEDDINGS_BATCHES_SLUG, + where: { run: { equals: runIdNum } }, + sort: '-batchIndex', + limit: 1, + }) + let batchIndex = + maxBatchResult.docs.length > 0 ? ((maxBatchResult.docs[0] as any).batchIndex ?? 0) + 1 : 0 + // Process chunks from generator - let batchIndex = 0 let totalInputs = 0 const pendingChunks: CollectedEmbeddingInput[] = [] const chunkIterator = generateChunks() - const runIdNum = parseInt(runId, 10) let currentBatchId: number | undefined = undefined + const batchIds: (string | number)[] = [] async function processChunk( chunk: CollectedEmbeddingInput, @@ -687,12 +745,13 @@ async function streamAndBatchMissingEmbeddings(args: { }) totalInputs += inputCount + batchIds.push(currentBatchId) batchIndex++ currentBatchId = undefined // Reset for next batch } } - // Process chunks from generator + // Process chunks from generator using look-ahead for isLastChunk let prevChunk: CollectedEmbeddingInput | undefined = undefined for await (const currentChunk of chunkIterator) { if (prevChunk) { @@ -704,7 +763,7 @@ async function streamAndBatchMissingEmbeddings(args: { await processChunk(prevChunk, true) } - return { batchCount: batchIndex, totalInputs } + return { batchCount: batchIds.length, totalInputs, batchIds } } /** diff --git a/src/types.ts b/src/types.ts index 57ec48f..0e41c98 100644 --- a/src/types.ts +++ b/src/types.ts @@ -107,6 +107,10 @@ export type CollectionVectorizeOption = { shouldEmbedFn?: ShouldEmbedFn /** Function that converts a document to an array of chunks with optional extension field values */ 
  toKnowledgePool: ToKnowledgePoolFn
+  /** Max documents to fetch from this collection per prepare job.
+   * Each page of results becomes a separate job linked to the next.
+   * Defaults to 1000 if not set. */
+  batchLimit?: number
 }
 
 /** Knowledge pool name identifier */
@@ -183,7 +187,8 @@ export type PollBulkEmbeddingsResult = {
 export type AddChunkArgs = {
   /** The chunk to add */
   chunk: BulkEmbeddingInput
-  /** True if this is the last chunk in the run */
+  /** True if this is the last chunk in this job (forces flush).
+   * Note: may not be the last chunk in the entire run if batchLimit continuations are used. */
   isLastChunk: boolean
 }
@@ -239,8 +244,9 @@ export type BulkEmbeddingsFns = {
    * of them were submitted when you return a BatchSubmission.
    *
    * **About `isLastChunk`:**
-   * - `isLastChunk=true` indicates this is the final chunk in the run
-   * - Use this to flush any remaining accumulated chunks before the run completes
+   * - `isLastChunk=true` indicates this is the final chunk in this job
+   * - Use this to flush any remaining accumulated chunks before the job completes
+   * - When `batchLimit` is set, each job's last chunk gets `isLastChunk=true` (not just the run's last chunk)
    * - The plugin uses this only to know when to stop iterating, not to determine which chunks were submitted
    *
    * **Example flow when chunk would exceed limit:**
diff --git a/vitest.config.js b/vitest.config.js
index 5cac158..ad613db 100644
--- a/vitest.config.js
+++ b/vitest.config.js
@@ -22,12 +22,11 @@ export default defineConfig(() => {
       testTimeout: 30_000,
       include: ['dev/specs/**/*.spec.ts'],
       exclude: ['**/e2e.spec.{ts,js}', '**/node_modules/**'],
-      // Run test files sequentially to avoid global state interference
-      // (embeddingsTables map and Payload instance caching)
+      // Each test file gets its own forked process so memory is fully
+      // reclaimed between files (prevents OOM on CI).
+      pool: 'forks',
+      // Run test files sequentially to avoid DB / global-state interference.
       fileParallelism: false,
-      // Disable parallel test execution within files as well
-      //threads: false,
-      //maxConcurrency: 1,
     },
   }
})
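For reviewers, a sketch of how the new `batchLimit` option on `CollectionVectorizeOption` might be used. The `postsVectorize` name and the document shape are hypothetical; only `shouldEmbedFn`, `toKnowledgePool`, and `batchLimit` come from the type in src/types.ts.

```typescript
// Hypothetical collection entry; only the option names (shouldEmbedFn,
// toKnowledgePool, batchLimit) come from CollectionVectorizeOption.
type Doc = Record<string, unknown>

const postsVectorize = {
  // Skip drafts entirely: no job is queued and toKnowledgePool never runs.
  shouldEmbedFn: async (doc: Doc) => doc._status !== 'draft',
  // One embedding row per returned element; the array index becomes chunkIndex.
  toKnowledgePool: async (doc: Doc) => [{ chunk: String(doc.title ?? '') }],
  // Cap each bulk-embed worker job at 500 documents; each page queues a
  // continuation job for the next page.
  batchLimit: 500,
}
```

Omitting `batchLimit` keeps the default of 1000 documents per worker job.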