From 7d1409e52c8f32be4ab09c1de42b642aaae7071f Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 25 Jun 2026 09:40:02 +0200 Subject: [PATCH 1/7] feat: move to continue as new pattern Signed-off-by: Umberto Sgueglia --- .../apps/packages_worker/src/activities.ts | 2 +- .../packages_worker/src/bin/maven-worker.ts | 4 +- .../packages_worker/src/maven/activities.ts | 17 ---- .../packages_worker/src/maven/schedule.ts | 93 +++++++------------ .../packages_worker/src/maven/workflows.ts | 26 ++++-- .../packages_worker/src/workflows/index.ts | 2 +- 6 files changed, 52 insertions(+), 92 deletions(-) diff --git a/services/apps/packages_worker/src/activities.ts b/services/apps/packages_worker/src/activities.ts index acd9079f24..ebb4594484 100644 --- a/services/apps/packages_worker/src/activities.ts +++ b/services/apps/packages_worker/src/activities.ts @@ -12,7 +12,7 @@ export { } from './npm/activities' export * from './deps-dev/activities' export { osvSyncEcosystem, osvDeriveCriticalFlag } from './osv/activities' -export { processMavenCriticalBatch, processMavenNonCriticalBatch } from './maven/activities' +export { processMavenCriticalBatch } from './maven/activities' export { criticalityComputePageRank, rankPackages } from './criticality/activities' export { cargoDownloadAndLoad, diff --git a/services/apps/packages_worker/src/bin/maven-worker.ts b/services/apps/packages_worker/src/bin/maven-worker.ts index 3cbfee3ff6..3600e8556e 100644 --- a/services/apps/packages_worker/src/bin/maven-worker.ts +++ b/services/apps/packages_worker/src/bin/maven-worker.ts @@ -1,8 +1,8 @@ -import { scheduleMavenCritical } from '../maven/schedule' +import { scheduleMavenIngestion } from '../maven/schedule' import { svc } from '../service' setImmediate(async () => { await svc.init() - await scheduleMavenCritical() + await scheduleMavenIngestion() await svc.start() }) diff --git a/services/apps/packages_worker/src/maven/activities.ts b/services/apps/packages_worker/src/maven/activities.ts index bf1acbe5b3..3a0b7fad49 100644 --- a/services/apps/packages_worker/src/maven/activities.ts +++ b/services/apps/packages_worker/src/maven/activities.ts @@ -19,20 +19,3 @@ export async function processMavenCriticalBatch(): Promise { log.info({ ...result }, 'Maven critical batch complete') return result } - -export async function processMavenNonCriticalBatch(): Promise { - const config = getMavenConfig() - const qx = await getPackagesDb() - // Non-critical is DB-only (no POM fetch); the flag is unused on this path. - const result = await processBatch(qx, config, false, false) - log.info( - { - processed: result.processed, - skipped: result.skipped, - unchanged: result.unchanged, - error: result.error, - }, - 'Maven non-critical batch complete', - ) - return result -} diff --git a/services/apps/packages_worker/src/maven/schedule.ts b/services/apps/packages_worker/src/maven/schedule.ts index 71b29943f7..e9144c43b1 100644 --- a/services/apps/packages_worker/src/maven/schedule.ts +++ b/services/apps/packages_worker/src/maven/schedule.ts @@ -1,78 +1,49 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client' import { svc } from '../service' -import { mavenCriticalWorkflow } from '../workflows' +import { ingestMavenPackages } from '../workflows' -export async function scheduleMavenCritical(): Promise { +const LEGACY_SCHEDULE_ID = 'maven-critical' + +export async function scheduleMavenIngestion(): Promise { const { temporal } = svc if (!temporal) throw new Error('Temporal client not initialized') - const scheduleOptions: Parameters[0] = { - scheduleId: 'maven-critical', - spec: { - cronExpressions: ['*/1 * * * *'], - }, - policies: { - overlap: ScheduleOverlapPolicy.SKIP, - catchupWindow: '1 hour', - }, - action: { - type: 'startWorkflow', - workflowType: mavenCriticalWorkflow, - taskQueue: 'packages-worker', - workflowExecutionTimeout: '15 minutes', - retry: { - initialInterval: '30 seconds', - backoffCoefficient: 2, - maximumAttempts: 3, - }, - args: [], - }, + try { + await temporal.schedule.getHandle(LEGACY_SCHEDULE_ID).delete() + svc.log.info({ scheduleId: LEGACY_SCHEDULE_ID }, 'Deleted legacy schedule.') + } catch { + // Not found — nothing to clean up. } try { - await temporal.schedule.create(scheduleOptions) + await temporal.schedule.create({ + scheduleId: 'maven-ingestion', + spec: { + cronExpressions: ['0 0 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.SKIP, + catchupWindow: '1 hour', + }, + action: { + type: 'startWorkflow', + workflowType: ingestMavenPackages, + taskQueue: 'packages-worker', + workflowRunTimeout: '24 hours', + retry: { + initialInterval: '30 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [], + }, + }) } catch (err) { if (err instanceof ScheduleAlreadyRunning) { - svc.log.info('Schedule maven-critical already exists, skipping creation.') + svc.log.info('Schedule maven-ingestion already exists, skipping creation.') } else { throw err } } } - -// export async function scheduleMavenNonCritical(): Promise { -// const { temporal } = svc -// if (!temporal) throw new Error('Temporal client not initialized') - -// try { -// await temporal.schedule.create({ -// scheduleId: 'maven-non-critical', -// spec: { -// cronExpressions: ['*/10 * * * *'], -// }, -// policies: { -// overlap: ScheduleOverlapPolicy.SKIP, -// catchupWindow: '1 hour', -// }, -// action: { -// type: 'startWorkflow', -// workflowType: mavenNonCriticalWorkflow, -// taskQueue: 'packages-worker', -// workflowExecutionTimeout: '5 minutes', -// retry: { -// initialInterval: '30 seconds', -// backoffCoefficient: 2, -// maximumAttempts: 3, -// }, -// args: [], -// }, -// }) -// } catch (err) { -// if (err instanceof ScheduleAlreadyRunning) { -// svc.log.info('Schedule maven-non-critical already registered.') -// } else { -// throw err -// } -// } -// } diff --git a/services/apps/packages_worker/src/maven/workflows.ts b/services/apps/packages_worker/src/maven/workflows.ts index 223cee6701..5683418c71 100644 --- a/services/apps/packages_worker/src/maven/workflows.ts +++ b/services/apps/packages_worker/src/maven/workflows.ts @@ -1,19 +1,25 @@ -import { proxyActivities } from '@temporalio/workflow' +import { continueAsNew, proxyActivities } from '@temporalio/workflow' import type * as activities from './activities' -const { processMavenCriticalBatch } = proxyActivities({ +const acts = proxyActivities({ startToCloseTimeout: '15 minutes', + retry: { + initialInterval: '30 seconds', + backoffCoefficient: 2, + maximumAttempts: 5, + }, }) -const { processMavenNonCriticalBatch } = proxyActivities({ - startToCloseTimeout: '5 minutes', -}) +const BATCHES_PER_RUN = 5 -export async function mavenCriticalWorkflow(): Promise { - await processMavenCriticalBatch() -} +export async function ingestMavenPackages(): Promise { + for (let i = 0; i < BATCHES_PER_RUN; i++) { + const result = await acts.processMavenCriticalBatch() + if (result.processed + result.skipped + result.error + result.unchanged === 0) { + return + } + } -export async function mavenNonCriticalWorkflow(): Promise { - await processMavenNonCriticalBatch() + await continueAsNew() } diff --git a/services/apps/packages_worker/src/workflows/index.ts b/services/apps/packages_worker/src/workflows/index.ts index 244bb26b6c..411ba509f8 100644 --- a/services/apps/packages_worker/src/workflows/index.ts +++ b/services/apps/packages_worker/src/workflows/index.ts @@ -15,7 +15,7 @@ export { ingestDependentCounts, } from '../deps-dev/workflows' export { osvSync } from '../osv/workflows' -export { mavenCriticalWorkflow, mavenNonCriticalWorkflow } from '../maven/workflows' +export { ingestMavenPackages } from '../maven/workflows' export { ingestScorecard } from '../scorecard/workflows' export { rankPackagesWorkflow } from '../criticality/workflow' export { cargoSyncWorkflow } from '../cargo/workflows' From cc19947e3f770995ad18c3bb5578993b765f57b8 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 25 Jun 2026 09:51:10 +0200 Subject: [PATCH 2/7] fix: simplify schedule Signed-off-by: Umberto Sgueglia --- services/apps/packages_worker/src/maven/schedule.ts | 7 +++++-- services/apps/packages_worker/src/maven/workflows.ts | 11 +++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/services/apps/packages_worker/src/maven/schedule.ts b/services/apps/packages_worker/src/maven/schedule.ts index e9144c43b1..9966c607de 100644 --- a/services/apps/packages_worker/src/maven/schedule.ts +++ b/services/apps/packages_worker/src/maven/schedule.ts @@ -12,8 +12,11 @@ export async function scheduleMavenIngestion(): Promise { try { await temporal.schedule.getHandle(LEGACY_SCHEDULE_ID).delete() svc.log.info({ scheduleId: LEGACY_SCHEDULE_ID }, 'Deleted legacy schedule.') - } catch { - // Not found — nothing to clean up. + } catch (err) { + svc.log.warn( + { err, scheduleId: LEGACY_SCHEDULE_ID }, + 'Failed to delete legacy schedule (may not exist).', + ) } try { diff --git a/services/apps/packages_worker/src/maven/workflows.ts b/services/apps/packages_worker/src/maven/workflows.ts index 5683418c71..9eefdd130e 100644 --- a/services/apps/packages_worker/src/maven/workflows.ts +++ b/services/apps/packages_worker/src/maven/workflows.ts @@ -11,15 +11,10 @@ const acts = proxyActivities({ }, }) -const BATCHES_PER_RUN = 5 - export async function ingestMavenPackages(): Promise { - for (let i = 0; i < BATCHES_PER_RUN; i++) { - const result = await acts.processMavenCriticalBatch() - if (result.processed + result.skipped + result.error + result.unchanged === 0) { - return - } + const result = await acts.processMavenCriticalBatch() + if (result.processed + result.skipped + result.error + result.unchanged === 0) { + return } - await continueAsNew() } From 46bea6ca20ea6bfb4c512237410b498803f2a20e Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 25 Jun 2026 10:05:12 +0200 Subject: [PATCH 3/7] fix: ignoring the error in continue as new logic Signed-off-by: Umberto Sgueglia --- services/apps/packages_worker/src/maven/workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/maven/workflows.ts b/services/apps/packages_worker/src/maven/workflows.ts index 9eefdd130e..1c0923185b 100644 --- a/services/apps/packages_worker/src/maven/workflows.ts +++ b/services/apps/packages_worker/src/maven/workflows.ts @@ -13,7 +13,7 @@ const acts = proxyActivities({ export async function ingestMavenPackages(): Promise { const result = await acts.processMavenCriticalBatch() - if (result.processed + result.skipped + result.error + result.unchanged === 0) { + if (result.processed + result.skipped + result.unchanged === 0) { return } await continueAsNew() From b6c85c988651973ec5cb15f78a98061f5b8b0268 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 25 Jun 2026 10:16:12 +0200 Subject: [PATCH 4/7] fix: adding logs Signed-off-by: Umberto Sgueglia --- services/apps/packages_worker/src/maven/workflows.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/maven/workflows.ts b/services/apps/packages_worker/src/maven/workflows.ts index 1c0923185b..97b175df94 100644 --- a/services/apps/packages_worker/src/maven/workflows.ts +++ b/services/apps/packages_worker/src/maven/workflows.ts @@ -1,4 +1,4 @@ -import { continueAsNew, proxyActivities } from '@temporalio/workflow' +import { continueAsNew, log, proxyActivities } from '@temporalio/workflow' import type * as activities from './activities' @@ -14,6 +14,7 @@ const acts = proxyActivities({ export async function ingestMavenPackages(): Promise { const result = await acts.processMavenCriticalBatch() if (result.processed + result.skipped + result.unchanged === 0) { + log.info({ ...result }, 'Maven ingestion complete — no more work, exiting.') return } await continueAsNew() From 62fc6f9e6854a01f177ce25d581d2f4cfba1fb4b Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 25 Jun 2026 10:25:41 +0200 Subject: [PATCH 5/7] fix: adding logs Signed-off-by: Umberto Sgueglia --- services/apps/packages_worker/src/maven/workflows.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/maven/workflows.ts b/services/apps/packages_worker/src/maven/workflows.ts index 97b175df94..048f5cde54 100644 --- a/services/apps/packages_worker/src/maven/workflows.ts +++ b/services/apps/packages_worker/src/maven/workflows.ts @@ -14,7 +14,7 @@ const acts = proxyActivities({ export async function ingestMavenPackages(): Promise { const result = await acts.processMavenCriticalBatch() if (result.processed + result.skipped + result.unchanged === 0) { - log.info({ ...result }, 'Maven ingestion complete — no more work, exiting.') + log.info('Maven ingestion complete — no more work, exiting.', { ...result }) return } await continueAsNew() From a958108005d861febcd65ebc93fd358a2274c33f Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 25 Jun 2026 10:32:01 +0200 Subject: [PATCH 6/7] fix: update schedule Signed-off-by: Umberto Sgueglia --- services/apps/packages_worker/src/maven/schedule.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/packages_worker/src/maven/schedule.ts b/services/apps/packages_worker/src/maven/schedule.ts index 9966c607de..66ffdd9c25 100644 --- a/services/apps/packages_worker/src/maven/schedule.ts +++ b/services/apps/packages_worker/src/maven/schedule.ts @@ -23,7 +23,7 @@ export async function scheduleMavenIngestion(): Promise { await temporal.schedule.create({ scheduleId: 'maven-ingestion', spec: { - cronExpressions: ['0 0 * * *'], + cronExpressions: ['0 9 * * *'], }, policies: { overlap: ScheduleOverlapPolicy.SKIP, From 837668d7cbb6c33f2f691e9a14fcdfe1b3f2e36c Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 25 Jun 2026 10:40:24 +0200 Subject: [PATCH 7/7] fix: update workflow id Signed-off-by: Umberto Sgueglia --- services/apps/packages_worker/src/maven/schedule.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/services/apps/packages_worker/src/maven/schedule.ts b/services/apps/packages_worker/src/maven/schedule.ts index 66ffdd9c25..2a815dcd82 100644 --- a/services/apps/packages_worker/src/maven/schedule.ts +++ b/services/apps/packages_worker/src/maven/schedule.ts @@ -32,6 +32,7 @@ export async function scheduleMavenIngestion(): Promise { action: { type: 'startWorkflow', workflowType: ingestMavenPackages, + workflowId: 'maven-daily-enrichment', taskQueue: 'packages-worker', workflowRunTimeout: '24 hours', retry: {