Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/apps/packages_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export {
} from './npm/activities'
export * from './deps-dev/activities'
export { osvSyncEcosystem, osvDeriveCriticalFlag } from './osv/activities'
export { processMavenCriticalBatch, processMavenNonCriticalBatch } from './maven/activities'
export { processMavenCriticalBatch } from './maven/activities'
export { criticalityComputePageRank, rankPackages } from './criticality/activities'
export {
cargoDownloadAndLoad,
Expand Down
4 changes: 2 additions & 2 deletions services/apps/packages_worker/src/bin/maven-worker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { scheduleMavenCritical } from '../maven/schedule'
import { scheduleMavenIngestion } from '../maven/schedule'
import { svc } from '../service'

setImmediate(async () => {
await svc.init()
await scheduleMavenCritical()
await scheduleMavenIngestion()
await svc.start()
})
17 changes: 0 additions & 17 deletions services/apps/packages_worker/src/maven/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,3 @@ export async function processMavenCriticalBatch(): Promise<BatchResult> {
log.info({ ...result }, 'Maven critical batch complete')
return result
}

export async function processMavenNonCriticalBatch(): Promise<BatchResult> {
const config = getMavenConfig()
const qx = await getPackagesDb()
// Non-critical is DB-only (no POM fetch); the flag is unused on this path.
const result = await processBatch(qx, config, false, false)
log.info(
{
processed: result.processed,
skipped: result.skipped,
unchanged: result.unchanged,
error: result.error,
},
'Maven non-critical batch complete',
)
return result
}
97 changes: 36 additions & 61 deletions services/apps/packages_worker/src/maven/schedule.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,53 @@
import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client'

import { svc } from '../service'
import { mavenCriticalWorkflow } from '../workflows'
import { ingestMavenPackages } from '../workflows'

export async function scheduleMavenCritical(): Promise<void> {
const LEGACY_SCHEDULE_ID = 'maven-critical'

export async function scheduleMavenIngestion(): Promise<void> {
const { temporal } = svc
if (!temporal) throw new Error('Temporal client not initialized')

const scheduleOptions: Parameters<typeof temporal.schedule.create>[0] = {
scheduleId: 'maven-critical',
spec: {
cronExpressions: ['*/1 * * * *'],
},
policies: {
overlap: ScheduleOverlapPolicy.SKIP,
catchupWindow: '1 hour',
},
action: {
type: 'startWorkflow',
Comment thread
ulemons marked this conversation as resolved.
workflowType: mavenCriticalWorkflow,
taskQueue: 'packages-worker',
workflowExecutionTimeout: '15 minutes',
retry: {
initialInterval: '30 seconds',
backoffCoefficient: 2,
maximumAttempts: 3,
},
args: [],
},
try {
await temporal.schedule.getHandle(LEGACY_SCHEDULE_ID).delete()
svc.log.info({ scheduleId: LEGACY_SCHEDULE_ID }, 'Deleted legacy schedule.')
} catch (err) {
svc.log.warn(
{ err, scheduleId: LEGACY_SCHEDULE_ID },
'Failed to delete legacy schedule (may not exist).',
)
}
Comment thread
ulemons marked this conversation as resolved.

try {
await temporal.schedule.create(scheduleOptions)
await temporal.schedule.create({
scheduleId: 'maven-ingestion',
spec: {
cronExpressions: ['0 9 * * *'],
},
Comment thread
ulemons marked this conversation as resolved.
policies: {
overlap: ScheduleOverlapPolicy.SKIP,
catchupWindow: '1 hour',
},
action: {
type: 'startWorkflow',
workflowType: ingestMavenPackages,
workflowId: 'maven-daily-enrichment',
taskQueue: 'packages-worker',
workflowRunTimeout: '24 hours',
retry: {
initialInterval: '30 seconds',
backoffCoefficient: 2,
maximumAttempts: 3,
},
args: [],
},
})
} catch (err) {
if (err instanceof ScheduleAlreadyRunning) {
svc.log.info('Schedule maven-critical already exists, skipping creation.')
svc.log.info('Schedule maven-ingestion already exists, skipping creation.')
} else {
throw err
}
}
}

// export async function scheduleMavenNonCritical(): Promise<void> {
// const { temporal } = svc
// if (!temporal) throw new Error('Temporal client not initialized')

// try {
// await temporal.schedule.create({
// scheduleId: 'maven-non-critical',
// spec: {
// cronExpressions: ['*/10 * * * *'],
// },
// policies: {
// overlap: ScheduleOverlapPolicy.SKIP,
// catchupWindow: '1 hour',
// },
// action: {
// type: 'startWorkflow',
// workflowType: mavenNonCriticalWorkflow,
// taskQueue: 'packages-worker',
// workflowExecutionTimeout: '5 minutes',
// retry: {
// initialInterval: '30 seconds',
// backoffCoefficient: 2,
// maximumAttempts: 3,
// },
// args: [],
// },
// })
// } catch (err) {
// if (err instanceof ScheduleAlreadyRunning) {
// svc.log.info('Schedule maven-non-critical already registered.')
// } else {
// throw err
// }
// }
// }
26 changes: 14 additions & 12 deletions services/apps/packages_worker/src/maven/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { proxyActivities } from '@temporalio/workflow'
import { continueAsNew, log, proxyActivities } from '@temporalio/workflow'

import type * as activities from './activities'

const { processMavenCriticalBatch } = proxyActivities<typeof activities>({
const acts = proxyActivities<typeof activities>({
startToCloseTimeout: '15 minutes',
retry: {
initialInterval: '30 seconds',
backoffCoefficient: 2,
maximumAttempts: 5,
},
})

const { processMavenNonCriticalBatch } = proxyActivities<typeof activities>({
startToCloseTimeout: '5 minutes',
})

export async function mavenCriticalWorkflow(): Promise<void> {
await processMavenCriticalBatch()
}

export async function mavenNonCriticalWorkflow(): Promise<void> {
await processMavenNonCriticalBatch()
export async function ingestMavenPackages(): Promise<void> {
const result = await acts.processMavenCriticalBatch()
if (result.processed + result.skipped + result.unchanged === 0) {
log.info('Maven ingestion complete — no more work, exiting.', { ...result })
return
Comment thread
ulemons marked this conversation as resolved.
}
Comment thread
ulemons marked this conversation as resolved.
await continueAsNew<typeof ingestMavenPackages>()
Comment thread
ulemons marked this conversation as resolved.
}
2 changes: 1 addition & 1 deletion services/apps/packages_worker/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export {
ingestDependentCounts,
} from '../deps-dev/workflows'
export { osvSync } from '../osv/workflows'
export { mavenCriticalWorkflow, mavenNonCriticalWorkflow } from '../maven/workflows'
export { ingestMavenPackages } from '../maven/workflows'
export { ingestScorecard } from '../scorecard/workflows'
export { rankPackagesWorkflow } from '../criticality/workflow'
export { cargoSyncWorkflow } from '../cargo/workflows'
Expand Down
Loading