|
1 | | -import { DiffTriggerOperation } from "@powersync/common" |
| 1 | +import { DiffTriggerOperation, sanitizeSQL } from "@powersync/common" |
| 2 | +import { DEFAULT_BATCH_SIZE } from "./definitions" |
2 | 3 | import { asPowerSyncRecord, mapOperation } from "./helpers" |
3 | 4 | import { PendingOperationStore } from "./PendingOperationStore" |
4 | 5 | import { PowerSyncTransactor } from "./PowerSyncTransactor" |
5 | | -import type { TriggerDiffRecord } from "@powersync/common" |
6 | | -import type { StandardSchemaV1 } from "@standard-schema/spec" |
7 | | -import type { |
8 | | - CollectionConfig, |
9 | | - InferSchemaOutput, |
10 | | - SyncConfig, |
11 | | -} from "@tanstack/db" |
12 | 6 | import type { |
13 | 7 | EnhancedPowerSyncCollectionConfig, |
14 | 8 | PowerSyncCollectionConfig, |
15 | 9 | PowerSyncCollectionUtils, |
16 | 10 | } from "./definitions" |
17 | 11 | import type { PendingOperation } from "./PendingOperationStore" |
| 12 | +import type { |
| 13 | + CollectionConfig, |
| 14 | + InferSchemaOutput, |
| 15 | + SyncConfig, |
| 16 | +} from "@tanstack/db" |
| 17 | +import type { StandardSchemaV1 } from "@standard-schema/spec" |
| 18 | +import type { TriggerDiffRecord } from "@powersync/common" |
18 | 19 |
|
19 | 20 | /** |
20 | 21 | * Creates PowerSync collection options for use with a standard Collection |
@@ -100,7 +101,12 @@ export function powerSyncCollectionOptions< |
100 | 101 | >( |
101 | 102 | config: PowerSyncCollectionConfig<T, TSchema> |
102 | 103 | ): EnhancedPowerSyncCollectionConfig<T, TSchema> { |
103 | | - const { database, tableName, ...restConfig } = config |
| 104 | + const { |
| 105 | + database, |
| 106 | + tableName, |
| 107 | + syncBatchSize = DEFAULT_BATCH_SIZE, |
| 108 | + ...restConfig |
| 109 | + } = config |
104 | 110 |
|
105 | 111 | /** |
106 | 112 | * The onInsert, onUpdate, onDelete handlers should only return |
@@ -202,16 +208,24 @@ export function powerSyncCollectionOptions< |
202 | 208 | }, |
203 | 209 | hooks: { |
204 | 210 | beforeCreate: async (context) => { |
205 | | - begin() |
206 | | - for (const row of await context.getAll<T>( |
207 | | - `SELECT * FROM ${tableName}` |
208 | | - )) { |
209 | | - write({ |
210 | | - type: `insert`, |
211 | | - value: row, |
212 | | - }) |
| 211 | + let currentBatchCount = syncBatchSize |
| 212 | + let cursor = 0 |
| 213 | + while (currentBatchCount == syncBatchSize) { |
| 214 | + begin() |
| 215 | + const batchItems = await context.getAll<T>( |
| 216 | + sanitizeSQL`SELECT * FROM ${tableName} LIMIT ? OFFSET ?`, |
| 217 | + [syncBatchSize, cursor] |
| 218 | + ) |
| 219 | + currentBatchCount = batchItems.length |
| 220 | + cursor += currentBatchCount |
| 221 | + for (const row of batchItems) { |
| 222 | + write({ |
| 223 | + type: `insert`, |
| 224 | + value: row, |
| 225 | + }) |
| 226 | + } |
| 227 | + commit() |
213 | 228 | } |
214 | | - commit() |
215 | 229 | markReady() |
216 | 230 | database.logger.info(`Sync is ready`) |
217 | 231 | }, |
|
0 commit comments