diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index e94001b9a6bb4..c1cd3990934d4 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -342,6 +342,14 @@ export class RefreshScheduler { const compilers = await compilerApi.getCompilers(); const queryForEvaluation = await compilerApi.createQueryByDataSource(compilers, {}); + // Bound the dispatch fan-out of refresh-key queries to the orchestrator's + // concurrency. Without this, every cube/timezone combination is enqueued + // at once, creating a large promise fan-out (memory / event-loop pressure) + // on deployments with many cubes or timezones. The limiter is shared per + // orchestrator id so concurrent refresh runs resolving to the same + // orchestrator respect a single cap. + const limit = await this.serverCore.getRefreshKeysLimiter(context, queryingOptions.concurrency); + await Promise.all(queryForEvaluation.cubeEvaluator.cubeNames().map(async cube => { const cubeFromPath = queryForEvaluation.cubeEvaluator.cubeFromPath(cube); const measuresCount = Object.keys(cubeFromPath.measures || {}).length; @@ -349,7 +357,7 @@ export class RefreshScheduler { if (measuresCount === 0 && dimensionsCount === 0) { return; } - await Promise.all(queryingOptions.timezones.map(async timezone => { + await Promise.all(queryingOptions.timezones.map(timezone => limit(async () => { const query = { ...queryingOptions, ...( @@ -373,7 +381,7 @@ export class RefreshScheduler { scheduledRefresh: true, loadRefreshKeysOnly: true, }); - })); + }))); })); } diff --git a/packages/cubejs-server-core/src/core/server.ts b/packages/cubejs-server-core/src/core/server.ts index da04f2468415f..6912d23df0180 100644 --- a/packages/cubejs-server-core/src/core/server.ts +++ b/packages/cubejs-server-core/src/core/server.ts @@ -130,6 +130,15 @@ export class CubejsServerCore { protected readonly orchestratorStorage: OrchestratorStorage = new OrchestratorStorage(); + /** + * Concurrency limiters used to throttle the refresh-key dispatch fan-out of + * the scheduled refresh, keyed by orchestrator id. Sharing a single limiter + * per orchestrator id ensures that concurrent scheduled refresh runs that + * resolve to the same orchestrator (e.g. multiple security contexts) don't + * collectively flood the event loop with refresh-key queries. + */ + protected readonly refreshKeysLimiters: Map = new Map(); + // eslint-disable-next-line @typescript-eslint/no-unused-vars protected repositoryFactory: ((context: RequestContext) => SchemaFileRepository) | (() => FileRepository); @@ -570,6 +579,26 @@ export class CubejsServerCore { this.startScheduledRefreshTimer(); } + /** + * Returns a concurrency limiter shared across scheduled refresh runs that + * resolve to the same orchestrator id. It is used to bound the refresh-key + * dispatch fan-out so that deployments with many cubes/timezones don't + * enqueue every refresh-key query at once. + */ + public async getRefreshKeysLimiter(context: RequestContext, concurrency: number): Promise { + const orchestratorId = await this.contextToOrchestratorId(context); + const existing = this.refreshKeysLimiters.get(orchestratorId); + + if (existing && existing.concurrency === concurrency) { + return existing.limiter; + } + + const limiter = pLimit(concurrency); + this.refreshKeysLimiters.set(orchestratorId, { concurrency, limiter }); + + return limiter; + } + public async getOrchestratorApi(context: RequestContext): Promise { const orchestratorId = await this.contextToOrchestratorId(context); diff --git a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts index 8a274e0256f63..3c7ec9be4ca8c 100644 --- a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts +++ b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts @@ -1232,4 +1232,74 @@ describe('Refresh Scheduler', () => { // backoffDataStillActive exists, which means backoff is still in place // (nextTimestamp may be close to current time due to test execution delays) }); + + test('getRefreshKeysLimiter returns a shared limiter per orchestrator id', async () => { + const { serverCore } = setupScheduler({ + repository: repositoryWithoutPreAggregations, + skipAssertSecurityContext: true, + }); + + const ctx = { authInfo: null, securityContext: {}, requestId: 'XXX' }; + + const first = await serverCore.getRefreshKeysLimiter(ctx, 2); + const second = await serverCore.getRefreshKeysLimiter(ctx, 2); + // Same orchestrator id and concurrency should reuse the limiter + expect(second).toBe(first); + + // Changing the concurrency rebuilds the limiter + const third = await serverCore.getRefreshKeysLimiter(ctx, 3); + expect(third).not.toBe(first); + }); + + test('Refresh key dispatch is bounded by concurrency', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + + const { refreshScheduler, serverCore } = setupScheduler({ + repository: repositoryWithPreAggregations, + }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + const concurrency = 2; + + let inFlight = 0; + let maxInFlight = 0; + + // Wrap the orchestrator's executeQuery once to track how many refresh-key + // dispatches are running concurrently. + const orchestratorApi = await serverCore.getOrchestratorApi(ctx); + const realExecuteQuery = orchestratorApi.executeQuery.bind(orchestratorApi); + orchestratorApi.executeQuery = async (query: any) => { + if (query.loadRefreshKeysOnly) { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + try { + // Hold the slot briefly so concurrent dispatches can build up. + await new Promise(resolve => setTimeout(resolve, 25)); + return await realExecuteQuery(query); + } finally { + inFlight -= 1; + } + } + return realExecuteQuery(query); + }; + jest.spyOn(serverCore, 'getOrchestratorApi').mockResolvedValue(orchestratorApi); + + // Two cubes (Foo, Bar) x two timezones => four refresh-key dispatches. + // Without the limiter all four would run at once; with it no more than + // `concurrency` should overlap. + for (let i = 0; i < 1000; i++) { + const refreshResult = await refreshScheduler.runScheduledRefresh(ctx, { + concurrency, + workerIndices: [0], + timezones: ['UTC', 'America/Los_Angeles'], + }); + if (refreshResult.finished) { + break; + } + } + + expect(maxInFlight).toBeGreaterThan(0); + expect(maxInFlight).toBeLessThanOrEqual(concurrency); + }); });