From da93322d0de1b3a2f98d521fd177756255615b77 Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Fri, 20 Feb 2026 11:59:01 +0700 Subject: [PATCH 01/10] adds should embed (#38) * adds should embed * Ups version to get ready for release --- CHANGELOG.md | 6 ++ README.md | 14 ++- dev/specs/bulkEmbed/shouldEmbedFn.spec.ts | 104 +++++++++++++++++++++ dev/specs/shouldEmbedFn.spec.ts | 106 ++++++++++++++++++++++ package.json | 2 +- src/index.ts | 7 ++ src/tasks/bulkEmbedAll.ts | 6 ++ src/types.ts | 9 ++ 8 files changed, 252 insertions(+), 2 deletions(-) create mode 100644 dev/specs/bulkEmbed/shouldEmbedFn.spec.ts create mode 100644 dev/specs/shouldEmbedFn.spec.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index a9f21d2..986df6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## 0.5.4 - 2026-02-20 + +### Added + +- **`shouldEmbedFn` filter**: Optional function on `CollectionVectorizeOption` that runs before a document is queued for embedding. Return `false` to skip the document entirely — no job is created and `toKnowledgePool` is never called. Works for both real-time and bulk embedding. Useful for skipping drafts, archived documents, or any custom criteria. + ## 0.5.3 - 2026-01-24 ### Changed diff --git a/README.md b/README.md index 110da8d..3cd6689 100644 --- a/README.md +++ b/README.md @@ -273,7 +273,7 @@ The embeddings collection name will be the same as the knowledge pool name. **2. Dynamic Config** (passed to `payloadcmsVectorize`): -- `collections`: `Record` - Collections and their chunking configs +- `collections`: `Record` - Collections and their configs (optional `shouldEmbedFn` filter + required `toKnowledgePool` chunker) - `extensionFields?`: `Field[]` - Optional fields to extend the embeddings collection schema - `embeddingConfig`: Embedding configuration object: - `version`: `string` - Version string for tracking model changes @@ -622,10 +622,22 @@ curl -X POST http://localhost:3000/api/vector-retry-failed-batch \ #### CollectionVectorizeOption +- `shouldEmbedFn? (doc, payload)` – optional filter that runs **before** the document is queued for embedding. Return `false` to skip the document entirely (no job is created and `toKnowledgePool` is never called). Works for both real-time and bulk embedding. Defaults to embedding all documents when omitted. - `toKnowledgePool (doc, payload)` – return an array of `{ chunk, ...extensionFieldValues }`. Each object becomes one embedding row and the index in the array determines `chunkIndex`. Reserved column names: `sourceCollection`, `docId`, `chunkIndex`, `chunkText`, `embeddingVersion`. Avoid reusing them in `extensionFields`. +**Example – skip draft documents:** + +```typescript +collections: { + posts: { + shouldEmbedFn: async (doc) => doc._status === 'published', + toKnowledgePool: postsToKnowledgePool, + }, +} +``` + ## Chunkers Use chunker helpers (see `dev/helpers/chunkers.ts`) to keep `toKnowledgePool` implementations focused on orchestration. A `toKnowledgePool` can combine multiple chunkers, enrich each chunk with metadata, and return everything the embeddings collection needs. diff --git a/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts b/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts new file mode 100644 index 0000000..ebbda47 --- /dev/null +++ b/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts @@ -0,0 +1,104 @@ +import type { Payload, SanitizedConfig } from 'payload' +import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest' +import { + BULK_QUEUE_NAMES, + DEFAULT_DIMS, + buildPayloadWithIntegration, + clearAllCollections, + createMockBulkEmbeddings, + createTestDb, + waitForBulkJobs, +} from '../utils.js' +import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' +import { getVectorizedPayload, VectorizedPayload } from 'payloadcms-vectorize' +import { expectGoodResult } from '../utils.vitest.js' + +const DIMS = DEFAULT_DIMS +const dbName = `bulk_should_embed_fn_${Date.now()}` + +const basePluginOptions = { + knowledgePools: { + default: { + collections: { + posts: { + shouldEmbedFn: async (doc: any) => !doc.title?.startsWith('SKIP'), + toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], + }, + }, + embeddingConfig: { + version: testEmbeddingVersion, + queryFn: makeDummyEmbedQuery(DIMS), + bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), + }, + }, + }, + bulkQueueNames: BULK_QUEUE_NAMES, +} + +describe('Bulk embed - shouldEmbedFn', () => { + let payload: Payload + let config: SanitizedConfig + let vectorizedPayload: VectorizedPayload | null = null + + beforeAll(async () => { + await createTestDb({ dbName }) + const built = await buildPayloadWithIntegration({ + dbName, + pluginOpts: basePluginOptions, + key: `bulk-should-embed-${Date.now()}`, + }) + payload = built.payload + config = built.config + vectorizedPayload = getVectorizedPayload(payload) + }) + + beforeEach(async () => { + await clearAllCollections(payload) + }) + + afterEach(async () => { + vi.restoreAllMocks() + }) + + test('filtered-out document is not embedded during bulk run', async () => { + await payload.create({ collection: 'posts', data: { title: 'SKIP me' } as any }) + const embeddedPost = await payload.create({ + collection: 'posts', + data: { title: 'Embed me' } as any, + }) + + const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result) + + await waitForBulkJobs(payload) + + // Only the allowed post should have embeddings + const allEmbeddings = await payload.find({ + collection: 'default', + where: { sourceCollection: { equals: 'posts' } }, + }) + expect(allEmbeddings.totalDocs).toBe(1) + expect(allEmbeddings.docs[0]).toHaveProperty('docId', String(embeddedPost.id)) + }) + + test('multiple filtered-out documents produce no embeddings while allowed ones do', async () => { + await payload.create({ collection: 'posts', data: { title: 'SKIP first' } as any }) + await payload.create({ collection: 'posts', data: { title: 'SKIP second' } as any }) + const allowedPost = await payload.create({ + collection: 'posts', + data: { title: 'Allowed post' } as any, + }) + + const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result) + + await waitForBulkJobs(payload) + + const allEmbeddings = await payload.find({ + collection: 'default', + where: { sourceCollection: { equals: 'posts' } }, + }) + expect(allEmbeddings.totalDocs).toBe(1) + expect(allEmbeddings.docs[0]).toHaveProperty('docId', String(allowedPost.id)) + }) +}) diff --git a/dev/specs/shouldEmbedFn.spec.ts b/dev/specs/shouldEmbedFn.spec.ts new file mode 100644 index 0000000..acb730e --- /dev/null +++ b/dev/specs/shouldEmbedFn.spec.ts @@ -0,0 +1,106 @@ +import type { Payload, SanitizedConfig } from 'payload' +import { beforeAll, describe, expect, test } from 'vitest' +import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' +import { DIMS } from './constants.js' +import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { getPayload } from 'payload' +import { postgresAdapter } from '@payloadcms/db-postgres' +import { buildConfig } from 'payload' +import { lexicalEditor } from '@payloadcms/richtext-lexical' +import { createVectorizeIntegration } from 'payloadcms-vectorize' + +const embeddingsCollection = 'default' + +describe('shouldEmbedFn - real-time', () => { + let payload: Payload + let config: SanitizedConfig + const dbName = `should_embed_fn_rt_${Date.now()}` + + beforeAll(async () => { + await createTestDb({ dbName }) + + const integration = createVectorizeIntegration({ + default: { + dims: DIMS, + ivfflatLists: 1, + }, + }) + + config = await buildConfig({ + secret: process.env.PAYLOAD_SECRET || 'test-secret', + editor: lexicalEditor(), + collections: [ + { + slug: 'posts', + fields: [{ name: 'title', type: 'text' }], + }, + ], + db: postgresAdapter({ + extensions: ['vector'], + afterSchemaInit: [integration.afterSchemaInitHook], + pool: { + connectionString: `postgresql://postgres:password@localhost:5433/${dbName}`, + }, + }), + plugins: [ + integration.payloadcmsVectorize({ + knowledgePools: { + default: { + collections: { + posts: { + shouldEmbedFn: async (doc) => !doc.title?.startsWith('SKIP'), + toKnowledgePool: async (doc) => [{ chunk: doc.title }], + }, + }, + embeddingConfig: { + version: testEmbeddingVersion, + queryFn: makeDummyEmbedQuery(DIMS), + realTimeIngestionFn: makeDummyEmbedDocs(DIMS), + }, + }, + }, + }), + ], + jobs: { + tasks: [], + autoRun: [ + { + cron: '*/5 * * * * *', + limit: 10, + }, + ], + }, + }) + + payload = await getPayload({ + config, + key: `should-embed-fn-rt-${Date.now()}`, + cron: true, + }) + }) + + test('shouldEmbedFn filters documents on real-time create', async () => { + const skippedPost = await payload.create({ + collection: 'posts', + data: { title: 'SKIP this post' } as any, + }) + const allowedPost = await payload.create({ + collection: 'posts', + data: { title: 'Embed this post' } as any, + }) + + await waitForVectorizationJobs(payload) + + const skippedEmbeddings = await payload.find({ + collection: embeddingsCollection, + where: { docId: { equals: String(skippedPost.id) } }, + }) + expect(skippedEmbeddings.docs.length).toBe(0) + + const allowedEmbeddings = await payload.find({ + collection: embeddingsCollection, + where: { docId: { equals: String(allowedPost.id) } }, + }) + expect(allowedEmbeddings.docs.length).toBeGreaterThan(0) + }) +}) diff --git a/package.json b/package.json index b217ac0..6710be3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "payloadcms-vectorize", - "version": "0.5.3", + "version": "0.5.4", "description": "A plugin to vectorize collections for RAG in Payload 3.0", "license": "MIT", "type": "module", diff --git a/src/index.ts b/src/index.ts index 2f4a6c9..71a49b4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -53,6 +53,7 @@ export type { EmbeddingConfig, // CollectionVectorizeOption + ShouldEmbedFn, ToKnowledgePoolFn, // EmbeddingConfig @@ -276,6 +277,12 @@ export const createVectorizeIntegration = const collectionConfig = dynamic.collections[collectionSlug] if (!collectionConfig) continue + // Check if document should be embedded + if (collectionConfig.shouldEmbedFn) { + const shouldEmbed = await collectionConfig.shouldEmbedFn(doc, payload) + if (!shouldEmbed) continue + } + // Only queue real-time vectorization if realTimeIngestionFn is provided if (!dynamic.embeddingConfig.realTimeIngestionFn) continue // If no realTimeIngestionFn, nothing happens on doc change diff --git a/src/tasks/bulkEmbedAll.ts b/src/tasks/bulkEmbedAll.ts index 973371c..55ba517 100644 --- a/src/tasks/bulkEmbedAll.ts +++ b/src/tasks/bulkEmbedAll.ts @@ -565,6 +565,12 @@ async function streamAndBatchMissingEmbeddings(args: { if (hasCurrentEmbedding) continue } + // Check if document should be embedded + if (collectionConfig.shouldEmbedFn) { + const shouldEmbed = await collectionConfig.shouldEmbedFn(doc, payload) + if (!shouldEmbed) continue + } + const chunkData = await toKnowledgePool(doc, payload) // Validate chunks (same validation as real-time ingestion) diff --git a/src/types.ts b/src/types.ts index 9c48a29..b517b87 100644 --- a/src/types.ts +++ b/src/types.ts @@ -89,7 +89,16 @@ export type ToKnowledgePoolFn = ( payload: Payload, ) => Promise> +export type ShouldEmbedFn = ( + doc: Record, + payload: Payload, +) => Promise | boolean + export type CollectionVectorizeOption = { + /** Optional filter: return false to skip embedding this document. + * For bulk embeddings, runs before job is queued. + * If undefined, defaults to embedding all documents. */ + shouldEmbedFn?: ShouldEmbedFn /** Function that converts a document to an array of chunks with optional extension field values */ toKnowledgePool: ToKnowledgePoolFn } From 82ac6a1ee89778fe58fffdb8c8b5814dfcd60cf5 Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Wed, 25 Feb 2026 20:11:16 +0700 Subject: [PATCH 02/10] splits the job into one per batch (#41) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * splits the job into one per batch * fix: remove waitUntil delay and persist failedChunkData on batch records - Remove 30s waitUntil delay from per-batch task re-queue (was causing test timeouts since the original code had no such delay) - Add failedChunkData JSON field to batch collection so per-batch tasks can store chunk-level failure data independently - Aggregate failedChunkData from batch records in finalizeRunIfComplete() instead of relying on in-memory accumulation from the old single-task flow Co-Authored-By: Claude Opus 4.6 * feat: add batchLimit to CollectionVectorizeOption with coordinator/worker architecture Splits prepare-bulk-embedding into coordinator + per-collection workers. Each worker processes one page of one collection, queuing a continuation job before processing to ensure crash safety. Default batchLimit is 1000 when not explicitly set. Co-Authored-By: Claude Opus 4.6 * fix: rewrite batchLimit test 2 to reuse same Payload instance The second test was creating a separate Payload instance sharing the same DB and job queues, causing two crons to compete for jobs. This led to double-execution and mock state inconsistency (expected 4 to be 2). Now both tests use the single beforeAll instance with cleanup between. Co-Authored-By: Claude Opus 4.6 * fix: add payload.destroy() in afterAll to prevent OOM from leaked crons Every test file that creates a Payload instance now calls payload.destroy() in afterAll (or try/finally for in-test instances). This stops background cron jobs from accumulating across tests, which was causing heap exhaustion in CI. Co-Authored-By: Claude Opus 4.6 * Trying to not destroy our heap * Runs tests in parallel now that each test gets its own db * WIP * fix: fix OOM, polling test assertions, and add diagnostic logging - Add --max-old-space-size=8192 to test:int NODE_OPTIONS (cross-env was overriding the CI env var, so the heap limit never took effect) - Fix polling.spec.ts queueSpy assertions: coordinator/worker adds an extra queue call, so poll-or-complete-single-batch is now call 3 and 4 instead of 2 and 3 - Add extensive [vectorize-debug] console.log throughout task handlers (coordinator, worker, poll-single, finalize, streamAndBatchDocs) to diagnose any remaining CI hangs - Remove redundant NODE_OPTIONS from CI workflow (now in the script) Co-Authored-By: Claude Opus 4.6 * refactor: remove poll-or-complete-bulk-embedding task and aggregate incrementally Remove the backward-compatible fan-out task since the per-batch architecture hasn't been released yet. Refactor finalizeRunIfComplete to aggregate batch counts incrementally during pagination instead of collecting all batch objects into memory. Co-Authored-By: Claude Opus 4.6 * chore: bump to 0.5.5, update changelog, remove debug logging - Bump version 0.5.4 → 0.5.5 - Add 0.5.5 entry to CHANGELOG.md (coordinator/worker, batchLimit, per-batch polling) - Document batchLimit in README CollectionVectorizeOption section - Remove all diagnostic console.log statements from bulkEmbedAll.ts Co-Authored-By: Claude Opus 4.6 * Adds upgrade note --------- Co-authored-by: Claude Opus 4.6 --- CHANGELOG.md | 20 + README.md | 5 +- dev/specs/bulkEmbed/basic.spec.ts | 7 +- dev/specs/bulkEmbed/batchLimit.spec.ts | 115 +++ dev/specs/bulkEmbed/canceledBatch.spec.ts | 7 +- dev/specs/bulkEmbed/concurrentRuns.spec.ts | 7 +- dev/specs/bulkEmbed/extensionFields.spec.ts | 7 +- dev/specs/bulkEmbed/failedBatch.spec.ts | 7 +- dev/specs/bulkEmbed/ingestionFailure.spec.ts | 7 +- dev/specs/bulkEmbed/multipleBatches.spec.ts | 7 +- dev/specs/bulkEmbed/multipleChunks.spec.ts | 7 +- dev/specs/bulkEmbed/onError.spec.ts | 7 +- dev/specs/bulkEmbed/partialFailure.spec.ts | 7 +- .../bulkEmbed/partialFailureNoFail.spec.ts | 7 +- dev/specs/bulkEmbed/polling.spec.ts | 15 +- dev/specs/bulkEmbed/realtimeMode.spec.ts | 7 +- dev/specs/bulkEmbed/shouldEmbedFn.spec.ts | 7 +- dev/specs/bulkEmbed/versionBump.spec.ts | 14 +- dev/specs/chunkers.spec.ts | 47 +- dev/specs/config.spec.ts | 2 +- dev/specs/e2e.spec.ts | 6 +- dev/specs/extensionFields.spec.ts | 7 +- dev/specs/extensionFieldsVectorSearch.spec.ts | 93 +- dev/specs/failedValidation.spec.ts | 51 +- dev/specs/int.spec.ts | 7 +- dev/specs/migrationCli.spec.ts | 14 +- dev/specs/multipools.spec.ts | 8 +- dev/specs/queueName.spec.ts | 9 +- dev/specs/schemaName.spec.ts | 7 +- dev/specs/shouldEmbedFn.spec.ts | 8 +- dev/specs/utils.ts | 7 +- dev/specs/vectorSearch.spec.ts | 7 +- dev/specs/vectorizedPayload.spec.ts | 7 +- package.json | 4 +- src/collections/bulkEmbeddingsBatches.ts | 8 + src/endpoints/retryFailedBatch.ts | 8 +- src/index.ts | 7 +- src/tasks/bulkEmbedAll.ts | 834 ++++++++++-------- src/types.ts | 12 +- vitest.config.js | 9 +- 40 files changed, 893 insertions(+), 529 deletions(-) create mode 100644 dev/specs/bulkEmbed/batchLimit.spec.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 986df6a..b32c453 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,26 @@ All notable changes to this project will be documented in this file. +## 0.5.5 - 2026-02-24 + +### Added + +- **`batchLimit` option on `CollectionVectorizeOption`** – limits the number of documents fetched per bulk-embed worker job. When set, each page of results queues a continuation job for the next page, preventing serverless time-limit issues on large collections. Defaults to 1000. + +### Changed + +- **Coordinator / worker architecture for `prepare-bulk-embedding`** – the initial job now acts as a coordinator that fans out one worker job per collection. Each worker processes a single page of documents, making bulk embedding parallelizable and more resilient to timeouts. +- **Per-batch polling via `poll-or-complete-single-batch`** – replaced the monolithic `poll-or-complete-bulk-embedding` task. Each provider batch now has its own polling job, improving observability and reducing memory usage. +- **Memory-efficient incremental aggregation** – `finalizeRunIfComplete` now scans batch records page-by-page instead of loading all batches into memory at once. + +### Removed + +- `poll-or-complete-bulk-embedding` task (replaced by `poll-or-complete-single-batch`). + +### Upgrade Notes + +- **Ensure no bulk embedding run is in progress when upgrading.** The `poll-or-complete-bulk-embedding` task has been removed and replaced by `poll-or-complete-single-batch`. Any in-flight bulk run that still has pending `poll-or-complete-bulk-embedding` jobs will fail because the task slug no longer exists. Wait for all active runs to complete (or cancel them) before deploying this version. + ## 0.5.4 - 2026-02-20 ### Added diff --git a/README.md b/README.md index 3cd6689..e236dd9 100644 --- a/README.md +++ b/README.md @@ -436,8 +436,8 @@ type OnBulkErrorArgs = { The plugin uses separate Payload jobs for reliability with long-running providers: -- **`prepare-bulk-embedding`**: Streams through documents, calls your `addChunk` for each chunk, creates batch records. -- **`poll-or-complete-bulk-embedding`**: Polls all batches, requeues itself until done, then writes all successful embeddings (partial chunk failures are allowed). +- **`prepare-bulk-embedding`**: A coordinator job fans out one worker per collection. Each worker streams through documents, calls your `addChunk` for each chunk, and creates batch records. When `batchLimit` is set on a collection, workers paginate and queue continuation jobs. +- **`poll-or-complete-single-batch`**: Polls a single batch, requeues itself until done, then writes successful embeddings. When all batches for a run are terminal, the run is finalized (partial chunk failures are allowed). ### Queue Configuration @@ -624,6 +624,7 @@ curl -X POST http://localhost:3000/api/vector-retry-failed-batch \ - `shouldEmbedFn? (doc, payload)` – optional filter that runs **before** the document is queued for embedding. Return `false` to skip the document entirely (no job is created and `toKnowledgePool` is never called). Works for both real-time and bulk embedding. Defaults to embedding all documents when omitted. - `toKnowledgePool (doc, payload)` – return an array of `{ chunk, ...extensionFieldValues }`. Each object becomes one embedding row and the index in the array determines `chunkIndex`. +- `batchLimit? (number)` – max documents to fetch per bulk-embed worker job. When set, each page of results becomes a separate job that queues a continuation for the next page. Useful for large collections that would exceed serverless time limits in a single job. Defaults to 1000. Reserved column names: `sourceCollection`, `docId`, `chunkIndex`, `chunkText`, `embeddingVersion`. Avoid reusing them in `extensionFields`. diff --git a/dev/specs/bulkEmbed/basic.spec.ts b/dev/specs/bulkEmbed/basic.spec.ts index bb0219f..19d6e00 100644 --- a/dev/specs/bulkEmbed/basic.spec.ts +++ b/dev/specs/bulkEmbed/basic.spec.ts @@ -1,5 +1,5 @@ import type { Payload, SanitizedConfig } from 'payload' -import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js' import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js' @@ -10,6 +10,7 @@ import { clearAllCollections, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -54,6 +55,10 @@ describe('Bulk embed - basic tests', () => { vectorizedPayload = getVectorizedPayload(payload) }) + afterAll(async () => { + await destroyPayload(payload) + }) + beforeEach(async () => { await clearAllCollections(payload) }) diff --git a/dev/specs/bulkEmbed/batchLimit.spec.ts b/dev/specs/bulkEmbed/batchLimit.spec.ts new file mode 100644 index 0000000..db089ef --- /dev/null +++ b/dev/specs/bulkEmbed/batchLimit.spec.ts @@ -0,0 +1,115 @@ +import type { Payload } from 'payload' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' +import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' +import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js' +import { + BULK_QUEUE_NAMES, + DEFAULT_DIMS, + buildPayloadWithIntegration, + createMockBulkEmbeddings, + createTestDb, + destroyPayload, + waitForBulkJobs, +} from '../utils.js' +import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' +import { getVectorizedPayload, VectorizedPayload } from 'payloadcms-vectorize' +import { expectGoodResult } from '../utils.vitest.js' + +const DIMS = DEFAULT_DIMS +const dbName = `bulk_batchlimit_${Date.now()}` + +describe('Bulk embed - batchLimit', () => { + let payload: Payload + let vectorizedPayload: VectorizedPayload | null = null + + beforeAll(async () => { + await createTestDb({ dbName }) + const built = await buildPayloadWithIntegration({ + dbName, + pluginOpts: { + knowledgePools: { + default: { + collections: { + posts: { + toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], + batchLimit: 2, + }, + }, + embeddingConfig: { + version: testEmbeddingVersion, + queryFn: makeDummyEmbedQuery(DIMS), + bulkEmbeddingsFns: createMockBulkEmbeddings({ + statusSequence: ['succeeded'], + }), + }, + }, + }, + bulkQueueNames: BULK_QUEUE_NAMES, + }, + key: `batchlimit-${Date.now()}`, + }) + payload = built.payload + vectorizedPayload = getVectorizedPayload(payload) + }) + + afterAll(async () => { + await destroyPayload(payload) + }) + + test('batchLimit splits docs across continuation jobs and all get embedded', async () => { + // Create 5 posts with batchLimit: 2 + // Should result in 3 prepare jobs (2 docs, 2 docs, 1 doc) + for (let i = 0; i < 5; i++) { + await payload.create({ collection: 'posts', data: { title: `BatchLimit Post ${i}` } as any }) + } + + const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result) + + await waitForBulkJobs(payload, 30000) + + // All 5 posts should have embeddings + const embeds = await payload.find({ collection: 'default' }) + expect(embeds.totalDocs).toBe(5) + + // Run should be succeeded + const runDoc = ( + await (payload as any).find({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + where: { id: { equals: result!.runId } }, + }) + ).docs[0] + expect(runDoc.status).toBe('succeeded') + expect(runDoc.inputs).toBe(5) + }) + + test('batchLimit equal to doc count does not create extra continuations', async () => { + // Clean up from prior test: delete all posts and embeddings + await payload.delete({ collection: 'posts', where: {} }) + await payload.delete({ collection: 'default' as any, where: {} }) + + // Create exactly 2 posts (matching batchLimit: 2) + for (let i = 0; i < 2; i++) { + await payload.create({ + collection: 'posts', + data: { title: `Exact Post ${i}` } as any, + }) + } + + const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result) + + await waitForBulkJobs(payload, 20000) + + const embeds = await payload.find({ collection: 'default' }) + expect(embeds.totalDocs).toBe(2) + + const runDoc = ( + await (payload as any).find({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + where: { id: { equals: result!.runId } }, + }) + ).docs[0] + expect(runDoc.status).toBe('succeeded') + }) +}) diff --git a/dev/specs/bulkEmbed/canceledBatch.spec.ts b/dev/specs/bulkEmbed/canceledBatch.spec.ts index 2b99b88..d2695e8 100644 --- a/dev/specs/bulkEmbed/canceledBatch.spec.ts +++ b/dev/specs/bulkEmbed/canceledBatch.spec.ts @@ -1,11 +1,12 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_QUEUE_NAMES, DEFAULT_DIMS, buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -49,6 +50,10 @@ describe('Bulk embed - canceled batch', () => { vectorizedPayload = getVectorizedPayload(payload) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('canceled batch marks entire run as failed', async () => { const post = await payload.create({ collection: 'posts', data: { title: 'Cancel' } as any }) const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' }) diff --git a/dev/specs/bulkEmbed/concurrentRuns.spec.ts b/dev/specs/bulkEmbed/concurrentRuns.spec.ts index c03f212..0349f2a 100644 --- a/dev/specs/bulkEmbed/concurrentRuns.spec.ts +++ b/dev/specs/bulkEmbed/concurrentRuns.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { getVectorizedPayload } from '../../../src/types.js' import { @@ -8,6 +8,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -45,6 +46,10 @@ describe('Bulk embed - concurrent runs prevention', () => { payload = built.payload }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('cannot start concurrent bulk embed runs for the same pool', async () => { const vectorizedPayload = getVectorizedPayload<'default'>(payload)! // Create a test post first diff --git a/dev/specs/bulkEmbed/extensionFields.spec.ts b/dev/specs/bulkEmbed/extensionFields.spec.ts index c564bea..4701978 100644 --- a/dev/specs/bulkEmbed/extensionFields.spec.ts +++ b/dev/specs/bulkEmbed/extensionFields.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_QUEUE_NAMES, @@ -7,6 +7,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -53,6 +54,10 @@ describe('Bulk embed - extension fields', () => { vectorizedPayload = getVectorizedPayload(payload) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('extension fields are merged when writing embeddings', async () => { const post = await payload.create({ collection: 'posts', diff --git a/dev/specs/bulkEmbed/failedBatch.spec.ts b/dev/specs/bulkEmbed/failedBatch.spec.ts index 037fdb5..a971811 100644 --- a/dev/specs/bulkEmbed/failedBatch.spec.ts +++ b/dev/specs/bulkEmbed/failedBatch.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js' import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js' @@ -10,6 +10,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -49,6 +50,10 @@ describe('Bulk embed - failed batch', () => { vectorizedPayload = getVectorizedPayload(payload) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('failed batch marks entire run as failed', async () => { const post = await payload.create({ collection: 'posts', data: { title: 'Fail' } as any }) diff --git a/dev/specs/bulkEmbed/ingestionFailure.spec.ts b/dev/specs/bulkEmbed/ingestionFailure.spec.ts index e73d3b9..619432a 100644 --- a/dev/specs/bulkEmbed/ingestionFailure.spec.ts +++ b/dev/specs/bulkEmbed/ingestionFailure.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js' import { @@ -8,6 +8,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -24,6 +25,10 @@ describe('Bulk embed - ingestion validation failures', () => { await createTestDb({ dbName }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('malformed chunk entry fails the bulk embedding run', async () => { // Use unique version to ensure this test only processes its own data const testVersion = `${testEmbeddingVersion}-ingestion-fail-${Date.now()}` diff --git a/dev/specs/bulkEmbed/multipleBatches.spec.ts b/dev/specs/bulkEmbed/multipleBatches.spec.ts index aa2de86..dd611cd 100644 --- a/dev/specs/bulkEmbed/multipleBatches.spec.ts +++ b/dev/specs/bulkEmbed/multipleBatches.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js' import { @@ -8,6 +8,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -51,6 +52,10 @@ describe('Bulk embed - multiple batches', () => { vectorizedPayload = getVectorizedPayload(payload) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('multiple batches are created when flushing after N chunks', async () => { // Create 5 posts (should result in 3 batches: 2, 2, 1) for (let i = 0; i < 5; i++) { diff --git a/dev/specs/bulkEmbed/multipleChunks.spec.ts b/dev/specs/bulkEmbed/multipleChunks.spec.ts index 0f99eab..f669380 100644 --- a/dev/specs/bulkEmbed/multipleChunks.spec.ts +++ b/dev/specs/bulkEmbed/multipleChunks.spec.ts @@ -1,11 +1,12 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_QUEUE_NAMES, DEFAULT_DIMS, buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -51,6 +52,10 @@ describe('Bulk embed - multiple chunks with extension fields', () => { payload = built.payload }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('multiple chunks keep their respective extension fields', async () => { const post = await payload.create({ collection: 'posts', diff --git a/dev/specs/bulkEmbed/onError.spec.ts b/dev/specs/bulkEmbed/onError.spec.ts index f128009..1dfb483 100644 --- a/dev/specs/bulkEmbed/onError.spec.ts +++ b/dev/specs/bulkEmbed/onError.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_QUEUE_NAMES, @@ -7,6 +7,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -56,6 +57,10 @@ describe('Bulk embed - onError callback', () => { payload = built.payload }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('onError callback is called when batch fails', async () => { await payload.create({ collection: 'posts', data: { title: 'Error Test' } as any }) diff --git a/dev/specs/bulkEmbed/partialFailure.spec.ts b/dev/specs/bulkEmbed/partialFailure.spec.ts index d3ef57e..e0a73f8 100644 --- a/dev/specs/bulkEmbed/partialFailure.spec.ts +++ b/dev/specs/bulkEmbed/partialFailure.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_QUEUE_NAMES, @@ -7,6 +7,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -30,6 +31,10 @@ describe('Bulk embed - partial chunk failures', () => { await createTestDb({ dbName }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('partial chunk failures are tracked and passed to onError', async () => { // Reset state onErrorCalled = false diff --git a/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts b/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts index 35e877f..ad3ba07 100644 --- a/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts +++ b/dev/specs/bulkEmbed/partialFailureNoFail.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_QUEUE_NAMES, @@ -7,6 +7,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -30,6 +31,10 @@ describe('Bulk embed - partial failures', () => { await createTestDb({ dbName }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('run with no partial failures does not call onError', async () => { // Reset state onErrorCalled = false diff --git a/dev/specs/bulkEmbed/polling.spec.ts b/dev/specs/bulkEmbed/polling.spec.ts index eedd32a..8365799 100644 --- a/dev/specs/bulkEmbed/polling.spec.ts +++ b/dev/specs/bulkEmbed/polling.spec.ts @@ -1,5 +1,5 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test, vi } from 'vitest' +import { afterAll, beforeAll, describe, expect, test, vi } from 'vitest' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js' import { BULK_QUEUE_NAMES, @@ -7,6 +7,7 @@ import { buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { getVectorizedPayload } from 'payloadcms-vectorize' @@ -47,6 +48,10 @@ describe('Bulk embed - polling requeue', () => { payload = built.payload }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('polling requeues when non-terminal then succeeds', async () => { const post = await payload.create({ collection: 'posts', data: { title: 'Loop' } as any }) const queueSpy = vi.spyOn(payload.jobs, 'queue') @@ -57,12 +62,12 @@ describe('Bulk embed - polling requeue', () => { await waitForBulkJobs(payload, 15000) expect(queueSpy).toHaveBeenNthCalledWith( - 2, // 2nd call - expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-bulk-embedding' }), + 3, // 3rd call - per-batch task queued from worker (1=coordinator, 2=worker) + expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-single-batch' }), ) expect(queueSpy).toHaveBeenNthCalledWith( - 3, // 3rd call - expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-bulk-embedding' }), + 4, // 4th call - per-batch task re-queued after 'running' status + expect.objectContaining({ task: 'payloadcms-vectorize:poll-or-complete-single-batch' }), ) const embeds = await payload.find({ diff --git a/dev/specs/bulkEmbed/realtimeMode.spec.ts b/dev/specs/bulkEmbed/realtimeMode.spec.ts index 8e4c224..4b5bf8c 100644 --- a/dev/specs/bulkEmbed/realtimeMode.spec.ts +++ b/dev/specs/bulkEmbed/realtimeMode.spec.ts @@ -1,11 +1,12 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_QUEUE_NAMES, DEFAULT_DIMS, buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForVectorizationJobs, } from '../utils.js' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -46,6 +47,10 @@ describe('Bulk embed - realtime mode', () => { payload = built.payload }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('realtime mode queues vectorize jobs when realTimeIngestionFn is provided', async () => { const post = await payload.create({ collection: 'posts', diff --git a/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts b/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts index ebbda47..6b20191 100644 --- a/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts +++ b/dev/specs/bulkEmbed/shouldEmbedFn.spec.ts @@ -1,5 +1,5 @@ import type { Payload, SanitizedConfig } from 'payload' -import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest' import { BULK_QUEUE_NAMES, DEFAULT_DIMS, @@ -7,6 +7,7 @@ import { clearAllCollections, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' @@ -52,6 +53,10 @@ describe('Bulk embed - shouldEmbedFn', () => { vectorizedPayload = getVectorizedPayload(payload) }) + afterAll(async () => { + await destroyPayload(payload) + }) + beforeEach(async () => { await clearAllCollections(payload) }) diff --git a/dev/specs/bulkEmbed/versionBump.spec.ts b/dev/specs/bulkEmbed/versionBump.spec.ts index 5d3dcde..a376342 100644 --- a/dev/specs/bulkEmbed/versionBump.spec.ts +++ b/dev/specs/bulkEmbed/versionBump.spec.ts @@ -1,10 +1,11 @@ -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { BULK_QUEUE_NAMES, DEFAULT_DIMS, buildPayloadWithIntegration, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForBulkJobs, } from '../utils.js' import { makeDummyEmbedQuery } from 'helpers/embed.js' @@ -25,10 +26,17 @@ const BULK_QUEUE_NAMES_1 = { describe('Bulk embed - version bump', () => { let post: any + const payloadsToDestroy: any[] = [] beforeAll(async () => { await createTestDb({ dbName }) }) + afterAll(async () => { + for (const p of payloadsToDestroy) { + await destroyPayload(p) + } + }) + test('version bump re-embeds all even without updates', async () => { const payload0 = ( await buildPayloadWithIntegration({ @@ -54,6 +62,8 @@ describe('Bulk embed - version bump', () => { }) ).payload + payloadsToDestroy.push(payload0) + post = await payload0.create({ collection: 'posts', data: { title: 'Old' } as any }) const vectorizedPayload0 = getVectorizedPayload(payload0) @@ -95,6 +105,8 @@ describe('Bulk embed - version bump', () => { }) ).payload + payloadsToDestroy.push(payload1) + const vectorizedPayload1 = getVectorizedPayload(payload1) const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) expectGoodResult(result1) diff --git a/dev/specs/chunkers.spec.ts b/dev/specs/chunkers.spec.ts index 445a6c7..26cc196 100644 --- a/dev/specs/chunkers.spec.ts +++ b/dev/specs/chunkers.spec.ts @@ -2,7 +2,7 @@ import { describe, expect, test } from 'vitest' import { chunkText, chunkRichText } from 'helpers/chunkers.js' import { postgresAdapter } from '@payloadcms/db-postgres' import { buildDummyConfig, getInitialMarkdownContent, integration } from './constants.js' -import { createTestDb } from './utils.js' +import { createTestDb, destroyPayload } from './utils.js' import { getPayload } from 'payload' describe('Chunkers', () => { @@ -36,25 +36,30 @@ describe('Chunkers', () => { key: `chunkers-test-${Date.now()}`, cron: true, }) - const chunks = await chunkRichText(markdownContent, thisPayload) - - expect(chunks.length).toBe(3) - - // Intro chunk - expect(chunks[0]).toContain('Title') - expect(chunks[0]).toContain('Quote') - expect(chunks[0]).toContain('Paragraph 0') - - // First H2 section - expect(chunks[1]).toContain('## Header 1') - expect(chunks[1]).toContain('Paragraph 1') - expect(chunks[1]).toContain('Paragraph 2') - expect(chunks[1]).toContain('Paragraph 3') - - // Second H2 section - expect(chunks[2]).toContain('## Header 2') - expect(chunks[2]).toContain('Paragraph 4') - expect(chunks[2]).toContain('Paragraph 5') - expect(chunks[2]).toContain('Paragraph 6') + + try { + const chunks = await chunkRichText(markdownContent, thisPayload) + + expect(chunks.length).toBe(3) + + // Intro chunk + expect(chunks[0]).toContain('Title') + expect(chunks[0]).toContain('Quote') + expect(chunks[0]).toContain('Paragraph 0') + + // First H2 section + expect(chunks[1]).toContain('## Header 1') + expect(chunks[1]).toContain('Paragraph 1') + expect(chunks[1]).toContain('Paragraph 2') + expect(chunks[1]).toContain('Paragraph 3') + + // Second H2 section + expect(chunks[2]).toContain('## Header 2') + expect(chunks[2]).toContain('Paragraph 4') + expect(chunks[2]).toContain('Paragraph 5') + expect(chunks[2]).toContain('Paragraph 6') + } finally { + await destroyPayload(thisPayload) + } }) }) diff --git a/dev/specs/config.spec.ts b/dev/specs/config.spec.ts index b183af7..a7c7736 100644 --- a/dev/specs/config.spec.ts +++ b/dev/specs/config.spec.ts @@ -11,7 +11,7 @@ describe('jobs.tasks merging', () => { { slug: 'payloadcms-vectorize:vectorize', handler: expect.any(Function) }, { slug: 'payloadcms-vectorize:prepare-bulk-embedding', handler: expect.any(Function) }, { - slug: 'payloadcms-vectorize:poll-or-complete-bulk-embedding', + slug: 'payloadcms-vectorize:poll-or-complete-single-batch', handler: expect.any(Function), }, ]), diff --git a/dev/specs/e2e.spec.ts b/dev/specs/e2e.spec.ts index 5b8bf85..7d81b5e 100644 --- a/dev/specs/e2e.spec.ts +++ b/dev/specs/e2e.spec.ts @@ -3,7 +3,7 @@ import type { Payload, SanitizedConfig } from 'payload' import config from '@payload-config' import { getPayload } from 'payload' import { getInitialMarkdownContent } from './constants.js' -import { waitForVectorizationJobs, waitForBulkJobs } from './utils.js' +import { destroyPayload, waitForVectorizationJobs, waitForBulkJobs } from './utils.js' import { testEmbeddingVersion } from 'helpers/embed.js' import { devUser } from 'helpers/credentials.js' import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../src/collections/bulkEmbeddingsRuns.js' @@ -66,6 +66,10 @@ test.describe('Vector embedding e2e tests', () => { payload = await getPayload({ config: _config, key: `e2e-test-${Date.now()}` }) }) + test.afterAll(async () => { + await destroyPayload(payload) + }) + test('querying the endpoint should return the title with testEmbeddingVersion', async ({ request, }) => { diff --git a/dev/specs/extensionFields.spec.ts b/dev/specs/extensionFields.spec.ts index 80a91fe..bd08ac6 100644 --- a/dev/specs/extensionFields.spec.ts +++ b/dev/specs/extensionFields.spec.ts @@ -1,9 +1,10 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { postgresAdapter } from '@payloadcms/db-postgres' import { buildDummyConfig, integration, plugin } from './constants.js' import { createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -116,6 +117,10 @@ describe('Extension fields integration tests', () => { }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('extension fields are added to the embeddings table schema', async () => { const db = (payload as PostgresPayload).db const sql = ` diff --git a/dev/specs/extensionFieldsVectorSearch.spec.ts b/dev/specs/extensionFieldsVectorSearch.spec.ts index 825798a..ec40683 100644 --- a/dev/specs/extensionFieldsVectorSearch.spec.ts +++ b/dev/specs/extensionFieldsVectorSearch.spec.ts @@ -3,6 +3,7 @@ import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'h import { buildDummyConfig, DIMS, integration, plugin } from './constants.js' import { createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -112,55 +113,59 @@ describe('extensionFields', () => { cron: true, }) - // Create a post with extension field values - const testQuery = 'Extension fields test content' - const post = await payloadWithExtensions.create({ - collection: 'posts', - data: { - title: testQuery, - content: null, - category: 'tech', - priorityLevel: 42, - } as unknown as any, - }) + try { + // Create a post with extension field values + const testQuery = 'Extension fields test content' + const post = await payloadWithExtensions.create({ + collection: 'posts', + data: { + title: testQuery, + content: null, + category: 'tech', + priorityLevel: 42, + } as unknown as any, + }) - // Wait for vectorization jobs to complete - await waitForVectorizationJobs(payloadWithExtensions) + // Wait for vectorization jobs to complete + await waitForVectorizationJobs(payloadWithExtensions) - // Perform vector search - const knowledgePools: Record = { - default: defaultKnowledgePool, - } - const searchHandler = createVectorSearchHandlers(knowledgePools).requestHandler - const mockRequest = { - json: async () => ({ - query: testQuery, - knowledgePool: 'default', - }), - payload: payloadWithExtensions, - } as any - const response = await searchHandler(mockRequest) - const json = await response.json() + // Perform vector search + const knowledgePools: Record = { + default: defaultKnowledgePool, + } + const searchHandler = createVectorSearchHandlers(knowledgePools).requestHandler + const mockRequest = { + json: async () => ({ + query: testQuery, + knowledgePool: 'default', + }), + payload: payloadWithExtensions, + } as any + const response = await searchHandler(mockRequest) + const json = await response.json() - // Verify results contain extensionFields - expect(json).toHaveProperty('results') - expect(Array.isArray(json.results)).toBe(true) - expect(json.results.length).toBeGreaterThan(0) + // Verify results contain extensionFields + expect(json).toHaveProperty('results') + expect(Array.isArray(json.results)).toBe(true) + expect(json.results.length).toBeGreaterThan(0) - // Find a result that matches our post - const matchingResult = json.results.find( - (r: any) => r.docId === String(post.id) && r.chunkText === testQuery, - ) - expect(matchingResult).toBeDefined() + // Find a result that matches our post + const matchingResult = json.results.find( + (r: any) => r.docId === String(post.id) && r.chunkText === testQuery, + ) + expect(matchingResult).toBeDefined() - // Verify extensionFields are present - expect(matchingResult).toHaveProperty('category') - expect(matchingResult).toHaveProperty('priorityLevel') + // Verify extensionFields are present + expect(matchingResult).toHaveProperty('category') + expect(matchingResult).toHaveProperty('priorityLevel') - // Verify types are correct - expect(typeof matchingResult.category).toBe('string') - expect(matchingResult.category).toBe('tech') - expect(typeof matchingResult.priorityLevel).toBe('number') - expect(matchingResult.priorityLevel).toBe(42) + // Verify types are correct + expect(typeof matchingResult.category).toBe('string') + expect(matchingResult.category).toBe('tech') + expect(typeof matchingResult.priorityLevel).toBe('number') + expect(matchingResult.priorityLevel).toBe(42) + } finally { + await destroyPayload(payloadWithExtensions) + } }) }) diff --git a/dev/specs/failedValidation.spec.ts b/dev/specs/failedValidation.spec.ts index 558c74e..d3fe9f9 100644 --- a/dev/specs/failedValidation.spec.ts +++ b/dev/specs/failedValidation.spec.ts @@ -5,6 +5,7 @@ import { describe, expect, test } from 'vitest' import { createVectorizeIntegration } from '../../src/index.js' import { createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -82,31 +83,35 @@ describe('Validation failures mark jobs as errored', () => { cron: true, }) - await payload.create({ - collection: 'posts', - data: { title: 'bad chunks' }, - }) + try { + await payload.create({ + collection: 'posts', + data: { title: 'bad chunks' }, + }) - // Wait for the queued job to finish (success or failure) - await waitForVectorizationJobs(payload, 30000) + // Wait for the queued job to finish (success or failure) + await waitForVectorizationJobs(payload, 30000) - // Then assert failure - const res = await payload.find({ - collection: 'payload-jobs', - where: { - and: [{ taskSlug: { equals: 'payloadcms-vectorize:vectorize' } }], - }, - limit: 1, - sort: '-createdAt', - }) - const failedJob = (res as any)?.docs?.[0] - expect(failedJob.hasError).toBe(true) - const errMsg = failedJob.error.message - expect(errMsg).toMatch(/chunk/i) - expect(errMsg).toMatch(/Invalid indices: 1/) + // Then assert failure + const res = await payload.find({ + collection: 'payload-jobs', + where: { + and: [{ taskSlug: { equals: 'payloadcms-vectorize:vectorize' } }], + }, + limit: 1, + sort: '-createdAt', + }) + const failedJob = (res as any)?.docs?.[0] + expect(failedJob.hasError).toBe(true) + const errMsg = failedJob.error.message + expect(errMsg).toMatch(/chunk/i) + expect(errMsg).toMatch(/Invalid indices: 1/) - // Ensure no embeddings were created (all-or-nothing validation) - const embeddingsCount = await payload.count({ collection: 'default' }) - expect(embeddingsCount.totalDocs).toBe(0) + // Ensure no embeddings were created (all-or-nothing validation) + const embeddingsCount = await payload.count({ collection: 'default' }) + expect(embeddingsCount.totalDocs).toBe(0) + } finally { + await destroyPayload(payload) + } }, 60000) }) diff --git a/dev/specs/int.spec.ts b/dev/specs/int.spec.ts index 0521deb..c9db4cd 100644 --- a/dev/specs/int.spec.ts +++ b/dev/specs/int.spec.ts @@ -1,6 +1,6 @@ import type { Payload, SanitizedConfig } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { chunkRichText, chunkText } from 'helpers/chunkers.js' import { createHeadlessEditor } from '@payloadcms/richtext-lexical/lexical/headless' @@ -16,6 +16,7 @@ import { editorConfigFactory, getEnabledNodes, lexicalEditor } from '@payloadcms import { DIMS, getInitialMarkdownContent } from './constants.js' import { createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -112,6 +113,10 @@ describe('Plugin integration tests', () => { markdownContent = await getInitialMarkdownContent(config) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('adds embeddings collection with vector column', async () => { // Check schema for embeddings collection const collections = payload.collections diff --git a/dev/specs/migrationCli.spec.ts b/dev/specs/migrationCli.spec.ts index 015706f..40f4c45 100644 --- a/dev/specs/migrationCli.spec.ts +++ b/dev/specs/migrationCli.spec.ts @@ -4,7 +4,7 @@ import { postgresAdapter } from '@payloadcms/db-postgres' import { buildConfig, getPayload } from 'payload' import { createVectorizeIntegration } from 'payloadcms-vectorize' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from '../helpers/embed.js' -import { createTestDb } from './utils.js' +import { createTestDb, destroyPayload } from './utils.js' import { DIMS } from './constants.js' import type { PostgresPayload } from '../../src/types.js' import { script as vectorizeMigrateScript } from '../../src/bin/vectorize-migrate.js' @@ -73,6 +73,10 @@ describe('Migration CLI integration tests', () => { payload = await getPayload({ config, cron: true }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('VectorizedPayload has _staticConfigs', async () => { const { getVectorizedPayload } = await import('payloadcms-vectorize') const vectorizedPayload = getVectorizedPayload(payload) @@ -152,6 +156,10 @@ describe('Migration CLI integration tests', () => { }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('vector search fails with descriptive error when embedding column missing', async () => { const { getVectorizedPayload } = await import('payloadcms-vectorize') const vectorizedPayload = getVectorizedPayload(payload) @@ -200,7 +208,7 @@ describe('Migration CLI integration tests', () => { }) afterAll(async () => { - // Cleanup: remove test migrations directory + await destroyPayload(autoPayload) if (existsSync(migrationsDir)) { rmSync(migrationsDir, { recursive: true, force: true }) } @@ -420,6 +428,7 @@ describe('Migration CLI integration tests', () => { }) afterAll(async () => { + await destroyPayload(dimsPayload) if (existsSync(migrationsDir)) { rmSync(migrationsDir, { recursive: true, force: true }) } @@ -718,6 +727,7 @@ describe('Migration CLI integration tests', () => { }) afterAll(async () => { + await destroyPayload(multiPayload) if (existsSync(migrationsDir)) { rmSync(migrationsDir, { recursive: true, force: true }) } diff --git a/dev/specs/multipools.spec.ts b/dev/specs/multipools.spec.ts index 9695953..aa92110 100644 --- a/dev/specs/multipools.spec.ts +++ b/dev/specs/multipools.spec.ts @@ -1,11 +1,11 @@ import type { Payload, SanitizedConfig } from 'payload' import { buildConfig } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { createVectorizeIntegration } from 'payloadcms-vectorize' import { lexicalEditor } from '@payloadcms/richtext-lexical' import { postgresAdapter } from '@payloadcms/db-postgres' -import { createTestDb } from './utils.js' +import { createTestDb, destroyPayload } from './utils.js' import { getPayload } from 'payload' import type { PostgresPayload } from '../../src/types.js' @@ -75,6 +75,10 @@ describe('Multiple knowledge pools', () => { }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('creates two embeddings collections with vector columns', async () => { const collections = payload.collections expect(collections).toHaveProperty('pool1') diff --git a/dev/specs/queueName.spec.ts b/dev/specs/queueName.spec.ts index 8721d74..2395af2 100644 --- a/dev/specs/queueName.spec.ts +++ b/dev/specs/queueName.spec.ts @@ -1,10 +1,10 @@ import type { Payload, SanitizedConfig } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { chunkText, chunkRichText } from 'helpers/chunkers.js' import type { SerializedEditorState } from '@payloadcms/richtext-lexical/lexical' import { postgresAdapter } from '@payloadcms/db-postgres' import { buildDummyConfig, getInitialMarkdownContent, integration, plugin } from './constants.js' -import { createTestDb } from './utils.js' +import { createTestDb, destroyPayload } from './utils.js' import { getPayload } from 'payload' describe('Queue tests', () => { @@ -74,6 +74,11 @@ describe('Queue tests', () => { }) markdownContent = await getInitialMarkdownContent(config) }) + + afterAll(async () => { + await destroyPayload(payload) + }) + test('vectorization jobs are queued using the queueName', async () => { // There is no autoRun so previous jobs are queued and never removed between tests const prevJobs = await payload.find({ diff --git a/dev/specs/schemaName.spec.ts b/dev/specs/schemaName.spec.ts index d038d2d..a69ca1a 100644 --- a/dev/specs/schemaName.spec.ts +++ b/dev/specs/schemaName.spec.ts @@ -3,13 +3,14 @@ import type { Payload } from 'payload' import { postgresAdapter } from '@payloadcms/db-postgres' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { Client } from 'pg' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import type { PostgresPayload } from '../../src/types.js' import { buildDummyConfig, DIMS, integration, plugin } from './constants.js' import { createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -95,6 +96,10 @@ describe('Custom schemaName support', () => { }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('embeddings table is created in custom schema', async () => { const db = (payload as PostgresPayload).db const tablesRes = await db.pool?.query( diff --git a/dev/specs/shouldEmbedFn.spec.ts b/dev/specs/shouldEmbedFn.spec.ts index acb730e..89daa8c 100644 --- a/dev/specs/shouldEmbedFn.spec.ts +++ b/dev/specs/shouldEmbedFn.spec.ts @@ -1,8 +1,8 @@ import type { Payload, SanitizedConfig } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { DIMS } from './constants.js' -import { createTestDb, waitForVectorizationJobs } from './utils.js' +import { createTestDb, destroyPayload, waitForVectorizationJobs } from './utils.js' import { getPayload } from 'payload' import { postgresAdapter } from '@payloadcms/db-postgres' import { buildConfig } from 'payload' @@ -79,6 +79,10 @@ describe('shouldEmbedFn - real-time', () => { }) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('shouldEmbedFn filters documents on real-time create', async () => { const skippedPost = await payload.create({ collection: 'posts', diff --git a/dev/specs/utils.ts b/dev/specs/utils.ts index 62b7de2..532ece0 100644 --- a/dev/specs/utils.ts +++ b/dev/specs/utils.ts @@ -146,7 +146,7 @@ export async function waitForBulkJobs(payload: Payload, maxWaitMs = 10000) { payload, [ 'payloadcms-vectorize:prepare-bulk-embedding', - 'payloadcms-vectorize:poll-or-complete-bulk-embedding', + 'payloadcms-vectorize:poll-or-complete-single-batch', ], maxWaitMs, ) @@ -356,6 +356,11 @@ export const clearAllCollections = async (pl: Payload) => { await safeDelete('payload-jobs') } +/** Safely destroy a Payload instance (stops crons, closes DB pool). */ +export async function destroyPayload(payload: Payload | null | undefined) { + if (payload) await payload.destroy() +} + export async function createSucceededBaselineRun( payload: Payload, { diff --git a/dev/specs/vectorSearch.spec.ts b/dev/specs/vectorSearch.spec.ts index 510f4eb..de6397b 100644 --- a/dev/specs/vectorSearch.spec.ts +++ b/dev/specs/vectorSearch.spec.ts @@ -1,6 +1,6 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { type SerializedEditorState } from '@payloadcms/richtext-lexical/lexical' import { buildDummyConfig, DIMS, getInitialMarkdownContent } from './constants.js' @@ -8,6 +8,7 @@ import { BULK_QUEUE_NAMES, createMockBulkEmbeddings, createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -200,6 +201,10 @@ describe('Search endpoint integration tests', () => { markdownContent = await getInitialMarkdownContent(config) }) + afterAll(async () => { + await destroyPayload(payload) + }) + test('querying a title should return the title', async () => { // This should create multiple embeddings for the title and content const post = await payload.create({ diff --git a/dev/specs/vectorizedPayload.spec.ts b/dev/specs/vectorizedPayload.spec.ts index 28cbf82..f43d746 100644 --- a/dev/specs/vectorizedPayload.spec.ts +++ b/dev/specs/vectorizedPayload.spec.ts @@ -1,10 +1,11 @@ import type { Payload } from 'payload' -import { beforeAll, describe, expect, test } from 'vitest' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' import { getVectorizedPayload, VectorizedPayload } from '../../src/types.js' import { buildDummyConfig, DIMS, getInitialMarkdownContent } from './constants.js' import { createTestDb, + destroyPayload, waitForVectorizationJobs, } from './utils.js' import { getPayload } from 'payload' @@ -103,6 +104,10 @@ describe('VectorizedPayload', () => { markdownContent = await getInitialMarkdownContent(config) }) + afterAll(async () => { + await destroyPayload(payload) + }) + describe('getVectorizedPayload', () => { test('returns vectorized payload object for a payload instance with vectorize extensions', () => { const vectorizedPayload = getVectorizedPayload(payload) diff --git a/package.json b/package.json index 6710be3..2b0d2ba 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "payloadcms-vectorize", - "version": "0.5.4", + "version": "0.5.5", "description": "A plugin to vectorize collections for RAG in Payload 3.0", "license": "MIT", "type": "module", @@ -40,7 +40,7 @@ "test:teardown": "docker-compose -f dev/docker-compose.test.yml down", "test": "pnpm test:int && pnpm test:e2e", "test:e2e": "playwright test", - "test:int": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx' vitest" + "test:int": "cross-env DOTENV_CONFIG_PATH=dev/.env.test NODE_OPTIONS='--require=dotenv/config --import=tsx --max-old-space-size=8192' vitest" }, "devDependencies": { "@eslint/eslintrc": "^3.2.0", diff --git a/src/collections/bulkEmbeddingsBatches.ts b/src/collections/bulkEmbeddingsBatches.ts index e47c18a..43aa02a 100644 --- a/src/collections/bulkEmbeddingsBatches.ts +++ b/src/collections/bulkEmbeddingsBatches.ts @@ -115,6 +115,14 @@ export const createBulkEmbeddingsBatchesCollection = (): CollectionConfig => ({ description: 'Error message if the batch failed', }, }, + { + name: 'failedChunkData', + type: 'json', + admin: { + description: + 'Collection, documentId and chunkIndex for each chunk that failed in this batch', + }, + }, { name: 'retriedBatch', type: 'relationship', diff --git a/src/endpoints/retryFailedBatch.ts b/src/endpoints/retryFailedBatch.ts index 8d86e50..678a093 100644 --- a/src/endpoints/retryFailedBatch.ts +++ b/src/endpoints/retryFailedBatch.ts @@ -209,10 +209,10 @@ export async function retryBatch({ - task: 'payloadcms-vectorize:poll-or-complete-bulk-embedding', - input: { runId: String(runId) }, + // Queue a per-batch task for the retried batch + await payload.jobs.queue<'payloadcms-vectorize:poll-or-complete-single-batch'>({ + task: 'payloadcms-vectorize:poll-or-complete-single-batch', + input: { runId: String(runId), batchId: String(newBatch.id) }, ...(queueName ? { queue: queueName } : {}), }) diff --git a/src/index.ts b/src/index.ts index 71a49b4..5a56f35 100644 --- a/src/index.ts +++ b/src/index.ts @@ -35,7 +35,7 @@ import { } from './collections/bulkEmbeddingsBatches.js' import { createPrepareBulkEmbeddingTask, - createPollOrCompleteBulkEmbeddingTask, + createPollOrCompleteSingleBatchTask, } from './tasks/bulkEmbedAll.js' import { createBulkEmbedHandler, startBulkEmbed } from './endpoints/bulkEmbed.js' import { createRetryFailedBatchHandler, retryBatch } from './endpoints/retryFailedBatch.js' @@ -245,14 +245,15 @@ export const createVectorizeIntegration = const prepareBulkEmbedTask = createPrepareBulkEmbeddingTask({ knowledgePools: pluginOptions.knowledgePools, pollOrCompleteQueueName: pluginOptions.bulkQueueNames?.pollOrCompleteQueueName, + prepareBulkEmbedQueueName: pluginOptions.bulkQueueNames?.prepareBulkEmbedQueueName, }) tasks.push(prepareBulkEmbedTask) - const pollOrCompleteBulkEmbedTask = createPollOrCompleteBulkEmbeddingTask({ + const pollOrCompleteSingleBatchTask = createPollOrCompleteSingleBatchTask({ knowledgePools: pluginOptions.knowledgePools, pollOrCompleteQueueName: pluginOptions.bulkQueueNames?.pollOrCompleteQueueName, }) - tasks.push(pollOrCompleteBulkEmbedTask) + tasks.push(pollOrCompleteSingleBatchTask) config.jobs = { ...incomingJobs, diff --git a/src/tasks/bulkEmbedAll.ts b/src/tasks/bulkEmbedAll.ts index 55ba517..4caf358 100644 --- a/src/tasks/bulkEmbedAll.ts +++ b/src/tasks/bulkEmbedAll.ts @@ -1,6 +1,5 @@ import { JsonObject, - PaginatedDocs, Payload, TaskConfig, TaskHandlerResult, @@ -10,6 +9,7 @@ import { BatchSubmission, BulkEmbeddingOutput, CollectedEmbeddingInput, + CollectionVectorizeOption, KnowledgePoolDynamicConfig, KnowledgePoolName, } from '../types.js' @@ -26,6 +26,10 @@ import toSnakeCase from 'to-snake-case' type PrepareBulkEmbeddingTaskInput = { runId: string + /** If set, this is a per-collection worker job */ + collectionSlug?: string + /** Page within the collection (default: 1) */ + page?: number } type PrepareBulkEmbeddingTaskOutput = { @@ -39,18 +43,20 @@ type PrepareBulkEmbeddingTaskInputOutput = { output: PrepareBulkEmbeddingTaskOutput } -type PollOrCompleteBulkEmbeddingTaskInput = { +type PollOrCompleteSingleBatchTaskInput = { runId: string + batchId: string } -type PollOrCompleteBulkEmbeddingTaskOutput = { +type PollOrCompleteSingleBatchTaskOutput = { runId: string + batchId: string status: string } -type PollOrCompleteBulkEmbeddingTaskInputOutput = { - input: PollOrCompleteBulkEmbeddingTaskInput - output: PollOrCompleteBulkEmbeddingTaskOutput +type PollOrCompleteSingleBatchTaskInputOutput = { + input: PollOrCompleteSingleBatchTaskInput + output: PollOrCompleteSingleBatchTaskOutput } const TERMINAL_STATUSES = new Set(['succeeded', 'failed', 'canceled', 'retried']) @@ -87,12 +93,159 @@ async function loadRunAndConfig({ return { run, poolName, dynamicConfig } } +/** + * Check if all batches for a run are terminal, and if so finalize the run. + * This function is idempotent - safe to call concurrently from multiple per-batch tasks. + */ +async function finalizeRunIfComplete(args: { + payload: Payload + runId: string + poolName: KnowledgePoolName + callbacks: { + onError?: (args: { + providerBatchIds: string[] + error: Error + failedChunkData?: FailedChunkData[] + failedChunkCount?: number + }) => Promise + } +}): Promise<{ finalized: boolean; status?: string }> { + const { payload, runId, poolName, callbacks } = args + + // Check if run is already terminal (prevents double-finalization race) + const currentRun = await payload.findByID({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: runId, + }) + if (TERMINAL_STATUSES.has((currentRun as any).status)) { + return { finalized: true, status: (currentRun as any).status } + } + + // Stream through batches page-by-page, aggregating without storing them all in memory + const runIdNum = parseInt(runId, 10) + const PAGE_SIZE = 100 + let page = 1 + let totalBatchCount = 0 + let allTerminal = true + let hasAnySucceeded = false + let allCanceled = true + let totalSucceeded = 0 + let totalFailed = 0 + const allFailedChunkData: FailedChunkData[] = [] + const succeededBatchIds: number[] = [] + const providerBatchIds: string[] = [] + + while (true) { + const result = await payload.find({ + collection: BULK_EMBEDDINGS_BATCHES_SLUG, + where: { run: { equals: runIdNum } }, + limit: PAGE_SIZE, + page, + sort: 'batchIndex', + }) + const docs = (result as any)?.docs || [] + + for (const batch of docs) { + totalBatchCount++ + const status = batch.status as string + providerBatchIds.push(batch.providerBatchId as string) + + if (!TERMINAL_STATUSES.has(status)) allTerminal = false + if (status === 'succeeded') hasAnySucceeded = true + if (status !== 'canceled') allCanceled = false + + if (status === 'succeeded') { + totalSucceeded += batch.succeededCount || 0 + totalFailed += batch.failedCount || 0 + succeededBatchIds.push(parseInt(String(batch.id), 10)) + if (Array.isArray(batch.failedChunkData)) { + allFailedChunkData.push(...batch.failedChunkData) + } + } + } + + const totalPages = (result as any)?.totalPages ?? page + if (page >= totalPages || docs.length === 0) break + page++ + } + + + if (totalBatchCount === 0) { + await payload.update({ + id: runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: 'succeeded', + inputs: 0, + succeeded: 0, + failed: 0, + completedAt: new Date().toISOString(), + }, + }) + return { finalized: true, status: 'succeeded' } + } + + if (!allTerminal) { + return { finalized: false } + } + + // All batches are terminal — finalize the run + if (allCanceled) { + await payload.update({ + id: runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { status: 'canceled', completedAt: new Date().toISOString() }, + }) + return { finalized: true, status: 'canceled' } + } + + const runStatus = hasAnySucceeded ? 'succeeded' : 'failed' + + await payload.update({ + id: runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: runStatus, + succeeded: totalSucceeded, + failed: totalFailed, + failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, + completedAt: new Date().toISOString(), + }, + }) + + // Cleanup metadata for succeeded batches only + if (succeededBatchIds.length > 0) { + await payload.delete({ + collection: BULK_EMBEDDINGS_INPUT_METADATA_SLUG, + where: { batch: { in: succeededBatchIds } }, + }) + } + + // Call onError if there were any failures + if (callbacks.onError && (totalFailed > 0 || !hasAnySucceeded)) { + await callbacks.onError({ + providerBatchIds, + error: new Error( + totalFailed > 0 + ? `${totalFailed} chunk(s) failed during completion` + : 'All batches failed', + ), + failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, + failedChunkCount: totalFailed > 0 ? totalFailed : undefined, + }) + } + + return { finalized: true, status: runStatus } +} + export const createPrepareBulkEmbeddingTask = ({ knowledgePools, pollOrCompleteQueueName, + prepareBulkEmbedQueueName, }: { knowledgePools: Record pollOrCompleteQueueName?: string + prepareBulkEmbedQueueName?: string }): TaskConfig => { const task: TaskConfig = { slug: 'payloadcms-vectorize:prepare-bulk-embedding', @@ -104,7 +257,7 @@ export const createPrepareBulkEmbeddingTask = ({ throw new Error('[payloadcms-vectorize] bulk embed runId is required') } const payload = req.payload - const { poolName, dynamicConfig } = await loadRunAndConfig({ + const { run, poolName, dynamicConfig } = await loadRunAndConfig({ payload, runId: input.runId, knowledgePools, @@ -113,7 +266,76 @@ export const createPrepareBulkEmbeddingTask = ({ const callbacks = dynamicConfig.embeddingConfig.bulkEmbeddingsFns! const embeddingVersion = dynamicConfig.embeddingConfig.version - // Find baseline run information + // ============================================= + // COORDINATOR MODE: no collectionSlug in input + // ============================================= + if (!input.collectionSlug) { + // Queue one worker per collection + const collectionSlugs = Object.keys(dynamicConfig.collections) + if (collectionSlugs.length === 0) { + // No collections configured - mark run as succeeded + await payload.update({ + id: input.runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: 'succeeded', + totalBatches: 0, + inputs: 0, + succeeded: 0, + failed: 0, + completedAt: new Date().toISOString(), + }, + }) + return { output: { runId: input.runId, status: 'succeeded', batchCount: 0 } } + } + + for (const collectionSlug of collectionSlugs) { + await payload.jobs.queue<'payloadcms-vectorize:prepare-bulk-embedding'>({ + task: 'payloadcms-vectorize:prepare-bulk-embedding', + input: { runId: input.runId, collectionSlug, page: 1 }, + req, + ...(prepareBulkEmbedQueueName ? { queue: prepareBulkEmbedQueueName } : {}), + }) + } + + // Update run status + await payload.update({ + id: input.runId, + collection: BULK_EMBEDDINGS_RUNS_SLUG, + data: { + status: 'running', + submittedAt: new Date().toISOString(), + }, + }) + + return { output: { runId: input.runId, status: 'coordinated' } } + } + + // ============================================= + // WORKER MODE: collectionSlug is set + // ============================================= + + // Early exit if run is already terminal + if (TERMINAL_STATUSES.has((run as any).status)) { + return { output: { runId: input.runId, status: (run as any).status } } + } + + const collectionSlug = input.collectionSlug + const collectionConfig = dynamicConfig.collections[collectionSlug] + if (!collectionConfig) { + throw new Error( + `[payloadcms-vectorize] collection "${collectionSlug}" not found in pool "${poolName}"`, + ) + } + + const DEFAULT_BATCH_LIMIT = 1000 + const batchLimit = + collectionConfig.batchLimit && collectionConfig.batchLimit > 0 + ? collectionConfig.batchLimit + : DEFAULT_BATCH_LIMIT + const page = input.page ?? 1 + + // Compute baseline/version for filtering const latestSucceededRun = await payload.find({ collection: BULK_EMBEDDINGS_RUNS_SLUG, where: { @@ -126,28 +348,56 @@ export const createPrepareBulkEmbeddingTask = ({ limit: 1, sort: '-completedAt', }) - const baselineRun = (latestSucceededRun as any)?.docs?.[0] const baselineVersion: string | undefined = baselineRun?.embeddingVersion const lastBulkCompletedAt: string | undefined = baselineRun?.completedAt const versionMismatch = baselineVersion !== undefined && baselineVersion !== embeddingVersion + const includeAll = versionMismatch || !baselineRun + const lastCompletedAtDate = lastBulkCompletedAt ? new Date(lastBulkCompletedAt) : undefined + + // Build where clause for this collection + const where = includeAll + ? undefined + : lastCompletedAtDate + ? { updatedAt: { greater_than: lastCompletedAtDate.toISOString() } } + : undefined - // Stream missing embeddings and create batches - let result + // STEP 1: Query the page + const queryResult = await payload.find({ + collection: collectionSlug, + where, + limit: batchLimit, + page, + sort: 'id', + }) + + // STEP 2: If there's a next page, queue continuation BEFORE processing + if (queryResult.nextPage) { + await payload.jobs.queue<'payloadcms-vectorize:prepare-bulk-embedding'>({ + task: 'payloadcms-vectorize:prepare-bulk-embedding', + input: { runId: input.runId, collectionSlug, page: queryResult.nextPage }, + req, + ...(prepareBulkEmbedQueueName ? { queue: prepareBulkEmbedQueueName } : {}), + }) + } + + // STEP 3: Process this page's docs + let totalResult: { batchCount: number; totalInputs: number; batchIds: (string | number)[] } try { - result = await streamAndBatchMissingEmbeddings({ + totalResult = await streamAndBatchDocs({ payload, runId: input.runId, poolName, - dynamicConfig, + collectionSlug, + collectionConfig, + docs: (queryResult.docs || []) as Array, embeddingVersion, - lastBulkCompletedAt, - versionMismatch, - hasBaseline: Boolean(baselineRun), + includeAll, + lastCompletedAtDate, addChunk: callbacks.addChunk, }) } catch (error) { - // Ingestion failed (e.g., validation error) - mark run as failed + // Ingestion failed - mark run as failed const errorMessage = (error as Error).message || String(error) await payload.update({ id: input.runId, @@ -158,49 +408,48 @@ export const createPrepareBulkEmbeddingTask = ({ completedAt: new Date().toISOString(), }, }) - // Re-throw so Payload's job system marks the job as failed throw error } - if (result.totalInputs === 0) { - // No inputs to process - mark run as succeeded + + // STEP 4: Accumulate counts on run record + if (totalResult.totalInputs > 0) { + const currentRun = await payload.findByID({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: input.runId, + }) + const existingInputs = (currentRun as any).inputs ?? 0 + const existingBatches = (currentRun as any).totalBatches ?? 0 await payload.update({ id: input.runId, collection: BULK_EMBEDDINGS_RUNS_SLUG, data: { - status: 'succeeded', - totalBatches: 0, - inputs: 0, - succeeded: 0, - failed: 0, - completedAt: new Date().toISOString(), + totalBatches: existingBatches + totalResult.batchCount, + inputs: existingInputs + totalResult.totalInputs, }, }) - return { output: { runId: input.runId, status: 'succeeded', batchCount: 0 } } } - // Update run with batch count and total inputs - await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, - data: { - status: 'running', - totalBatches: result.batchCount, - inputs: result.totalInputs, - submittedAt: new Date().toISOString(), - }, - }) + // STEP 5: Queue per-batch polling tasks + for (const batchId of totalResult.batchIds) { + await payload.jobs.queue<'payloadcms-vectorize:poll-or-complete-single-batch'>({ + task: 'payloadcms-vectorize:poll-or-complete-single-batch', + input: { runId: input.runId, batchId: String(batchId) }, + req, + ...(pollOrCompleteQueueName ? { queue: pollOrCompleteQueueName } : {}), + }) + } - // Queue the poll task to monitor all batches - await payload.jobs.queue<'payloadcms-vectorize:poll-or-complete-bulk-embedding'>({ - task: 'payloadcms-vectorize:poll-or-complete-bulk-embedding', - input: { runId: input.runId }, - req, - ...(pollOrCompleteQueueName ? { queue: pollOrCompleteQueueName } : {}), - }) + // If this worker produced 0 batches and has no continuation, try to finalize. + // finalizeRunIfComplete is idempotent: if other workers created batches that + // aren't terminal yet, it returns { finalized: false } and the polling tasks + // will handle finalization later. + if (totalResult.batchCount === 0 && !queryResult.nextPage) { + await finalizeRunIfComplete({ payload, runId: input.runId, poolName, callbacks }) + } return { - output: { runId: input.runId, status: 'prepared', batchCount: result.batchCount }, + output: { runId: input.runId, status: 'prepared', batchCount: totalResult.batchCount }, } }, } @@ -208,277 +457,114 @@ export const createPrepareBulkEmbeddingTask = ({ return task } -export const createPollOrCompleteBulkEmbeddingTask = ({ +export const createPollOrCompleteSingleBatchTask = ({ knowledgePools, pollOrCompleteQueueName, }: { knowledgePools: Record pollOrCompleteQueueName?: string -}): TaskConfig => { - const task: TaskConfig = { - slug: 'payloadcms-vectorize:poll-or-complete-bulk-embedding', +}): TaskConfig => { + const task: TaskConfig = { + slug: 'payloadcms-vectorize:poll-or-complete-single-batch', handler: async ({ input, req, - }): Promise> => { - if (!input?.runId) { - throw new Error('[payloadcms-vectorize] bulk embed runId is required') + }): Promise> => { + if (!input?.runId || !input?.batchId) { + throw new Error('[payloadcms-vectorize] single batch task requires runId and batchId') } + const { runId, batchId } = input const payload = req.payload const { run, poolName, dynamicConfig } = await loadRunAndConfig({ payload, - runId: input.runId, + runId, knowledgePools, }) const callbacks = dynamicConfig.embeddingConfig.bulkEmbeddingsFns! - // Check if run is already terminal - const currentStatus = (run as any).status - if (TERMINAL_STATUSES.has(currentStatus)) { - return { output: { runId: input.runId, status: currentStatus } } + // Early exit if run is already terminal + if (TERMINAL_STATUSES.has((run as any).status)) { + return { output: { runId, batchId, status: (run as any).status } } } - // Load all batches for this run with pagination to handle >1000 batches - // Convert runId to number for postgres relationship queries - const runIdNum = parseInt(input.runId, 10) - const batches: any[] = [] - let batchPage = 1 - const batchLimit = 100 // Smaller pages for better memory management - - while (true) { - const batchesResult = await payload.find({ - collection: BULK_EMBEDDINGS_BATCHES_SLUG, - where: { run: { equals: runIdNum } }, - limit: batchLimit, - page: batchPage, - sort: 'batchIndex', - }) - const pageDocs = (batchesResult as any)?.docs || [] - batches.push(...pageDocs) + // Load this specific batch + const batch = await payload.findByID({ + collection: BULK_EMBEDDINGS_BATCHES_SLUG, + id: batchId, + }) - const totalPages = (batchesResult as any)?.totalPages ?? batchPage - if (batchPage >= totalPages || pageDocs.length === 0) break - batchPage++ + // If batch is already terminal, just check if run can be finalized + if (TERMINAL_STATUSES.has((batch as any).status)) { + await finalizeRunIfComplete({ payload, runId, poolName, callbacks }) + return { output: { runId, batchId, status: (batch as any).status } } } - if (batches.length === 0) { - // No batches found - this shouldn't happen but handle gracefully - await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, - data: { - status: 'failed', - error: 'No batches found for run', - completedAt: new Date().toISOString(), - }, + // Poll and complete this single batch + try { + const completionResult = await pollAndCompleteSingleBatch({ + payload, + runId, + poolName, + batch, + callbacks, }) - return { output: { runId: input.runId, status: 'failed' } } - } - - // Poll each non-terminal batch and complete succeeded ones incrementally - let anyRunning = false - let totalSucceeded = 0 - let totalFailed = 0 - const allFailedChunkData: FailedChunkData[] = [] - const batchStatuses = new Map() // Track batch statuses as we process - - // Initialize with current statuses - for (const batch of batches) { - batchStatuses.set(String(batch.id), batch.status as string) - // Accumulate counts from already completed batches - if (TERMINAL_STATUSES.has(batch.status as string)) { - if (batch.status === 'succeeded') { - totalSucceeded += batch.succeededCount || 0 - totalFailed += batch.failedCount || 0 - } - } - } - - for (const batch of batches) { - const batchStatus = batchStatuses.get(String(batch.id)) as string - - // Skip batches that are already completed - if (TERMINAL_STATUSES.has(batchStatus)) { - continue - } - - // Poll batch and complete if succeeded (streams embeddings via onChunk callback) - try { - const completionResult = await pollAndCompleteSingleBatch({ - payload, - runId: input.runId, - poolName, - batch, - callbacks, - }) - - // Update batch status and counts - await payload.update({ - id: batch.id, - collection: BULK_EMBEDDINGS_BATCHES_SLUG, - data: { - status: completionResult.status, - error: completionResult.error, - ...(TERMINAL_STATUSES.has(completionResult.status) - ? { completedAt: new Date().toISOString() } - : {}), - ...(completionResult.status === 'succeeded' - ? { - succeededCount: completionResult.succeededCount, - failedCount: completionResult.failedCount, - } - : {}), - }, - }) - - // Track the new status - batchStatuses.set(String(batch.id), completionResult.status) - - // Accumulate counts from newly succeeded batches - if (completionResult.status === 'succeeded') { - totalSucceeded += completionResult.succeededCount - totalFailed += completionResult.failedCount - allFailedChunkData.push(...completionResult.failedChunkData) - } - - // Track if still running (queued or running) - if (completionResult.status === 'queued' || completionResult.status === 'running') { - anyRunning = true - } - // Failed/canceled batches - leave them, can be re-run later - } catch (error) { - // Completion failed - mark batch as failed - const errorMessage = (error as Error).message || String(error) - await payload.update({ - id: batch.id, - collection: BULK_EMBEDDINGS_BATCHES_SLUG, - data: { - status: 'failed', - error: `Completion failed: ${errorMessage}`, - completedAt: new Date().toISOString(), - }, - }) - batchStatuses.set(String(batch.id), 'failed') - } - } - // Check if all batches are complete - const allBatchesComplete = Array.from(batchStatuses.values()).every((status) => - TERMINAL_STATUSES.has(status), - ) - - if (allBatchesComplete) { - // All batches are done - finalize the run - const hasAnySucceeded = Array.from(batchStatuses.values()).some( - (status) => status === 'succeeded', - ) - - // Check if any batches are failed (not just canceled) - we keep metadata for potential retries - const hasFailedBatches = Array.from(batchStatuses.values()).some( - (status) => status === 'failed', - ) + // Update batch status and counts await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: batchId, + collection: BULK_EMBEDDINGS_BATCHES_SLUG, data: { - status: hasAnySucceeded ? 'succeeded' : 'failed', - succeeded: totalSucceeded, - failed: totalFailed, - failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, - completedAt: new Date().toISOString(), + status: completionResult.status, + error: completionResult.error, + ...(TERMINAL_STATUSES.has(completionResult.status) + ? { completedAt: new Date().toISOString() } + : {}), + ...(completionResult.status === 'succeeded' + ? { + succeededCount: completionResult.succeededCount, + failedCount: completionResult.failedCount, + failedChunkData: + completionResult.failedChunkData.length > 0 + ? completionResult.failedChunkData + : undefined, + } + : {}), }, }) - // Cleanup metadata for succeeded batches only - // Keep metadata for failed batches to allow retry functionality - const succeededBatchIds = Array.from(batchStatuses.entries()) - .filter(([_, status]) => status === 'succeeded') - .map(([id, _]) => parseInt(id, 10)) - - if (succeededBatchIds.length > 0) { - await payload.delete({ - collection: BULK_EMBEDDINGS_INPUT_METADATA_SLUG, - where: { batch: { in: succeededBatchIds } }, - }) - } - - // Call onError if there were any failures - if (callbacks.onError && (totalFailed > 0 || !hasAnySucceeded)) { - const providerBatchIds = batches.map((b: any) => b.providerBatchId as string) - await callbacks.onError({ - providerBatchIds, - error: new Error( - totalFailed > 0 - ? `${totalFailed} chunk(s) failed during completion` - : 'All batches failed', - ), - failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, - failedChunkCount: totalFailed > 0 ? totalFailed : undefined, - }) - } - - return { - output: { - runId: input.runId, - status: hasAnySucceeded ? 'succeeded' : 'failed', - }, + // If batch is now terminal, check if run should be finalized + if (TERMINAL_STATUSES.has(completionResult.status)) { + await finalizeRunIfComplete({ payload, runId, poolName, callbacks }) + return { output: { runId, batchId, status: completionResult.status } } } - } - // If still running, requeue this task - if (anyRunning) { - await payload.jobs.queue<'payloadcms-vectorize:poll-or-complete-bulk-embedding'>({ - task: 'payloadcms-vectorize:poll-or-complete-bulk-embedding', - input: { runId: input.runId }, + // Still running - re-queue self with polling delay + await payload.jobs.queue<'payloadcms-vectorize:poll-or-complete-single-batch'>({ + task: 'payloadcms-vectorize:poll-or-complete-single-batch', + input: { runId, batchId }, req, ...(pollOrCompleteQueueName ? { queue: pollOrCompleteQueueName } : {}), }) - return { output: { runId: input.runId, status: 'polling' } } - } - - // Edge case: allBatchesComplete is false but anyRunning is false - // This happens when all batches are in 'canceled' or 'failed' status but we didn't detect it above - // Check if all batches are canceled - const allCanceled = Array.from(batchStatuses.values()).every( - (status) => status === 'canceled', - ) - if (allCanceled) { + return { output: { runId, batchId, status: completionResult.status } } + } catch (error) { + // Batch processing failed - mark batch as failed + const errorMessage = (error as Error).message || String(error) await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, + id: batchId, + collection: BULK_EMBEDDINGS_BATCHES_SLUG, data: { - status: 'canceled', + status: 'failed', + error: `Completion failed: ${errorMessage}`, completedAt: new Date().toISOString(), }, }) - return { output: { runId: input.runId, status: 'canceled' } } + // Check if this was the last batch to complete + await finalizeRunIfComplete({ payload, runId, poolName, callbacks }) + return { output: { runId, batchId, status: 'failed' } } } - - // Fallback: mark as failed with diagnostic info - const statusCounts = Array.from(batchStatuses.values()).reduce( - (acc, status) => { - acc[status] = (acc[status] || 0) + 1 - return acc - }, - {} as Record, - ) - payload.logger.warn( - `[payloadcms-vectorize] Run ${input.runId} reached unexpected state. Batch statuses: ${JSON.stringify(statusCounts)}`, - ) - - await payload.update({ - id: input.runId, - collection: BULK_EMBEDDINGS_RUNS_SLUG, - data: { - status: 'failed', - error: `Run reached unexpected state. Batch statuses: ${JSON.stringify(statusCounts)}`, - completedAt: new Date().toISOString(), - }, - }) - return { output: { runId: input.runId, status: 'failed' } } }, } @@ -486,150 +572,123 @@ export const createPollOrCompleteBulkEmbeddingTask = ({ } /** - * Stream through missing embeddings, calling addChunk for each. + * Process pre-fetched docs from a single collection, calling addChunk for each chunk. * User controls batching via addChunk return value. * * Single-pass approach using async generator to yield chunks sequentially. * This avoids the need for a pre-counting pass while correctly determining isLastChunk. */ -async function streamAndBatchMissingEmbeddings(args: { +async function streamAndBatchDocs(args: { payload: Payload runId: string poolName: KnowledgePoolName - dynamicConfig: KnowledgePoolDynamicConfig + collectionSlug: string + collectionConfig: CollectionVectorizeOption + docs: Array embeddingVersion: string - lastBulkCompletedAt?: string - versionMismatch: boolean - hasBaseline: boolean + includeAll: boolean + lastCompletedAtDate?: Date addChunk: (args: { chunk: BulkEmbeddingInput isLastChunk: boolean }) => Promise -}): Promise<{ batchCount: number; totalInputs: number }> { +}): Promise<{ batchCount: number; totalInputs: number; batchIds: (string | number)[] }> { const { payload, runId, poolName, - dynamicConfig, + collectionSlug, + collectionConfig, + docs, embeddingVersion, - lastBulkCompletedAt, - versionMismatch, - hasBaseline, + includeAll, + lastCompletedAtDate, addChunk, } = args - const includeAll = versionMismatch || !hasBaseline - const lastCompletedAtDate = lastBulkCompletedAt ? new Date(lastBulkCompletedAt) : undefined - const collectionSlugs = Object.keys(dynamicConfig.collections) - - // Async generator that yields chunks one at a time + // Async generator that yields chunks one at a time from pre-fetched docs async function* generateChunks(): AsyncGenerator { - for (const collectionSlug of collectionSlugs) { - const collectionConfig = dynamicConfig.collections[collectionSlug] - if (!collectionConfig) continue + const toKnowledgePool = collectionConfig.toKnowledgePool + + for (const doc of docs) { + // If !includeAll, we still need to check if document has current embedding + // (can't filter this in the where clause since it's a cross-collection check) + if (!includeAll && !lastCompletedAtDate) { + const hasCurrentEmbedding = await docHasEmbeddingVersion({ + payload, + poolName, + sourceCollection: collectionSlug, + docId: String(doc.id), + embeddingVersion, + }) + if (hasCurrentEmbedding) continue + } - const toKnowledgePool = collectionConfig.toKnowledgePool - const limit = 50 + // Check if document should be embedded + if (collectionConfig.shouldEmbedFn) { + const shouldEmbed = await collectionConfig.shouldEmbedFn(doc, payload) + if (!shouldEmbed) continue + } - // Build where clause: filter by updatedAt if we have lastBulkCompletedAt and !includeAll - const where = includeAll - ? undefined - : lastCompletedAtDate - ? { - updatedAt: { - greater_than: lastCompletedAtDate.toISOString(), - }, - } - : undefined + const chunkData = await toKnowledgePool(doc, payload) - let res: PaginatedDocs | undefined = await payload.find({ - collection: collectionSlug, - where, - limit, - }) - do { - const docs = res?.docs || [] - if (!docs.length) break - - for (const doc of docs) { - // If !includeAll, we still need to check if document has current embedding - // (can't filter this in the where clause since it's a cross-collection check) - if (!includeAll && !lastCompletedAtDate) { - const hasCurrentEmbedding = await docHasEmbeddingVersion({ - payload, - poolName, - sourceCollection: collectionSlug, - docId: String(doc.id), - embeddingVersion, - }) - if (hasCurrentEmbedding) continue - } - - // Check if document should be embedded - if (collectionConfig.shouldEmbedFn) { - const shouldEmbed = await collectionConfig.shouldEmbedFn(doc, payload) - if (!shouldEmbed) continue - } - - const chunkData = await toKnowledgePool(doc, payload) - - // Validate chunks (same validation as real-time ingestion) - const invalidEntries = chunkData - .map((entry, idx) => { - if (!entry || typeof entry !== 'object') return idx - if (typeof entry.chunk !== 'string') return idx - return null - }) - .filter((idx): idx is number => idx !== null) - - if (invalidEntries.length > 0) { - throw new Error( - `[payloadcms-vectorize] toKnowledgePool returned ${invalidEntries.length} invalid entr${ - invalidEntries.length === 1 ? 'y' : 'ies' - } for document ${doc.id} in collection "${collectionSlug}". Each entry must be an object with a "chunk" string. Invalid indices: ${invalidEntries.join( - ', ', - )}`, - ) - } - - // Yield valid chunks - for (let idx = 0; idx < chunkData.length; idx++) { - const chunkEntry = chunkData[idx] - const { chunk, ...extensionFields } = chunkEntry - - yield { - id: `${collectionSlug}:${doc.id}:${idx}`, - text: chunk, - metadata: { - sourceCollection: collectionSlug, - docId: String(doc.id), - chunkIndex: idx, - embeddingVersion, - extensionFields, - }, - } - } + // Validate chunks (same validation as real-time ingestion) + const invalidEntries = chunkData + .map((entry, idx) => { + if (!entry || typeof entry !== 'object') return idx + if (typeof entry.chunk !== 'string') return idx + return null + }) + .filter((idx): idx is number => idx !== null) + + if (invalidEntries.length > 0) { + throw new Error( + `[payloadcms-vectorize] toKnowledgePool returned ${invalidEntries.length} invalid entr${ + invalidEntries.length === 1 ? 'y' : 'ies' + } for document ${doc.id} in collection "${collectionSlug}". Each entry must be an object with a "chunk" string. Invalid indices: ${invalidEntries.join( + ', ', + )}`, + ) + } + + // Yield valid chunks + for (let idx = 0; idx < chunkData.length; idx++) { + const chunkEntry = chunkData[idx] + const { chunk, ...extensionFields } = chunkEntry + + yield { + id: `${collectionSlug}:${doc.id}:${idx}`, + text: chunk, + metadata: { + sourceCollection: collectionSlug, + docId: String(doc.id), + chunkIndex: idx, + embeddingVersion, + extensionFields, + }, } - } while ( - (res = res.nextPage - ? await payload.find({ - collection: collectionSlug, - where, - limit, - page: res.nextPage, - }) - : undefined) - ) + } } } + + // Determine starting batchIndex from existing batches for this run + const runIdNum = parseInt(runId, 10) + const maxBatchResult = await payload.find({ + collection: BULK_EMBEDDINGS_BATCHES_SLUG, + where: { run: { equals: runIdNum } }, + sort: '-batchIndex', + limit: 1, + }) + let batchIndex = + maxBatchResult.docs.length > 0 ? ((maxBatchResult.docs[0] as any).batchIndex ?? 0) + 1 : 0 + // Process chunks from generator - let batchIndex = 0 let totalInputs = 0 const pendingChunks: CollectedEmbeddingInput[] = [] const chunkIterator = generateChunks() - const runIdNum = parseInt(runId, 10) let currentBatchId: number | undefined = undefined + const batchIds: (string | number)[] = [] async function processChunk( chunk: CollectedEmbeddingInput, @@ -699,12 +758,13 @@ async function streamAndBatchMissingEmbeddings(args: { }) totalInputs += inputCount + batchIds.push(currentBatchId) batchIndex++ currentBatchId = undefined // Reset for next batch } } - // Process chunks from generator + // Process chunks from generator using look-ahead for isLastChunk let prevChunk: CollectedEmbeddingInput | undefined = undefined for await (const currentChunk of chunkIterator) { if (prevChunk) { @@ -716,7 +776,7 @@ async function streamAndBatchMissingEmbeddings(args: { await processChunk(prevChunk, true) } - return { batchCount: batchIndex, totalInputs } + return { batchCount: batchIds.length, totalInputs, batchIds } } /** diff --git a/src/types.ts b/src/types.ts index b517b87..bedab7c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -101,6 +101,10 @@ export type CollectionVectorizeOption = { shouldEmbedFn?: ShouldEmbedFn /** Function that converts a document to an array of chunks with optional extension field values */ toKnowledgePool: ToKnowledgePoolFn + /** Max documents to fetch from this collection per prepare job. + * Each page of results becomes a separate job linked to the next. + * Defaults to 1000 if not set. */ + batchLimit?: number } /** Knowledge pool name identifier */ @@ -186,7 +190,8 @@ export type PollBulkEmbeddingsResult = { export type AddChunkArgs = { /** The chunk to add */ chunk: BulkEmbeddingInput - /** True if this is the last chunk in the run */ + /** True if this is the last chunk in this job (forces flush). + * Note: may not be the last chunk in the entire run if batchLimit continuations are used. */ isLastChunk: boolean } @@ -242,8 +247,9 @@ export type BulkEmbeddingsFns = { * of them were submitted when you return a BatchSubmission. * * **About `isLastChunk`:** - * - `isLastChunk=true` indicates this is the final chunk in the run - * - Use this to flush any remaining accumulated chunks before the run completes + * - `isLastChunk=true` indicates this is the final chunk in this job + * - Use this to flush any remaining accumulated chunks before the job completes + * - When `batchLimit` is set, each job's last chunk gets `isLastChunk=true` (not just the run's last chunk) * - The plugin uses this only to know when to stop iterating, not to determine which chunks were submitted * * **Example flow when chunk would exceed limit:** diff --git a/vitest.config.js b/vitest.config.js index 9d7b479..99a6e15 100644 --- a/vitest.config.js +++ b/vitest.config.js @@ -21,12 +21,11 @@ export default defineConfig(() => { hookTimeout: 30_000, testTimeout: 30_000, exclude: ['**/e2e.spec.{ts,js}', '**/node_modules/**'], - // Run test files sequentially to avoid global state interference - // (embeddingsTables map and Payload instance caching) + // Each test file gets its own forked process so memory is fully + // reclaimed between files (prevents OOM on CI). + pool: 'forks', + // Run test files sequentially to avoid DB / global-state interference. fileParallelism: false, - // Disable parallel test execution within files as well - //threads: false, - //maxConcurrency: 1, }, } }) From d5f29f23db72a42f65c6bd2cacdcef9d09169f8b Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Wed, 25 Feb 2026 21:15:45 +0700 Subject: [PATCH 03/10] chore: bump version to 0.6.0-beta.5 Co-Authored-By: Claude Opus 4.6 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 8a52a89..7641980 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "payloadcms-vectorize", - "version": "0.6.0-beta.4", + "version": "0.6.0-beta.5", "description": "A plugin to vectorize collections for RAG in Payload 3.0", "license": "MIT", "type": "module", From ac4f56503362d9b173da56bbf5b6b95082b47657 Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Wed, 25 Feb 2026 21:54:46 +0700 Subject: [PATCH 04/10] fix: resolve 4 CI test failures from merge - chunkers.spec.ts: remove getPayload() call that crashes on dummy db, pass SanitizedConfig directly to chunkRichText - batchLimit.spec.ts: add missing dbAdapter (createMockAdapter) required by split_db_adapter architecture - extensionFieldsVectorSearch.spec.ts: pass adapter as second arg to createVectorSearchHandlers (new signature from split_db_adapter) - versionBump.spec.ts: destroy payload0 before creating payload1 to prevent cron worker race condition between two instances Co-Authored-By: Claude Opus 4.6 --- dev/specs/bulkEmbed/batchLimit.spec.ts | 2 + dev/specs/bulkEmbed/versionBump.spec.ts | 16 +++---- dev/specs/chunkers.spec.ts | 46 ++++++++----------- dev/specs/extensionFieldsVectorSearch.spec.ts | 2 +- 4 files changed, 27 insertions(+), 39 deletions(-) diff --git a/dev/specs/bulkEmbed/batchLimit.spec.ts b/dev/specs/bulkEmbed/batchLimit.spec.ts index db089ef..d9170a8 100644 --- a/dev/specs/bulkEmbed/batchLimit.spec.ts +++ b/dev/specs/bulkEmbed/batchLimit.spec.ts @@ -14,6 +14,7 @@ import { import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js' import { getVectorizedPayload, VectorizedPayload } from 'payloadcms-vectorize' import { expectGoodResult } from '../utils.vitest.js' +import { createMockAdapter } from 'helpers/mockAdapter.js' const DIMS = DEFAULT_DIMS const dbName = `bulk_batchlimit_${Date.now()}` @@ -27,6 +28,7 @@ describe('Bulk embed - batchLimit', () => { const built = await buildPayloadWithIntegration({ dbName, pluginOpts: { + dbAdapter: createMockAdapter(), knowledgePools: { default: { collections: { diff --git a/dev/specs/bulkEmbed/versionBump.spec.ts b/dev/specs/bulkEmbed/versionBump.spec.ts index 22a84c0..32c631e 100644 --- a/dev/specs/bulkEmbed/versionBump.spec.ts +++ b/dev/specs/bulkEmbed/versionBump.spec.ts @@ -27,15 +27,13 @@ const BULK_QUEUE_NAMES_1 = { describe('Bulk embed - version bump', () => { let post: any - const payloadsToDestroy: any[] = [] + let payload1: any = null beforeAll(async () => { await createTestDb({ dbName }) }) afterAll(async () => { - for (const p of payloadsToDestroy) { - await destroyPayload(p) - } + await destroyPayload(payload1) }) test('version bump re-embeds all even without updates', async () => { @@ -64,8 +62,6 @@ describe('Bulk embed - version bump', () => { }) ).payload - payloadsToDestroy.push(payload0) - post = await payload0.create({ collection: 'posts', data: { title: 'Old' } as any }) const vectorizedPayload0 = getVectorizedPayload(payload0) @@ -74,7 +70,6 @@ describe('Bulk embed - version bump', () => { await waitForBulkJobs(payload0) - // Debug: log embeddings after first run const embeds0 = await payload0.find({ collection: 'default', where: { docId: { equals: String(post.id) } }, @@ -82,7 +77,10 @@ describe('Bulk embed - version bump', () => { expect(embeds0.totalDocs).toBe(1) expect(embeds0.docs[0].embeddingVersion).toBe('old-version') - const payload1 = ( + // Destroy payload0 before creating payload1 to prevent cron worker interference + await destroyPayload(payload0) + + payload1 = ( await buildPayloadWithIntegration({ dbName, pluginOpts: { @@ -108,8 +106,6 @@ describe('Bulk embed - version bump', () => { }) ).payload - payloadsToDestroy.push(payload1) - const vectorizedPayload1 = getVectorizedPayload(payload1) const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) expectGoodResult(result1) diff --git a/dev/specs/chunkers.spec.ts b/dev/specs/chunkers.spec.ts index a6f87ad..cbf7848 100644 --- a/dev/specs/chunkers.spec.ts +++ b/dev/specs/chunkers.spec.ts @@ -1,8 +1,6 @@ import { describe, expect, test } from 'vitest' import { chunkText, chunkRichText } from 'helpers/chunkers.js' import { buildDummyConfig, getInitialMarkdownContent } from './constants.js' -import { destroyPayload } from './utils.js' -import { getPayload } from 'payload' describe('Chunkers', () => { test('textChunker', () => { @@ -21,35 +19,27 @@ describe('Chunkers', () => { }) const markdownContent = await getInitialMarkdownContent(cfg) - const thisPayload = await getPayload({ - config: cfg, - key: `chunkers-test-${Date.now()}`, - cron: true, - }) - - try { - const chunks = await chunkRichText(markdownContent, thisPayload) + // chunkRichText only needs the SanitizedConfig for Lexical editor setup, + // no real db required + const chunks = await chunkRichText(markdownContent, cfg) - expect(chunks.length).toBe(3) + expect(chunks.length).toBe(3) - // Intro chunk - expect(chunks[0]).toContain('Title') - expect(chunks[0]).toContain('Quote') - expect(chunks[0]).toContain('Paragraph 0') + // Intro chunk + expect(chunks[0]).toContain('Title') + expect(chunks[0]).toContain('Quote') + expect(chunks[0]).toContain('Paragraph 0') - // First H2 section - expect(chunks[1]).toContain('## Header 1') - expect(chunks[1]).toContain('Paragraph 1') - expect(chunks[1]).toContain('Paragraph 2') - expect(chunks[1]).toContain('Paragraph 3') + // First H2 section + expect(chunks[1]).toContain('## Header 1') + expect(chunks[1]).toContain('Paragraph 1') + expect(chunks[1]).toContain('Paragraph 2') + expect(chunks[1]).toContain('Paragraph 3') - // Second H2 section - expect(chunks[2]).toContain('## Header 2') - expect(chunks[2]).toContain('Paragraph 4') - expect(chunks[2]).toContain('Paragraph 5') - expect(chunks[2]).toContain('Paragraph 6') - } finally { - await destroyPayload(thisPayload) - } + // Second H2 section + expect(chunks[2]).toContain('## Header 2') + expect(chunks[2]).toContain('Paragraph 4') + expect(chunks[2]).toContain('Paragraph 5') + expect(chunks[2]).toContain('Paragraph 6') }) }) diff --git a/dev/specs/extensionFieldsVectorSearch.spec.ts b/dev/specs/extensionFieldsVectorSearch.spec.ts index c84b615..0eada79 100644 --- a/dev/specs/extensionFieldsVectorSearch.spec.ts +++ b/dev/specs/extensionFieldsVectorSearch.spec.ts @@ -135,7 +135,7 @@ describe('extensionFields', () => { const knowledgePools: Record = { default: defaultKnowledgePool, } - const searchHandler = createVectorSearchHandlers(knowledgePools).requestHandler + const searchHandler = createVectorSearchHandlers(knowledgePools, adapter).requestHandler const mockRequest = { json: async () => ({ query: testQuery, From 98aaf3262d9f9cb4b5dea106e817c373d91a4959 Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Thu, 26 Feb 2026 01:14:21 +0700 Subject: [PATCH 05/10] Cleans a nit double line --- src/tasks/bulkEmbedAll.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/tasks/bulkEmbedAll.ts b/src/tasks/bulkEmbedAll.ts index 24a0056..4f77883 100644 --- a/src/tasks/bulkEmbedAll.ts +++ b/src/tasks/bulkEmbedAll.ts @@ -175,7 +175,6 @@ async function finalizeRunIfComplete(args: { page++ } - if (totalBatchCount === 0) { await payload.update({ id: runId, @@ -232,9 +231,7 @@ async function finalizeRunIfComplete(args: { await callbacks.onError({ providerBatchIds, error: new Error( - totalFailed > 0 - ? `${totalFailed} chunk(s) failed during completion` - : 'All batches failed', + totalFailed > 0 ? `${totalFailed} chunk(s) failed during completion` : 'All batches failed', ), failedChunkData: allFailedChunkData.length > 0 ? allFailedChunkData : undefined, failedChunkCount: totalFailed > 0 ? totalFailed : undefined, @@ -418,7 +415,6 @@ export const createPrepareBulkEmbeddingTask = ({ throw error } - // STEP 4: Accumulate counts on run record if (totalResult.totalInputs > 0) { const currentRun = await payload.findByID({ @@ -663,7 +659,6 @@ async function streamAndBatchDocs(args: { } } - // Determine starting batchIndex from existing batches for this run const runIdNum = parseInt(runId, 10) const maxBatchResult = await payload.find({ From 2d02bc6ed6e7184a6f8508e0ba717860b5114c39 Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Thu, 26 Feb 2026 01:39:41 +0700 Subject: [PATCH 06/10] Undoes a weird test fix done by the bot --- dev/specs/bulkEmbed/versionBump.spec.ts | 44 +++++++++++++++---------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/dev/specs/bulkEmbed/versionBump.spec.ts b/dev/specs/bulkEmbed/versionBump.spec.ts index 32c631e..d00564f 100644 --- a/dev/specs/bulkEmbed/versionBump.spec.ts +++ b/dev/specs/bulkEmbed/versionBump.spec.ts @@ -16,24 +16,31 @@ import { createMockAdapter } from 'helpers/mockAdapter.js' const DIMS = DEFAULT_DIMS const dbName = `bulk_version_${Date.now()}` -// Use distinct bulk queue names per payload instance so that -// the second payload's cron worker handles its own bulk runs, -// instead of the first payload instance continuing to process them. -const BULK_QUEUE_NAMES_0 = BULK_QUEUE_NAMES -const BULK_QUEUE_NAMES_1 = { - prepareBulkEmbedQueueName: `${BULK_QUEUE_NAMES.prepareBulkEmbedQueueName}-v2`, - pollOrCompleteQueueName: `${BULK_QUEUE_NAMES.pollOrCompleteQueueName}-v2`, +// Use distinct queue names per payload instance so that each instance's +// cron worker only processes its own jobs and doesn't interfere with the other. +const QUEUE_NAMES_0 = { + realtimeQueueName: 'vectorize-realtime-v0', + bulkQueueNames: BULK_QUEUE_NAMES, +} +const QUEUE_NAMES_1 = { + realtimeQueueName: 'vectorize-realtime-v1', + bulkQueueNames: { + prepareBulkEmbedQueueName: `${BULK_QUEUE_NAMES.prepareBulkEmbedQueueName}-v2`, + pollOrCompleteQueueName: `${BULK_QUEUE_NAMES.pollOrCompleteQueueName}-v2`, + }, } describe('Bulk embed - version bump', () => { let post: any - let payload1: any = null + const payloadsToDestroy: any[] = [] beforeAll(async () => { await createTestDb({ dbName }) }) afterAll(async () => { - await destroyPayload(payload1) + for (const p of payloadsToDestroy) { + await destroyPayload(p) + } }) test('version bump re-embeds all even without updates', async () => { @@ -56,12 +63,15 @@ describe('Bulk embed - version bump', () => { }, }, }, - bulkQueueNames: BULK_QUEUE_NAMES_0, + realtimeQueueName: QUEUE_NAMES_0.realtimeQueueName, + bulkQueueNames: QUEUE_NAMES_0.bulkQueueNames, }, - key: `payload0`, + key: `payload0-${Date.now()}`, }) ).payload + payloadsToDestroy.push(payload0) + post = await payload0.create({ collection: 'posts', data: { title: 'Old' } as any }) const vectorizedPayload0 = getVectorizedPayload(payload0) @@ -77,10 +87,7 @@ describe('Bulk embed - version bump', () => { expect(embeds0.totalDocs).toBe(1) expect(embeds0.docs[0].embeddingVersion).toBe('old-version') - // Destroy payload0 before creating payload1 to prevent cron worker interference - await destroyPayload(payload0) - - payload1 = ( + const payload1 = ( await buildPayloadWithIntegration({ dbName, pluginOpts: { @@ -99,13 +106,16 @@ describe('Bulk embed - version bump', () => { }, }, }, - bulkQueueNames: BULK_QUEUE_NAMES_1, + realtimeQueueName: QUEUE_NAMES_1.realtimeQueueName, + bulkQueueNames: QUEUE_NAMES_1.bulkQueueNames, }, - key: `payload1`, + key: `payload1-${Date.now()}`, skipMigrations: true, }) ).payload + payloadsToDestroy.push(payload1) + const vectorizedPayload1 = getVectorizedPayload(payload1) const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) expectGoodResult(result1) From 3f463ea6f310e4b85b6e0dba136c3a9582a7cf16 Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Thu, 26 Feb 2026 02:01:13 +0700 Subject: [PATCH 07/10] fix: harden versionBump test with sequential steps and queue isolation - Use test.step() to enforce sequential execution of each phase - Add separate realtimeQueueName per payload instance to prevent cron worker cross-talk on the default queue - Use dynamic Date.now() keys to avoid cached state interference - Increase waitForBulkJobs timeout to 30s for CI Co-Authored-By: Claude Opus 4.6 --- dev/specs/bulkEmbed/versionBump.spec.ts | 149 +++++++++++++----------- 1 file changed, 80 insertions(+), 69 deletions(-) diff --git a/dev/specs/bulkEmbed/versionBump.spec.ts b/dev/specs/bulkEmbed/versionBump.spec.ts index d00564f..2402938 100644 --- a/dev/specs/bulkEmbed/versionBump.spec.ts +++ b/dev/specs/bulkEmbed/versionBump.spec.ts @@ -44,90 +44,101 @@ describe('Bulk embed - version bump', () => { }) test('version bump re-embeds all even without updates', async () => { - const payload0 = ( - await buildPayloadWithIntegration({ - dbName, - pluginOpts: { - dbAdapter: createMockAdapter(), - knowledgePools: { - default: { - collections: { - posts: { - toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], + // STEP 1: Build payload0 with old-version and run bulk embed + const payload0 = await test.step('create payload0 and embed with old-version', async () => { + const p = ( + await buildPayloadWithIntegration({ + dbName, + pluginOpts: { + dbAdapter: createMockAdapter(), + knowledgePools: { + default: { + collections: { + posts: { + toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], + }, + }, + embeddingConfig: { + version: 'old-version', + queryFn: makeDummyEmbedQuery(DIMS), + bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), }, - }, - embeddingConfig: { - version: 'old-version', - queryFn: makeDummyEmbedQuery(DIMS), - bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), }, }, + realtimeQueueName: QUEUE_NAMES_0.realtimeQueueName, + bulkQueueNames: QUEUE_NAMES_0.bulkQueueNames, }, - realtimeQueueName: QUEUE_NAMES_0.realtimeQueueName, - bulkQueueNames: QUEUE_NAMES_0.bulkQueueNames, - }, - key: `payload0-${Date.now()}`, - }) - ).payload - - payloadsToDestroy.push(payload0) + key: `payload0-${Date.now()}`, + }) + ).payload + payloadsToDestroy.push(p) - post = await payload0.create({ collection: 'posts', data: { title: 'Old' } as any }) + post = await p.create({ collection: 'posts', data: { title: 'Old' } as any }) - const vectorizedPayload0 = getVectorizedPayload(payload0) - const result0 = await vectorizedPayload0?.bulkEmbed({ knowledgePool: 'default' }) - expectGoodResult(result0) + const vectorizedPayload0 = getVectorizedPayload(p) + const result0 = await vectorizedPayload0?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result0) - await waitForBulkJobs(payload0) + await waitForBulkJobs(p, 30000) + return p + }) - const embeds0 = await payload0.find({ - collection: 'default', - where: { docId: { equals: String(post.id) } }, + // STEP 2: Verify payload0's embeddings before proceeding + await test.step('verify old-version embedding exists', async () => { + const embeds0 = await payload0.find({ + collection: 'default', + where: { docId: { equals: String(post.id) } }, + }) + expect(embeds0.totalDocs).toBe(1) + expect(embeds0.docs[0].embeddingVersion).toBe('old-version') }) - expect(embeds0.totalDocs).toBe(1) - expect(embeds0.docs[0].embeddingVersion).toBe('old-version') - const payload1 = ( - await buildPayloadWithIntegration({ - dbName, - pluginOpts: { - dbAdapter: createMockAdapter(), - knowledgePools: { - default: { - collections: { - posts: { - toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], + // STEP 3: Build payload1 with new-version and run bulk embed + const payload1 = await test.step('create payload1 and embed with new-version', async () => { + const p = ( + await buildPayloadWithIntegration({ + dbName, + pluginOpts: { + dbAdapter: createMockAdapter(), + knowledgePools: { + default: { + collections: { + posts: { + toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], + }, + }, + embeddingConfig: { + version: 'new-version', + queryFn: makeDummyEmbedQuery(DIMS), + bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), }, - }, - embeddingConfig: { - version: 'new-version', - queryFn: makeDummyEmbedQuery(DIMS), - bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), }, }, + realtimeQueueName: QUEUE_NAMES_1.realtimeQueueName, + bulkQueueNames: QUEUE_NAMES_1.bulkQueueNames, }, - realtimeQueueName: QUEUE_NAMES_1.realtimeQueueName, - bulkQueueNames: QUEUE_NAMES_1.bulkQueueNames, - }, - key: `payload1-${Date.now()}`, - skipMigrations: true, - }) - ).payload - - payloadsToDestroy.push(payload1) - - const vectorizedPayload1 = getVectorizedPayload(payload1) - const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) - expectGoodResult(result1) - - await waitForBulkJobs(payload1) - - const embeds1 = await payload1.find({ - collection: 'default', - where: { docId: { equals: String(post.id) } }, + key: `payload1-${Date.now()}`, + skipMigrations: true, + }) + ).payload + payloadsToDestroy.push(p) + + const vectorizedPayload1 = getVectorizedPayload(p) + const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result1) + + await waitForBulkJobs(p, 30000) + return p }) - expect(embeds1.totalDocs).toBe(1) - expect(embeds1.docs[0].embeddingVersion).toBe('new-version') + // STEP 4: Verify new-version replaced old-version + await test.step('verify new-version embedding replaced old', async () => { + const embeds1 = await payload1.find({ + collection: 'default', + where: { docId: { equals: String(post.id) } }, + }) + expect(embeds1.totalDocs).toBe(1) + expect(embeds1.docs[0].embeddingVersion).toBe('new-version') + }) }) }) From f5254ecd0cc0fdd0b3e9cf2a0cf0d917e5f44ad1 Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Thu, 26 Feb 2026 09:15:05 +0700 Subject: [PATCH 08/10] fix: prevent waitForBulkJobs from returning prematurely MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit waitForBulkJobs could return early in the coordinator/worker fan-out pattern when there's a brief window with 0 pending jobs between job transitions. Now it also checks the bulk embeddings run status — only returns when both no pending jobs exist AND no runs are in queued/running state. Co-Authored-By: Claude Opus 4.6 --- dev/specs/utils.ts | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/dev/specs/utils.ts b/dev/specs/utils.ts index 71a87ee..8739b10 100644 --- a/dev/specs/utils.ts +++ b/dev/specs/utils.ts @@ -75,12 +75,39 @@ export async function waitForVectorizationJobs(payload: Payload, maxWaitMs = 100 await waitForTasks(payload, [TASK_SLUG_VECTORIZE], maxWaitMs) } -export async function waitForBulkJobs(payload: Payload, maxWaitMs = 10000) { - await waitForTasks( - payload, - [TASK_SLUG_PREPARE_BULK_EMBEDDING, TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING], - maxWaitMs, +export async function waitForBulkJobs(payload: Payload, maxWaitMs = 10000, intervalMs = 250) { + const hasJobsCollection = (payload as any)?.config?.collections?.some( + (c: any) => c.slug === 'payload-jobs', ) + if (!hasJobsCollection) return + + const taskSlugs = [TASK_SLUG_PREPARE_BULK_EMBEDDING, TASK_SLUG_POLL_OR_COMPLETE_BULK_EMBEDDING] + const startTime = Date.now() + + while (Date.now() - startTime < maxWaitMs) { + const pending = await payload.find({ + collection: 'payload-jobs', + where: { + and: [{ taskSlug: { in: taskSlugs } }, { completedAt: { exists: false } }], + }, + }) + + if (pending.totalDocs === 0) { + // No pending jobs — but with coordinator/worker fan-out, new jobs may + // appear between the coordinator completing and the worker being queued. + // Double-check: if any bulk run is still non-terminal, keep waiting. + const activeRuns = await payload.find({ + collection: BULK_EMBEDDINGS_RUNS_SLUG, + where: { status: { in: ['queued', 'running'] } }, + limit: 1, + }) + if (activeRuns.totalDocs === 0) return + } + + await new Promise((resolve) => setTimeout(resolve, intervalMs)) + } + // One last grace wait + await new Promise((resolve) => setTimeout(resolve, 500)) } export const DEFAULT_DIMS = 8 From 59d67402a8ac7a1b4b703c69d25300d339c5cbbd Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Thu, 26 Feb 2026 11:59:06 +0700 Subject: [PATCH 09/10] =?UTF-8?q?fix:=20remove=20test.step()=20=E2=80=94?= =?UTF-8?q?=20not=20available=20in=20Vitest?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test.step() is a Playwright API, not Vitest. Reverted to flat sequential code with phase comments for readability. Co-Authored-By: Claude Opus 4.6 --- dev/specs/bulkEmbed/versionBump.spec.ts | 148 +++++++++++------------- 1 file changed, 70 insertions(+), 78 deletions(-) diff --git a/dev/specs/bulkEmbed/versionBump.spec.ts b/dev/specs/bulkEmbed/versionBump.spec.ts index 2402938..ad64a89 100644 --- a/dev/specs/bulkEmbed/versionBump.spec.ts +++ b/dev/specs/bulkEmbed/versionBump.spec.ts @@ -44,101 +44,93 @@ describe('Bulk embed - version bump', () => { }) test('version bump re-embeds all even without updates', async () => { - // STEP 1: Build payload0 with old-version and run bulk embed - const payload0 = await test.step('create payload0 and embed with old-version', async () => { - const p = ( - await buildPayloadWithIntegration({ - dbName, - pluginOpts: { - dbAdapter: createMockAdapter(), - knowledgePools: { - default: { - collections: { - posts: { - toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], - }, - }, - embeddingConfig: { - version: 'old-version', - queryFn: makeDummyEmbedQuery(DIMS), - bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), + // Phase 1: Build payload0 with old-version and run bulk embed + const payload0 = ( + await buildPayloadWithIntegration({ + dbName, + pluginOpts: { + dbAdapter: createMockAdapter(), + knowledgePools: { + default: { + collections: { + posts: { + toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], }, }, + embeddingConfig: { + version: 'old-version', + queryFn: makeDummyEmbedQuery(DIMS), + bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), + }, }, - realtimeQueueName: QUEUE_NAMES_0.realtimeQueueName, - bulkQueueNames: QUEUE_NAMES_0.bulkQueueNames, }, - key: `payload0-${Date.now()}`, - }) - ).payload - payloadsToDestroy.push(p) + realtimeQueueName: QUEUE_NAMES_0.realtimeQueueName, + bulkQueueNames: QUEUE_NAMES_0.bulkQueueNames, + }, + key: `payload0-${Date.now()}`, + }) + ).payload - post = await p.create({ collection: 'posts', data: { title: 'Old' } as any }) + payloadsToDestroy.push(payload0) - const vectorizedPayload0 = getVectorizedPayload(p) - const result0 = await vectorizedPayload0?.bulkEmbed({ knowledgePool: 'default' }) - expectGoodResult(result0) + post = await payload0.create({ collection: 'posts', data: { title: 'Old' } as any }) - await waitForBulkJobs(p, 30000) - return p - }) + const vectorizedPayload0 = getVectorizedPayload(payload0) + const result0 = await vectorizedPayload0?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result0) - // STEP 2: Verify payload0's embeddings before proceeding - await test.step('verify old-version embedding exists', async () => { - const embeds0 = await payload0.find({ - collection: 'default', - where: { docId: { equals: String(post.id) } }, - }) - expect(embeds0.totalDocs).toBe(1) - expect(embeds0.docs[0].embeddingVersion).toBe('old-version') + await waitForBulkJobs(payload0, 30000) + + // Phase 2: Verify payload0's embeddings before proceeding + const embeds0 = await payload0.find({ + collection: 'default', + where: { docId: { equals: String(post.id) } }, }) + expect(embeds0.totalDocs).toBe(1) + expect(embeds0.docs[0].embeddingVersion).toBe('old-version') - // STEP 3: Build payload1 with new-version and run bulk embed - const payload1 = await test.step('create payload1 and embed with new-version', async () => { - const p = ( - await buildPayloadWithIntegration({ - dbName, - pluginOpts: { - dbAdapter: createMockAdapter(), - knowledgePools: { - default: { - collections: { - posts: { - toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], - }, - }, - embeddingConfig: { - version: 'new-version', - queryFn: makeDummyEmbedQuery(DIMS), - bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), + // Phase 3: Build payload1 with new-version and run bulk embed + const payload1 = ( + await buildPayloadWithIntegration({ + dbName, + pluginOpts: { + dbAdapter: createMockAdapter(), + knowledgePools: { + default: { + collections: { + posts: { + toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], }, }, + embeddingConfig: { + version: 'new-version', + queryFn: makeDummyEmbedQuery(DIMS), + bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), + }, }, - realtimeQueueName: QUEUE_NAMES_1.realtimeQueueName, - bulkQueueNames: QUEUE_NAMES_1.bulkQueueNames, }, - key: `payload1-${Date.now()}`, - skipMigrations: true, - }) - ).payload - payloadsToDestroy.push(p) + realtimeQueueName: QUEUE_NAMES_1.realtimeQueueName, + bulkQueueNames: QUEUE_NAMES_1.bulkQueueNames, + }, + key: `payload1-${Date.now()}`, + skipMigrations: true, + }) + ).payload - const vectorizedPayload1 = getVectorizedPayload(p) - const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) - expectGoodResult(result1) + payloadsToDestroy.push(payload1) - await waitForBulkJobs(p, 30000) - return p - }) + const vectorizedPayload1 = getVectorizedPayload(payload1) + const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) + expectGoodResult(result1) - // STEP 4: Verify new-version replaced old-version - await test.step('verify new-version embedding replaced old', async () => { - const embeds1 = await payload1.find({ - collection: 'default', - where: { docId: { equals: String(post.id) } }, - }) - expect(embeds1.totalDocs).toBe(1) - expect(embeds1.docs[0].embeddingVersion).toBe('new-version') + await waitForBulkJobs(payload1, 30000) + + // Phase 4: Verify new-version replaced old-version + const embeds1 = await payload1.find({ + collection: 'default', + where: { docId: { equals: String(post.id) } }, }) + expect(embeds1.totalDocs).toBe(1) + expect(embeds1.docs[0].embeddingVersion).toBe('new-version') }) }) From d374465b3062f02e05e213407fe7ad59bcf094bd Mon Sep 17 00:00:00 2001 From: techiejd <62455039+techiejd@users.noreply.github.com> Date: Thu, 26 Feb 2026 12:51:21 +0700 Subject: [PATCH 10/10] fix: rewrite versionBump test with single Payload instance Instead of creating two Payload instances (which caused cron cross-talk, timeout, and queue isolation issues on CI), use one instance and mutate the knowledgePools config version between bulk embed runs. Tests the same code path (versionMismatch in streamAndBatchDocs) without the multi-instance fragility. Co-Authored-By: Claude Opus 4.6 --- dev/specs/bulkEmbed/versionBump.spec.ts | 122 ++++++++---------------- 1 file changed, 40 insertions(+), 82 deletions(-) diff --git a/dev/specs/bulkEmbed/versionBump.spec.ts b/dev/specs/bulkEmbed/versionBump.spec.ts index ad64a89..b60afbe 100644 --- a/dev/specs/bulkEmbed/versionBump.spec.ts +++ b/dev/specs/bulkEmbed/versionBump.spec.ts @@ -12,121 +12,79 @@ import { makeDummyEmbedQuery } from 'helpers/embed.js' import { getVectorizedPayload } from '../../../src/types.js' import { expectGoodResult } from '../utils.vitest.js' import { createMockAdapter } from 'helpers/mockAdapter.js' +import type { Payload } from 'payload' const DIMS = DEFAULT_DIMS const dbName = `bulk_version_${Date.now()}` -// Use distinct queue names per payload instance so that each instance's -// cron worker only processes its own jobs and doesn't interfere with the other. -const QUEUE_NAMES_0 = { - realtimeQueueName: 'vectorize-realtime-v0', - bulkQueueNames: BULK_QUEUE_NAMES, -} -const QUEUE_NAMES_1 = { - realtimeQueueName: 'vectorize-realtime-v1', - bulkQueueNames: { - prepareBulkEmbedQueueName: `${BULK_QUEUE_NAMES.prepareBulkEmbedQueueName}-v2`, - pollOrCompleteQueueName: `${BULK_QUEUE_NAMES.pollOrCompleteQueueName}-v2`, - }, -} - describe('Bulk embed - version bump', () => { - let post: any - const payloadsToDestroy: any[] = [] + let payload: Payload + let knowledgePools: any + beforeAll(async () => { await createTestDb({ dbName }) - }) - afterAll(async () => { - for (const p of payloadsToDestroy) { - await destroyPayload(p) + knowledgePools = { + default: { + collections: { + posts: { + toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], + }, + }, + embeddingConfig: { + version: 'old-version', + queryFn: makeDummyEmbedQuery(DIMS), + bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), + }, + }, } - }) - test('version bump re-embeds all even without updates', async () => { - // Phase 1: Build payload0 with old-version and run bulk embed - const payload0 = ( + payload = ( await buildPayloadWithIntegration({ dbName, pluginOpts: { dbAdapter: createMockAdapter(), - knowledgePools: { - default: { - collections: { - posts: { - toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], - }, - }, - embeddingConfig: { - version: 'old-version', - queryFn: makeDummyEmbedQuery(DIMS), - bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), - }, - }, - }, - realtimeQueueName: QUEUE_NAMES_0.realtimeQueueName, - bulkQueueNames: QUEUE_NAMES_0.bulkQueueNames, + knowledgePools, + bulkQueueNames: BULK_QUEUE_NAMES, }, - key: `payload0-${Date.now()}`, + key: `version-bump-${Date.now()}`, }) ).payload + }) - payloadsToDestroy.push(payload0) + afterAll(async () => { + await destroyPayload(payload) + }) - post = await payload0.create({ collection: 'posts', data: { title: 'Old' } as any }) + test('version bump re-embeds all even without updates', async () => { + // Phase 1: Bulk embed with old-version + const post = await payload.create({ collection: 'posts', data: { title: 'Old' } as any }) - const vectorizedPayload0 = getVectorizedPayload(payload0) - const result0 = await vectorizedPayload0?.bulkEmbed({ knowledgePool: 'default' }) + const vp = getVectorizedPayload(payload) + const result0 = await vp?.bulkEmbed({ knowledgePool: 'default' }) expectGoodResult(result0) - await waitForBulkJobs(payload0, 30000) + await waitForBulkJobs(payload, 30000) - // Phase 2: Verify payload0's embeddings before proceeding - const embeds0 = await payload0.find({ + const embeds0 = await payload.find({ collection: 'default', where: { docId: { equals: String(post.id) } }, }) expect(embeds0.totalDocs).toBe(1) expect(embeds0.docs[0].embeddingVersion).toBe('old-version') - // Phase 3: Build payload1 with new-version and run bulk embed - const payload1 = ( - await buildPayloadWithIntegration({ - dbName, - pluginOpts: { - dbAdapter: createMockAdapter(), - knowledgePools: { - default: { - collections: { - posts: { - toKnowledgePool: async (doc: any) => [{ chunk: doc.title }], - }, - }, - embeddingConfig: { - version: 'new-version', - queryFn: makeDummyEmbedQuery(DIMS), - bulkEmbeddingsFns: createMockBulkEmbeddings({ statusSequence: ['succeeded'] }), - }, - }, - }, - realtimeQueueName: QUEUE_NAMES_1.realtimeQueueName, - bulkQueueNames: QUEUE_NAMES_1.bulkQueueNames, - }, - key: `payload1-${Date.now()}`, - skipMigrations: true, - }) - ).payload - - payloadsToDestroy.push(payload1) + // Phase 2: Mutate config to new-version and re-embed + knowledgePools.default.embeddingConfig.version = 'new-version' + knowledgePools.default.embeddingConfig.bulkEmbeddingsFns = createMockBulkEmbeddings({ + statusSequence: ['succeeded'], + }) - const vectorizedPayload1 = getVectorizedPayload(payload1) - const result1 = await vectorizedPayload1?.bulkEmbed({ knowledgePool: 'default' }) + const result1 = await vp?.bulkEmbed({ knowledgePool: 'default' }) expectGoodResult(result1) - await waitForBulkJobs(payload1, 30000) + await waitForBulkJobs(payload, 30000) - // Phase 4: Verify new-version replaced old-version - const embeds1 = await payload1.find({ + const embeds1 = await payload.find({ collection: 'default', where: { docId: { equals: String(post.id) } }, })