# Patch summary (header text only; everything from the first "diff --git" onward is unchanged).
#
# Purpose: replace the per-minute "maven-critical" Temporal schedule and the separate
# critical / non-critical Maven workflows with one daily "maven-ingestion" schedule that
# starts a single self-continuing workflow, ingestMavenPackages.
#
# Per-file changes under services/apps/packages_worker/src:
#   activities.ts        - stops re-exporting processMavenNonCriticalBatch.
#   bin/maven-worker.ts  - calls scheduleMavenIngestion() instead of scheduleMavenCritical()
#                          between svc.init() and svc.start().
#   maven/activities.ts  - deletes the processMavenNonCriticalBatch activity.
#   maven/schedule.ts    - scheduleMavenCritical becomes scheduleMavenIngestion, which:
#                            1. tries to delete the schedule with id 'maven-critical'
#                               (LEGACY_SCHEDULE_ID); success is logged at info, any thrown
#                               error is logged at warn and not rethrown;
#                            2. creates schedule 'maven-ingestion' with cron '0 9 * * *'
#                               (previously '*/1 * * * *'), overlap policy SKIP, catchupWindow
#                               '1 hour', workflowId 'maven-daily-enrichment', task queue
#                               'packages-worker', and workflowRunTimeout '24 hours'
#                               (previously workflowExecutionTimeout '15 minutes');
#                            3. treats ScheduleAlreadyRunning as "already exists" (info log)
#                               and rethrows every other error.
#                          The commented-out scheduleMavenNonCritical is removed.
#   maven/workflows.ts   - one proxyActivities() call (startToCloseTimeout '15 minutes', retry
#                          with initialInterval '30 seconds', backoffCoefficient 2,
#                          maximumAttempts 5). ingestMavenPackages awaits
#                          processMavenCriticalBatch(), logs and returns when
#                          processed + skipped + unchanged === 0, otherwise calls continueAsNew().
#   workflows/index.ts   - exports ingestMavenPackages in place of mavenCriticalWorkflow and
#                          mavenNonCriticalWorkflow.
#
# NOTE(review): line breaks in this patch appear to have been collapsed to spaces, and generic
#   type arguments look stripped (e.g. `Promise`, `Parameters[0]`); it presumably will not apply
#   as-is - regenerate from the commit before use.
# NOTE(review): the "no more work" check does not include `result.error` (a field the removed
#   non-critical activity logged); confirm that a batch consisting only of errors should end
#   the run.
# NOTE(review): the legacy-schedule delete logs at warn for every failure, presumably including
#   "schedule not found" on each start after the first - confirm this is the intended log level.
# NOTE(review): workflowRunTimeout presumably bounds each run rather than the whole
#   continue-as-new chain - confirm against the Temporal documentation.
diff --git a/services/apps/packages_worker/src/activities.ts b/services/apps/packages_worker/src/activities.ts index acd9079f24..ebb4594484 100644 --- a/services/apps/packages_worker/src/activities.ts +++ b/services/apps/packages_worker/src/activities.ts @@ -12,7 +12,7 @@ export { } from './npm/activities' export * from './deps-dev/activities' export { osvSyncEcosystem, osvDeriveCriticalFlag } from './osv/activities' -export { processMavenCriticalBatch, processMavenNonCriticalBatch } from './maven/activities' +export { processMavenCriticalBatch } from './maven/activities' export { criticalityComputePageRank, rankPackages } from './criticality/activities' export { cargoDownloadAndLoad, diff --git a/services/apps/packages_worker/src/bin/maven-worker.ts b/services/apps/packages_worker/src/bin/maven-worker.ts index 3cbfee3ff6..3600e8556e 100644 --- a/services/apps/packages_worker/src/bin/maven-worker.ts +++ b/services/apps/packages_worker/src/bin/maven-worker.ts @@ -1,8 +1,8 @@ -import { scheduleMavenCritical } from '../maven/schedule' +import { scheduleMavenIngestion } from '../maven/schedule' import { svc } from '../service' setImmediate(async () => { await svc.init() - await scheduleMavenCritical() + await scheduleMavenIngestion() await svc.start() }) diff --git a/services/apps/packages_worker/src/maven/activities.ts b/services/apps/packages_worker/src/maven/activities.ts index bf1acbe5b3..3a0b7fad49 100644 --- a/services/apps/packages_worker/src/maven/activities.ts +++ b/services/apps/packages_worker/src/maven/activities.ts @@ -19,20 +19,3 @@ export async function processMavenCriticalBatch(): Promise { log.info({ ...result }, 'Maven critical batch complete') return result } - -export async function processMavenNonCriticalBatch(): Promise { - const config = getMavenConfig() - const qx = await getPackagesDb() - // Non-critical is DB-only (no POM fetch); the flag is unused on this path. 
- const result = await processBatch(qx, config, false, false) - log.info( - { - processed: result.processed, - skipped: result.skipped, - unchanged: result.unchanged, - error: result.error, - }, - 'Maven non-critical batch complete', - ) - return result -} diff --git a/services/apps/packages_worker/src/maven/schedule.ts b/services/apps/packages_worker/src/maven/schedule.ts index 71b29943f7..2a815dcd82 100644 --- a/services/apps/packages_worker/src/maven/schedule.ts +++ b/services/apps/packages_worker/src/maven/schedule.ts @@ -1,78 +1,53 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client' import { svc } from '../service' -import { mavenCriticalWorkflow } from '../workflows' +import { ingestMavenPackages } from '../workflows' -export async function scheduleMavenCritical(): Promise { +const LEGACY_SCHEDULE_ID = 'maven-critical' + +export async function scheduleMavenIngestion(): Promise { const { temporal } = svc if (!temporal) throw new Error('Temporal client not initialized') - const scheduleOptions: Parameters[0] = { - scheduleId: 'maven-critical', - spec: { - cronExpressions: ['*/1 * * * *'], - }, - policies: { - overlap: ScheduleOverlapPolicy.SKIP, - catchupWindow: '1 hour', - }, - action: { - type: 'startWorkflow', - workflowType: mavenCriticalWorkflow, - taskQueue: 'packages-worker', - workflowExecutionTimeout: '15 minutes', - retry: { - initialInterval: '30 seconds', - backoffCoefficient: 2, - maximumAttempts: 3, - }, - args: [], - }, + try { + await temporal.schedule.getHandle(LEGACY_SCHEDULE_ID).delete() + svc.log.info({ scheduleId: LEGACY_SCHEDULE_ID }, 'Deleted legacy schedule.') + } catch (err) { + svc.log.warn( + { err, scheduleId: LEGACY_SCHEDULE_ID }, + 'Failed to delete legacy schedule (may not exist).', + ) } try { - await temporal.schedule.create(scheduleOptions) + await temporal.schedule.create({ + scheduleId: 'maven-ingestion', + spec: { + cronExpressions: ['0 9 * * *'], + }, + policies: { + overlap: 
ScheduleOverlapPolicy.SKIP, + catchupWindow: '1 hour', + }, + action: { + type: 'startWorkflow', + workflowType: ingestMavenPackages, + workflowId: 'maven-daily-enrichment', + taskQueue: 'packages-worker', + workflowRunTimeout: '24 hours', + retry: { + initialInterval: '30 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [], + }, + }) } catch (err) { if (err instanceof ScheduleAlreadyRunning) { - svc.log.info('Schedule maven-critical already exists, skipping creation.') + svc.log.info('Schedule maven-ingestion already exists, skipping creation.') } else { throw err } } } - -// export async function scheduleMavenNonCritical(): Promise { -// const { temporal } = svc -// if (!temporal) throw new Error('Temporal client not initialized') - -// try { -// await temporal.schedule.create({ -// scheduleId: 'maven-non-critical', -// spec: { -// cronExpressions: ['*/10 * * * *'], -// }, -// policies: { -// overlap: ScheduleOverlapPolicy.SKIP, -// catchupWindow: '1 hour', -// }, -// action: { -// type: 'startWorkflow', -// workflowType: mavenNonCriticalWorkflow, -// taskQueue: 'packages-worker', -// workflowExecutionTimeout: '5 minutes', -// retry: { -// initialInterval: '30 seconds', -// backoffCoefficient: 2, -// maximumAttempts: 3, -// }, -// args: [], -// }, -// }) -// } catch (err) { -// if (err instanceof ScheduleAlreadyRunning) { -// svc.log.info('Schedule maven-non-critical already registered.') -// } else { -// throw err -// } -// } -// } diff --git a/services/apps/packages_worker/src/maven/workflows.ts b/services/apps/packages_worker/src/maven/workflows.ts index 223cee6701..048f5cde54 100644 --- a/services/apps/packages_worker/src/maven/workflows.ts +++ b/services/apps/packages_worker/src/maven/workflows.ts @@ -1,19 +1,21 @@ -import { proxyActivities } from '@temporalio/workflow' +import { continueAsNew, log, proxyActivities } from '@temporalio/workflow' import type * as activities from './activities' -const { processMavenCriticalBatch } = 
proxyActivities({ +const acts = proxyActivities({ startToCloseTimeout: '15 minutes', + retry: { + initialInterval: '30 seconds', + backoffCoefficient: 2, + maximumAttempts: 5, + }, }) -const { processMavenNonCriticalBatch } = proxyActivities({ - startToCloseTimeout: '5 minutes', -}) - -export async function mavenCriticalWorkflow(): Promise { - await processMavenCriticalBatch() -} - -export async function mavenNonCriticalWorkflow(): Promise { - await processMavenNonCriticalBatch() +export async function ingestMavenPackages(): Promise { + const result = await acts.processMavenCriticalBatch() + if (result.processed + result.skipped + result.unchanged === 0) { + log.info('Maven ingestion complete — no more work, exiting.', { ...result }) + return + } + await continueAsNew() } diff --git a/services/apps/packages_worker/src/workflows/index.ts b/services/apps/packages_worker/src/workflows/index.ts index 244bb26b6c..411ba509f8 100644 --- a/services/apps/packages_worker/src/workflows/index.ts +++ b/services/apps/packages_worker/src/workflows/index.ts @@ -15,7 +15,7 @@ export { ingestDependentCounts, } from '../deps-dev/workflows' export { osvSync } from '../osv/workflows' -export { mavenCriticalWorkflow, mavenNonCriticalWorkflow } from '../maven/workflows' +export { ingestMavenPackages } from '../maven/workflows' export { ingestScorecard } from '../scorecard/workflows' export { rankPackagesWorkflow } from '../criticality/workflow' export { cargoSyncWorkflow } from '../cargo/workflows'