Merged
24 changes: 24 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## 0.6.0-beta.5 - 2026-02-25

- Merges main into split_db_adapter (per-batch polling, coordinator/worker architecture, destroyPayload cleanup).

## 0.6.0-beta.4 - 2026-02-20

- Merges main with the `shouldEmbedFn` changes.
@@ -87,6 +91,26 @@ const score = result.similarity
const score = result.score
```

## 0.5.5 - 2026-02-24

### Added

- **`batchLimit` option on `CollectionVectorizeOption`** – limits the number of documents fetched per bulk-embed worker job (defaults to 1000). Each page of results queues a continuation job for the next page, preventing serverless time-limit failures on large collections.
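A minimal configuration sketch for the new option follows. The `posts` collection, the chunking logic, and the simplified types here are illustrative stand-ins, not the plugin's exact API:

```typescript
// Illustrative sketch: setting batchLimit on a collection inside a knowledge
// pool. Types are simplified stand-ins for the plugin's real ones.
type PoolChunk = { chunk: string }

type CollectionVectorizeOption = {
  toKnowledgePool: (doc: { title: string }) => Promise<PoolChunk[]>
  batchLimit?: number // max docs per bulk-embed worker job; defaults to 1000
}

const posts: CollectionVectorizeOption = {
  // One chunk per document, taken from its title (illustrative chunker)
  toKnowledgePool: async (doc) => [{ chunk: doc.title }],
  batchLimit: 500, // each page of 500 docs queues a continuation job
}

export { posts }
```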

### Changed

- **Coordinator / worker architecture for `prepare-bulk-embedding`** – the initial job now acts as a coordinator that fans out one worker job per collection. Each worker processes a single page of documents, making bulk embedding parallelizable and more resilient to timeouts.
- **Per-batch polling via `poll-or-complete-single-batch`** – replaced the monolithic `poll-or-complete-bulk-embedding` task. Each provider batch now has its own polling job, improving observability and reducing memory usage.
- **Memory-efficient incremental aggregation** – `finalizeRunIfComplete` now scans batch records page-by-page instead of loading all batches into memory at once.

### Removed

- `poll-or-complete-bulk-embedding` task (replaced by `poll-or-complete-single-batch`).

### Upgrade Notes

- **Ensure no bulk embedding run is in progress when upgrading.** The `poll-or-complete-bulk-embedding` task has been removed and replaced by `poll-or-complete-single-batch`. Any in-flight bulk run that still has pending `poll-or-complete-bulk-embedding` jobs will fail because the task slug no longer exists. Wait for all active runs to complete (or cancel them) before deploying this version.

## 0.5.4 - 2026-02-20

### Added
5 changes: 3 additions & 2 deletions README.md
@@ -426,8 +426,8 @@ type OnBulkErrorArgs = {

The plugin uses separate Payload jobs for reliability with long-running providers:

- **`prepare-bulk-embedding`**: Streams through documents, calls your `addChunk` for each chunk, creates batch records.
- **`poll-or-complete-bulk-embedding`**: Polls all batches, requeues itself until done, then writes all successful embeddings (partial chunk failures are allowed).
- **`prepare-bulk-embedding`**: A coordinator job fans out one worker per collection. Each worker streams through documents, calls your `addChunk` for each chunk, and creates batch records. When `batchLimit` is set on a collection, workers paginate and queue continuation jobs.
- **`poll-or-complete-single-batch`**: Polls a single batch, requeues itself until done, then writes successful embeddings. When all batches for a run are terminal, the run is finalized (partial chunk failures are allowed).

### Queue Configuration

@@ -561,6 +561,7 @@ curl -X POST http://localhost:3000/api/vector-retry-failed-batch \

- `shouldEmbedFn? (doc, payload)` – optional filter that runs **before** the document is queued for embedding. Return `false` to skip the document entirely (no job is created and `toKnowledgePool` is never called). Works for both real-time and bulk embedding. Defaults to embedding all documents when omitted.
- `toKnowledgePool (doc, payload)` – return an array of `{ chunk, ...extensionFieldValues }`. Each object becomes one embedding row and the index in the array determines `chunkIndex`.
- `batchLimit? (number)` – max documents to fetch per bulk-embed worker job (defaults to 1000). Each page of results becomes a separate job that queues a continuation for the next page. Useful for large collections that would exceed serverless time limits in a single job.
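The continuation mechanics behind `batchLimit` can be sketched like this. It is an illustrative model under assumed helper signatures (`fetchPage`, `embed`, `queueContinuation` are hypothetical), not the plugin's internals:

```typescript
// Illustrative model: a worker fetches one page of documents, processes it,
// and queues a continuation job only when the page was full (more may remain).
async function runWorkerPage(opts: {
  fetchPage: (page: number, limit: number) => Promise<string[]> // hypothetical
  embed: (docs: string[]) => Promise<void>
  queueContinuation: (nextPage: number) => void
  page: number // 1-based page index
  batchLimit: number
}): Promise<number> {
  const docs = await opts.fetchPage(opts.page, opts.batchLimit)
  await opts.embed(docs)
  if (docs.length === opts.batchLimit) {
    // A full page means more documents may exist: queue the next page rather
    // than looping here, keeping each job under serverless time limits.
    // (A real implementation can also consult the total count to avoid an
    // empty trailing job when the doc count is an exact multiple of the limit.)
    opts.queueContinuation(opts.page + 1)
  }
  return docs.length
}

export { runWorkerPage }
```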

Reserved column names: `sourceCollection`, `docId`, `chunkIndex`, `chunkText`, `embeddingVersion`. Avoid reusing them in `extensionFields`.

8 changes: 6 additions & 2 deletions adapters/pg/dev/specs/extensionFields.spec.ts
@@ -1,8 +1,8 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { postgresAdapter } from '@payloadcms/db-postgres'
import { buildDummyConfig, integration, plugin } from './constants.js'
import { createTestDb, waitForVectorizationJobs } from './utils.js'
import { createTestDb, destroyPayload, waitForVectorizationJobs } from './utils.js'
import { getPayload } from 'payload'
import { PostgresPayload } from '../../src/types.js'
import { chunkText, chunkRichTextSimple as chunkRichText } from '@shared-test/helpers/chunkers'
@@ -113,6 +113,10 @@ describe('Extension fields integration tests', () => {
})
})

afterAll(async () => {
await destroyPayload(payload)
})

test('extension fields are added to the embeddings table schema', async () => {
const db = (payload as PostgresPayload).db
const sql = `
7 changes: 6 additions & 1 deletion adapters/pg/dev/specs/integration.spec.ts
@@ -4,13 +4,14 @@
* These tests verify Postgres-specific functionality like
* vector column creation, schema modifications, etc.
*/
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import type { Payload, SanitizedConfig } from 'payload'
import { buildConfig, getPayload } from 'payload'
import { postgresAdapter } from '@payloadcms/db-postgres'
import { lexicalEditor } from '@payloadcms/richtext-lexical'
import { Client } from 'pg'
import { createPostgresVectorIntegration } from '../../src/index.js'
import { destroyPayload } from './utils.js'
import payloadcmsVectorize from 'payloadcms-vectorize'

const DIMS = 8
@@ -88,6 +89,10 @@ describe('Postgres-specific integration tests', () => {
})
})

afterAll(async () => {
await destroyPayload(payload)
})

test('adds embeddings collection with vector column', async () => {
// Check schema for embeddings collection
const collections = payload.collections
14 changes: 12 additions & 2 deletions adapters/pg/dev/specs/migrationCli.spec.ts
@@ -4,7 +4,7 @@ import { postgresAdapter } from '@payloadcms/db-postgres'
import { buildConfig, getPayload } from 'payload'
import { createPostgresVectorIntegration } from '../../src/index.js'
import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from '@shared-test/helpers/embed'
import { createTestDb } from './utils.js'
import { createTestDb, destroyPayload } from './utils.js'
import { DIMS } from './constants.js'

const createVectorizeIntegration = createPostgresVectorIntegration
@@ -77,6 +77,10 @@ describe('Migration CLI integration tests', () => {
payload = await getPayload({ config, cron: true })
})

afterAll(async () => {
await destroyPayload(payload)
})

test('VectorizedPayload has _staticConfigs via getDbAdapterCustom', async () => {
const { getVectorizedPayload } = await import('payloadcms-vectorize')
const vectorizedPayload = getVectorizedPayload(payload)
@@ -159,6 +163,10 @@
})
})

afterAll(async () => {
await destroyPayload(payload)
})

test('vector search fails with descriptive error when embedding column missing', async () => {
const { getVectorizedPayload } = await import('payloadcms-vectorize')
const vectorizedPayload = getVectorizedPayload(payload)
@@ -207,7 +215,7 @@
})

afterAll(async () => {
// Cleanup: remove test migrations directory
await destroyPayload(autoPayload)
if (existsSync(migrationsDir)) {
rmSync(migrationsDir, { recursive: true, force: true })
}
@@ -429,6 +437,7 @@
})

afterAll(async () => {
await destroyPayload(dimsPayload)
if (existsSync(migrationsDir)) {
rmSync(migrationsDir, { recursive: true, force: true })
}
@@ -729,6 +738,7 @@
})

afterAll(async () => {
await destroyPayload(multiPayload)
if (existsSync(migrationsDir)) {
rmSync(migrationsDir, { recursive: true, force: true })
}
8 changes: 6 additions & 2 deletions adapters/pg/dev/specs/multipools.spec.ts
@@ -1,10 +1,10 @@
import type { Payload, SanitizedConfig } from 'payload'

import { buildConfig } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { lexicalEditor } from '@payloadcms/richtext-lexical'
import { postgresAdapter } from '@payloadcms/db-postgres'
import { createTestDb } from './utils.js'
import { createTestDb, destroyPayload } from './utils.js'
import { getPayload } from 'payload'
import { createPostgresVectorIntegration } from '../../src/index.js'
import payloadcmsVectorize from 'payloadcms-vectorize'
@@ -79,6 +79,10 @@ describe('Multiple knowledge pools', () => {
})
})

afterAll(async () => {
await destroyPayload(payload)
})

test('creates two embeddings collections with vector columns', async () => {
const collections = payload.collections
expect(collections).toHaveProperty('pool1')
12 changes: 10 additions & 2 deletions adapters/pg/dev/specs/schemaName.spec.ts
@@ -3,12 +3,16 @@ import type { Payload } from 'payload'
import { postgresAdapter } from '@payloadcms/db-postgres'
import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from '@shared-test/helpers/embed'
import { Client } from 'pg'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'

import type { PostgresPayload } from '../../src/types.js'

import { buildDummyConfig, DIMS, integration, plugin } from './constants.js'
import { createTestDb, waitForVectorizationJobs } from './utils.js'
import {
createTestDb,
destroyPayload,
waitForVectorizationJobs,
} from './utils.js'
import { getPayload } from 'payload'
import { getVectorizedPayload } from 'payloadcms-vectorize'
const CUSTOM_SCHEMA = 'custom'
@@ -91,6 +95,10 @@ describe('Custom schemaName support', () => {
})
})

afterAll(async () => {
await destroyPayload(payload)
})

test('embeddings table is created in custom schema', async () => {
const db = (payload as PostgresPayload).db
const tablesRes = await db.pool?.query(
2 changes: 1 addition & 1 deletion adapters/pg/dev/specs/utils.ts
@@ -1,6 +1,6 @@
import { Client } from 'pg'

export { waitForVectorizationJobs } from '@shared-test/utils'
export { waitForVectorizationJobs, destroyPayload } from '@shared-test/utils'

export const createTestDb = async ({ dbName }: { dbName: string }) => {
const adminUri =
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/basic.spec.ts
@@ -1,5 +1,5 @@
import type { Payload, SanitizedConfig } from 'payload'
import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js'
@@ -10,6 +10,7 @@ import {
clearAllCollections,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -56,6 +57,10 @@ describe('Bulk embed - basic tests', () => {
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

beforeEach(async () => {
await clearAllCollections(payload)
})
117 changes: 117 additions & 0 deletions dev/specs/bulkEmbed/batchLimit.spec.ts
@@ -0,0 +1,117 @@
import type { Payload } from 'payload'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
import {
BULK_QUEUE_NAMES,
DEFAULT_DIMS,
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
import { getVectorizedPayload, VectorizedPayload } from 'payloadcms-vectorize'
import { expectGoodResult } from '../utils.vitest.js'
import { createMockAdapter } from 'helpers/mockAdapter.js'

const DIMS = DEFAULT_DIMS
const dbName = `bulk_batchlimit_${Date.now()}`

describe('Bulk embed - batchLimit', () => {
let payload: Payload
let vectorizedPayload: VectorizedPayload | null = null

beforeAll(async () => {
await createTestDb({ dbName })
const built = await buildPayloadWithIntegration({
dbName,
pluginOpts: {
dbAdapter: createMockAdapter(),
knowledgePools: {
default: {
collections: {
posts: {
toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
batchLimit: 2,
},
},
embeddingConfig: {
version: testEmbeddingVersion,
queryFn: makeDummyEmbedQuery(DIMS),
bulkEmbeddingsFns: createMockBulkEmbeddings({
statusSequence: ['succeeded'],
}),
},
},
},
bulkQueueNames: BULK_QUEUE_NAMES,
},
key: `batchlimit-${Date.now()}`,
})
payload = built.payload
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

test('batchLimit splits docs across continuation jobs and all get embedded', async () => {
// Create 5 posts with batchLimit: 2
// Should result in 3 prepare jobs (2 docs, 2 docs, 1 doc)
for (let i = 0; i < 5; i++) {
await payload.create({ collection: 'posts', data: { title: `BatchLimit Post ${i}` } as any })
}

const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
expectGoodResult(result)

await waitForBulkJobs(payload, 30000)

// All 5 posts should have embeddings
const embeds = await payload.find({ collection: 'default' })
expect(embeds.totalDocs).toBe(5)

// Run should be succeeded
const runDoc = (
await (payload as any).find({
collection: BULK_EMBEDDINGS_RUNS_SLUG,
where: { id: { equals: result!.runId } },
})
).docs[0]
expect(runDoc.status).toBe('succeeded')
expect(runDoc.inputs).toBe(5)
})

test('batchLimit equal to doc count does not create extra continuations', async () => {
// Clean up from prior test: delete all posts and embeddings
await payload.delete({ collection: 'posts', where: {} })
await payload.delete({ collection: 'default' as any, where: {} })

// Create exactly 2 posts (matching batchLimit: 2)
for (let i = 0; i < 2; i++) {
await payload.create({
collection: 'posts',
data: { title: `Exact Post ${i}` } as any,
})
}

const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
expectGoodResult(result)

await waitForBulkJobs(payload, 20000)

const embeds = await payload.find({ collection: 'default' })
expect(embeds.totalDocs).toBe(2)

const runDoc = (
await (payload as any).find({
collection: BULK_EMBEDDINGS_RUNS_SLUG,
where: { id: { equals: result!.runId } },
})
).docs[0]
expect(runDoc.status).toBe('succeeded')
})
})
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/canceledBatch.spec.ts
@@ -1,11 +1,12 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import {
BULK_QUEUE_NAMES,
DEFAULT_DIMS,
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -51,6 +52,10 @@ describe('Bulk embed - canceled batch', () => {
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

test('canceled batch marks entire run as failed', async () => {
const post = await payload.create({ collection: 'posts', data: { title: 'Cancel' } as any })
const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })