Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions packages/cubejs-server-core/src/core/RefreshScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,22 @@ export class RefreshScheduler {
const compilers = await compilerApi.getCompilers();
const queryForEvaluation = await compilerApi.createQueryByDataSource(compilers, {});

// Bound the dispatch fan-out of refresh-key queries to the orchestrator's
// concurrency. Without this, every cube/timezone combination is enqueued
// at once, creating a large promise fan-out (memory / event-loop pressure)
// on deployments with many cubes or timezones. The limiter is shared per
// orchestrator id so concurrent refresh runs resolving to the same
// orchestrator respect a single cap.
const limit = await this.serverCore.getRefreshKeysLimiter(context, queryingOptions.concurrency);

await Promise.all(queryForEvaluation.cubeEvaluator.cubeNames().map(async cube => {
const cubeFromPath = queryForEvaluation.cubeEvaluator.cubeFromPath(cube);
const measuresCount = Object.keys(cubeFromPath.measures || {}).length;
const dimensionsCount = Object.keys(cubeFromPath.dimensions || {}).length;
if (measuresCount === 0 && dimensionsCount === 0) {
return;
}
await Promise.all(queryingOptions.timezones.map(async timezone => {
await Promise.all(queryingOptions.timezones.map(timezone => limit(async () => {
const query = {
...queryingOptions,
...(
Expand All @@ -373,7 +381,7 @@ export class RefreshScheduler {
scheduledRefresh: true,
loadRefreshKeysOnly: true,
});
}));
})));
}));
}

Expand Down
29 changes: 29 additions & 0 deletions packages/cubejs-server-core/src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ export class CubejsServerCore {

protected readonly orchestratorStorage: OrchestratorStorage = new OrchestratorStorage();

/**
* Concurrency limiters used to throttle the refresh-key dispatch fan-out of
* the scheduled refresh, keyed by orchestrator id. Sharing a single limiter
* per orchestrator id ensures that concurrent scheduled refresh runs that
* resolve to the same orchestrator (e.g. multiple security contexts) don't
* collectively flood the event loop with refresh-key queries.
*
* Each entry stores the concurrency the limiter was created with next to the
* limiter itself, so `getRefreshKeysLimiter` can detect a request for a
* different concurrency and replace the entry with a newly built limiter.
*/
protected readonly refreshKeysLimiters: Map<string, { concurrency: number, limiter: pLimit.Limit }> = new Map();

// eslint-disable-next-line @typescript-eslint/no-unused-vars
protected repositoryFactory: ((context: RequestContext) => SchemaFileRepository) | (() => FileRepository);

Expand Down Expand Up @@ -570,6 +579,26 @@ export class CubejsServerCore {
this.startScheduledRefreshTimer();
}

/**
 * Resolves the limiter that bounds refresh-key dispatch for the orchestrator
 * `context` maps to.
 *
 * The limiter is cached per orchestrator id, so scheduled refresh runs that
 * resolve to the same orchestrator share one cap instead of each enqueueing
 * every refresh-key query at once.
 *
 * @param context Request context used to derive the orchestrator id.
 * @param concurrency Maximum number of refresh-key dispatches allowed to run
 * at the same time through the returned limiter.
 * @returns The cached limiter when one exists for this orchestrator id with
 * the same concurrency; otherwise a newly created limiter, which replaces
 * any previously cached entry for that id.
 */
public async getRefreshKeysLimiter(context: RequestContext, concurrency: number): Promise<pLimit.Limit> {
  const orchestratorId = await this.contextToOrchestratorId(context);
  const cached = this.refreshKeysLimiters.get(orchestratorId);

  // Nothing cached yet, or the requested concurrency differs from the cached
  // one: build a new limiter and overwrite the entry for this orchestrator.
  if (!cached || cached.concurrency !== concurrency) {
    const limiter = pLimit(concurrency);
    this.refreshKeysLimiters.set(orchestratorId, { concurrency, limiter });

    return limiter;
  }

  return cached.limiter;
}

public async getOrchestratorApi(context: RequestContext): Promise<OrchestratorApi> {
const orchestratorId = await this.contextToOrchestratorId(context);

Expand Down
70 changes: 70 additions & 0 deletions packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1232,4 +1232,74 @@ describe('Refresh Scheduler', () => {
// backoffDataStillActive exists, which means backoff is still in place
// (nextTimestamp may be close to current time due to test execution delays)
});

test('getRefreshKeysLimiter returns a shared limiter per orchestrator id', async () => {
  const { serverCore } = setupScheduler({
    repository: repositoryWithoutPreAggregations,
    skipAssertSecurityContext: true,
  });
  const context = { authInfo: null, securityContext: {}, requestId: 'XXX' };

  // Asking twice with the same context and concurrency must hand back the
  // very same limiter instance.
  const initial = await serverCore.getRefreshKeysLimiter(context, 2);
  const repeated = await serverCore.getRefreshKeysLimiter(context, 2);
  expect(repeated).toBe(initial);

  // Asking for a different concurrency with the same context must produce a
  // different limiter instance.
  const resized = await serverCore.getRefreshKeysLimiter(context, 3);
  expect(resized).not.toBe(initial);
});

test('Refresh key dispatch is bounded by concurrency', async () => {
  process.env.CUBEJS_EXTERNAL_DEFAULT = 'false';
  process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';

  const { refreshScheduler, serverCore } = setupScheduler({
    repository: repositoryWithPreAggregations,
  });

  const concurrency = 2;
  const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' };

  // `active` counts refresh-key dispatches currently inside executeQuery;
  // `peak` remembers the highest value `active` ever reached.
  let active = 0;
  let peak = 0;

  // Instrument executeQuery on the orchestrator resolved for `ctx`, then make
  // every later getOrchestratorApi() call resolve to that instrumented
  // instance.
  const orchestratorApi = await serverCore.getOrchestratorApi(ctx);
  const originalExecuteQuery = orchestratorApi.executeQuery.bind(orchestratorApi);
  orchestratorApi.executeQuery = async (query: any) => {
    // Only refresh-key dispatches are counted; anything else passes through.
    if (!query.loadRefreshKeysOnly) {
      return originalExecuteQuery(query);
    }

    active += 1;
    peak = Math.max(peak, active);
    try {
      // Keep the slot occupied for a moment so overlapping dispatches, if
      // any, have a chance to pile up and be observed.
      await new Promise(resolve => setTimeout(resolve, 25));
      return await originalExecuteQuery(query);
    } finally {
      active -= 1;
    }
  };
  jest.spyOn(serverCore, 'getOrchestratorApi').mockResolvedValue(orchestratorApi);

  // Two timezones are requested, so refresh keys are dispatched for more than
  // one timezone (the fixture is expected to define the Foo and Bar cubes,
  // i.e. four dispatches in total - confirm against the repository fixture).
  // No more than `concurrency` of them should ever overlap.
  // Keep running the scheduler until it reports completion, giving up after
  // 1000 rounds.
  for (let attempt = 0; attempt < 1000; attempt += 1) {
    const { finished } = await refreshScheduler.runScheduledRefresh(ctx, {
      concurrency,
      workerIndices: [0],
      timezones: ['UTC', 'America/Los_Angeles'],
    });
    if (finished) {
      break;
    }
  }

  expect(peak).toBeGreaterThan(0);
  expect(peak).toBeLessThanOrEqual(concurrency);
});
});
Loading