From dd132b7721fcbbecae38082eedd8ffb809ae635a Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Thu, 25 Jun 2026 13:31:26 +0100 Subject: [PATCH 1/3] feat: implement nuget worker Signed-off-by: Mouad BANI --- .../V1782345600__nuget_total_downloads.sql | 1 + scripts/builders/packages.env | 2 +- scripts/services/nuget-worker.yaml | 65 ++++ services/apps/packages_worker/package.json | 3 + .../apps/packages_worker/src/activities.ts | 1 + .../packages_worker/src/bin/nuget-worker.ts | 8 + services/apps/packages_worker/src/config.ts | 10 + .../packages_worker/src/nuget/activities.ts | 19 ++ .../apps/packages_worker/src/nuget/client.ts | 151 +++++++++ .../packages_worker/src/nuget/normalize.ts | 154 +++++++++ .../src/nuget/runNuGetEnrichmentLoop.ts | 235 +++++++++++++ .../packages_worker/src/nuget/schedule.ts | 41 +++ .../apps/packages_worker/src/nuget/types.ts | 99 ++++++ .../packages_worker/src/nuget/workflows.ts | 21 ++ .../src/scripts/triggerNuGetSync.ts | 33 ++ .../src/scripts/triggerOsvSync.ts | 36 ++ .../packages_worker/src/workflows/index.ts | 1 + services/libs/data-access-layer/src/index.ts | 1 + .../data-access-layer/src/osspckgs/nuget.ts | 314 ++++++++++++++++++ 19 files changed, 1194 insertions(+), 1 deletion(-) create mode 100644 backend/src/osspckgs/migrations/V1782345600__nuget_total_downloads.sql create mode 100644 scripts/services/nuget-worker.yaml create mode 100644 services/apps/packages_worker/src/bin/nuget-worker.ts create mode 100644 services/apps/packages_worker/src/nuget/activities.ts create mode 100644 services/apps/packages_worker/src/nuget/client.ts create mode 100644 services/apps/packages_worker/src/nuget/normalize.ts create mode 100644 services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts create mode 100644 services/apps/packages_worker/src/nuget/schedule.ts create mode 100644 services/apps/packages_worker/src/nuget/types.ts create mode 100644 services/apps/packages_worker/src/nuget/workflows.ts create mode 100644 
services/apps/packages_worker/src/scripts/triggerNuGetSync.ts create mode 100644 services/apps/packages_worker/src/scripts/triggerOsvSync.ts create mode 100644 services/libs/data-access-layer/src/osspckgs/nuget.ts diff --git a/backend/src/osspckgs/migrations/V1782345600__nuget_total_downloads.sql b/backend/src/osspckgs/migrations/V1782345600__nuget_total_downloads.sql new file mode 100644 index 0000000000..e942aa2e1a --- /dev/null +++ b/backend/src/osspckgs/migrations/V1782345600__nuget_total_downloads.sql @@ -0,0 +1 @@ +ALTER TABLE packages ADD COLUMN IF NOT EXISTS total_downloads bigint; diff --git a/scripts/builders/packages.env b/scripts/builders/packages.env index 03cc6369b7..191e44ea95 100644 --- a/scripts/builders/packages.env +++ b/scripts/builders/packages.env @@ -1,4 +1,4 @@ DOCKERFILE="./services/docker/Dockerfile.packages" CONTEXT="../" REPO="sjc.ocir.io/axbydjxa5zuh/packages" -SERVICES="github-repos-enricher bq-dataset-ingest npm-worker maven-worker osv-worker dockerhub-sync cargo-worker go-worker" +SERVICES="github-repos-enricher bq-dataset-ingest npm-worker maven-worker osv-worker dockerhub-sync cargo-worker go-worker nuget-worker" diff --git a/scripts/services/nuget-worker.yaml b/scripts/services/nuget-worker.yaml new file mode 100644 index 0000000000..660f609446 --- /dev/null +++ b/scripts/services/nuget-worker.yaml @@ -0,0 +1,65 @@ +version: '3.1' + +x-env-args: &env-args + DOCKER_BUILDKIT: 1 + NODE_ENV: docker + SERVICE: nuget-worker + SHELL: /bin/sh + SUPPRESS_NO_CONFIG_WARNING: 'true' + CROWD_TEMPORAL_TASKQUEUE: nuget-worker + +services: + nuget-worker: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages + command: 'pnpm run start:nuget-worker' + working_dir: /usr/crowd/app/services/apps/packages_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + restart: always 
+ networks: + - crowd-bridge + + nuget-worker-dev: + build: + context: ../../ + dockerfile: ./scripts/services/docker/Dockerfile.packages + command: 'pnpm run dev:nuget-worker' + working_dir: /usr/crowd/app/services/apps/packages_worker + env_file: + - ../../backend/.env.dist.local + - ../../backend/.env.dist.composed + - ../../backend/.env.override.local + - ../../backend/.env.override.composed + environment: + <<: *env-args + hostname: nuget-worker + networks: + - crowd-bridge + volumes: + - ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src + - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src + - ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src + - ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src + - ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src + - ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src + - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src + - ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src + - ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src + - ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src + - ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src + - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src + - ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src + - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src + - ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src + - ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src + +networks: + crowd-bridge: + external: true diff --git a/services/apps/packages_worker/package.json b/services/apps/packages_worker/package.json index d49e5c2f34..1388953be4 100644 --- 
a/services/apps/packages_worker/package.json +++ b/services/apps/packages_worker/package.json @@ -38,6 +38,9 @@ "start:go-worker": "CROWD_TEMPORAL_TASKQUEUE=go-worker SERVICE=go-worker tsx src/bin/go-worker.ts", "dev:go-worker": "CROWD_TEMPORAL_TASKQUEUE=go-worker SERVICE=go-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9241 src/bin/go-worker.ts", "dev:go-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=go-worker SERVICE=go-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9241 src/bin/go-worker.ts", + "start:nuget-worker": "CROWD_TEMPORAL_TASKQUEUE=nuget-worker SERVICE=nuget-worker tsx src/bin/nuget-worker.ts", + "dev:nuget-worker": "CROWD_TEMPORAL_TASKQUEUE=nuget-worker SERVICE=nuget-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/nuget-worker.ts", + "dev:nuget-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=nuget-worker SERVICE=nuget-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/nuget-worker.ts", "backfill:maven": "SERVICE=maven tsx src/bin/maven-backfill.ts", "backfill:maven:local": "set -a && . ../../../backend/.env.dist.local && . 
../../../backend/.env.override.local && set +a && SERVICE=maven LOG_LEVEL=info tsx src/bin/maven-backfill.ts", "backfill:stewardship": "SERVICE=stewardship-backfill tsx src/bin/stewardship-backfill.ts", diff --git a/services/apps/packages_worker/src/activities.ts b/services/apps/packages_worker/src/activities.ts index ebb4594484..aa564ce3fa 100644 --- a/services/apps/packages_worker/src/activities.ts +++ b/services/apps/packages_worker/src/activities.ts @@ -25,3 +25,4 @@ export { cargoCleanup, } from './cargo/activities' export { enrichGoVersionsBatch, enrichGoStatusBatch } from './go/activities' +export { processNuGetBatch } from './nuget/activities' diff --git a/services/apps/packages_worker/src/bin/nuget-worker.ts b/services/apps/packages_worker/src/bin/nuget-worker.ts new file mode 100644 index 0000000000..3895edf949 --- /dev/null +++ b/services/apps/packages_worker/src/bin/nuget-worker.ts @@ -0,0 +1,8 @@ +import { scheduleNuGetIngestion } from '../nuget/schedule' +import { svc } from '../service' + +setImmediate(async () => { + await svc.init() + await scheduleNuGetIngestion() + await svc.start() +}) diff --git a/services/apps/packages_worker/src/config.ts b/services/apps/packages_worker/src/config.ts index 3fb773261c..c2c7a7875e 100644 --- a/services/apps/packages_worker/src/config.ts +++ b/services/apps/packages_worker/src/config.ts @@ -70,6 +70,16 @@ export function getGoConfig() { } } +export function getNuGetConfig() { + return { + batchSize: parseInt(process.env.NUGET_FETCHER_BATCH_SIZE ?? '500', 10), + concurrency: parseInt(process.env.NUGET_FETCHER_CONCURRENCY ?? '8', 10), + groupDelayMs: parseInt(process.env.NUGET_FETCHER_GROUP_DELAY_MS ?? '250', 10), + isCritical: (process.env.NUGET_FETCHER_IS_CRITICAL ?? 
'false') === 'true', + userAgent: process.env.NUGET_USER_AGENT, + } +} + export function getDockerhubConfig() { return { hubBaseUrl: requireEnv('DOCKERHUB_API_BASE_URL'), diff --git a/services/apps/packages_worker/src/nuget/activities.ts b/services/apps/packages_worker/src/nuget/activities.ts new file mode 100644 index 0000000000..5f0fd8f50b --- /dev/null +++ b/services/apps/packages_worker/src/nuget/activities.ts @@ -0,0 +1,19 @@ +import { getServiceChildLogger } from '@crowd/logging' + +import { getNuGetConfig } from '../config' +import { getPackagesDb } from '../db' + +import { processBatch } from './runNuGetEnrichmentLoop' +import { BatchResult } from './types' + +const log = getServiceChildLogger('nuget-activity') + +export async function processNuGetBatch(): Promise { + const config = getNuGetConfig() + const qx = await getPackagesDb() + + const today = new Date().toISOString().split('T')[0] + const result = await processBatch(qx, config, today) + log.info({ ...result }, 'NuGet batch complete') + return result +} diff --git a/services/apps/packages_worker/src/nuget/client.ts b/services/apps/packages_worker/src/nuget/client.ts new file mode 100644 index 0000000000..90afef67f0 --- /dev/null +++ b/services/apps/packages_worker/src/nuget/client.ts @@ -0,0 +1,151 @@ +import axios from 'axios' + +import { NuGetFetchError, NuGetRegistrationIndex, NuGetRegistrationPage, NuGetSearchItem } from './types' + +const SERVICE_INDEX_URL = 'https://api.nuget.org/v3/index.json' + +interface ServiceIndexResource { + '@id': string + '@type': string +} + +interface ServiceIndex { + resources: ServiceIndexResource[] +} + +interface ResolvedEndpoints { + searchBaseUrl: string + registrationBaseUrl: string +} + +let cachedEndpoints: ResolvedEndpoints | null = null + +async function resolveEndpoints(userAgent?: string): Promise { + if (cachedEndpoints) return cachedEndpoints + + const resp = await axios.get(SERVICE_INDEX_URL, { + headers: userAgent ? 
{ 'User-Agent': userAgent } : {}, + timeout: 10000, + }) + + const resources = resp.data.resources + + const findFirst = (...types: string[]): string | undefined => { + for (const t of types) { + const found = resources.find((r) => r['@type'] === t) + if (found) return found['@id'] + } + return undefined + } + + const searchBaseUrl = findFirst('SearchQueryService/3.5.0', 'SearchQueryService') + const registrationBaseUrl = findFirst( + 'RegistrationsBaseUrl/3.6.0', + 'RegistrationsBaseUrl/3.5.0', + 'RegistrationsBaseUrl', + ) + + if (!searchBaseUrl || !registrationBaseUrl) { + throw new Error('NuGet service index missing required endpoints') + } + + cachedEndpoints = { searchBaseUrl, registrationBaseUrl } + return cachedEndpoints +} + +function classifyError(err: unknown): NuGetFetchError | null { + if (!axios.isAxiosError(err)) return null + const status = err.response?.status + if (status === 404) return { kind: 'NOT_FOUND', status, message: err.message } + if (status === 429) return { kind: 'RATE_LIMIT', status, message: err.message } + return null +} + +export async function fetchSearch( + packageId: string, + userAgent?: string, +): Promise { + const { searchBaseUrl } = await resolveEndpoints(userAgent) + const lowerPackageId = packageId.toLowerCase() + + try { + const resp = await axios.get<{ totalHits: number; data: NuGetSearchItem[] }>(searchBaseUrl, { + params: { + q: `packageid:${packageId}`, + prerelease: 'true', + semVerLevel: '2.0.0', + take: 20, + }, + headers: { + ...(userAgent ? 
{ 'User-Agent': userAgent } : {}), + 'Accept-Encoding': 'gzip', + }, + timeout: 15000, + }) + + const match = resp.data.data.find((item) => item.id.toLowerCase() === lowerPackageId) + if (!match) { + return { kind: 'NOT_FOUND', message: `Package ${packageId} not found in search results` } + } + return match + } catch (err) { + const classified = classifyError(err) + if (classified) return classified + throw err + } +} + +async function fetchRegistrationPage( + pageId: string, + userAgent?: string, +): Promise { + const resp = await axios.get(pageId, { + headers: { + ...(userAgent ? { 'User-Agent': userAgent } : {}), + 'Accept-Encoding': 'gzip', + }, + timeout: 15000, + }) + return resp.data +} + +export async function fetchRegistration( + packageId: string, + userAgent?: string, +): Promise { + const { registrationBaseUrl } = await resolveEndpoints(userAgent) + const lowerId = packageId.toLowerCase() + + try { + const resp = await axios.get( + `${registrationBaseUrl}${lowerId}/index.json`, + { + headers: { + ...(userAgent ? { 'User-Agent': userAgent } : {}), + 'Accept-Encoding': 'gzip', + }, + timeout: 15000, + }, + ) + + const index = resp.data + + for (let i = 0; i < index.items.length; i++) { + const page = index.items[i] + if (!page.items) { + try { + const fullPage = await fetchRegistrationPage(page['@id'], userAgent) + index.items[i] = { ...page, items: fullPage.items ?? 
[] } + } catch { + index.items[i] = { ...page, items: [] } + } + } + } + + return index + } catch (err) { + const classified = classifyError(err) + if (classified) return classified + throw err + } +} diff --git a/services/apps/packages_worker/src/nuget/normalize.ts b/services/apps/packages_worker/src/nuget/normalize.ts new file mode 100644 index 0000000000..0126f421c0 --- /dev/null +++ b/services/apps/packages_worker/src/nuget/normalize.ts @@ -0,0 +1,154 @@ +import { + NormalizedNuGetPackage, + NormalizedNuGetVersion, + NuGetCatalogEntry, + NuGetRegistrationIndex, + NuGetSearchItem, +} from './types' + +function parseAuthors(authors: string | string[] | undefined): string[] { + if (!authors) return [] + if (Array.isArray(authors)) return authors.filter(Boolean) + return authors + .split(',') + .map((a) => a.trim()) + .filter(Boolean) +} + +function isPrerelease(version: string): boolean { + return version.includes('-') +} + +function normalizeRepoUrl(url: string | undefined): string | null { + if (!url) return null + return url + .trim() + .replace(/\.git$/, '') + .replace(/^git\+/, '') + .replace(/^git:\/\//, 'https://') + .replace(/^http:\/\/github\.com\//, 'https://github.com/') +} + +function parseLicense( + licenseExpression: string | undefined, + licenseUrl: string | undefined, +): { licenses: string[] | null; licensesRaw: string | null } { + if (licenseExpression) { + return { licenses: [licenseExpression], licensesRaw: licenseExpression } + } + if (licenseUrl) { + return { licenses: null, licensesRaw: licenseUrl } + } + return { licenses: null, licensesRaw: null } +} + +export function normalizeNuGetPackage( + packageId: string, + searchResult: NuGetSearchItem | null, + registration: NuGetRegistrationIndex, +): NormalizedNuGetPackage { + const allLeaves = registration.items.flatMap((page) => page.items ?? 
[]) + const allEntries: NuGetCatalogEntry[] = allLeaves.map((leaf) => leaf.catalogEntry) + + const listedEntries = allEntries.filter((e) => e.listed !== false) + const latestListedEntry = + listedEntries.length > 0 ? listedEntries[listedEntries.length - 1] : null + const latestEntry = allEntries.length > 0 ? allEntries[allEntries.length - 1] : null + + const latestVersion = + searchResult?.version ?? latestListedEntry?.version ?? latestEntry?.version ?? null + + const latestEntry4License = latestListedEntry ?? latestEntry + const { licenses, licensesRaw } = parseLicense( + latestEntry4License?.licenseExpression, + latestEntry4License?.licenseUrl, + ) + + const description = + searchResult?.description || + searchResult?.summary || + latestListedEntry?.description || + null + + const homepage = + searchResult?.projectUrl || latestListedEntry?.projectUrl || null + + const declaredRepositoryUrl = + latestEntry4License?.repository?.url + ? latestEntry4License.repository.url + : null + const repositoryUrl = normalizeRepoUrl(declaredRepositoryUrl) + + const keywords = + searchResult?.tags && searchResult.tags.length > 0 ? searchResult.tags : null + + let status: 'active' | 'deprecated' | 'unpublished' + if (listedEntries.length === 0) { + status = 'unpublished' + } else if (latestListedEntry?.deprecation) { + status = 'deprecated' + } else { + status = 'active' + } + + const publishedDates = allEntries + .filter((e) => e.published) + .map((e) => new Date(e.published!)) + .filter((d) => !isNaN(d.getTime())) + .sort((a, b) => a.getTime() - b.getTime()) + + const firstReleaseAt = publishedDates.length > 0 ? publishedDates[0] : null + + const latestEntry4Date = latestListedEntry ?? latestEntry + const latestReleaseAt = + latestEntry4Date?.published ? new Date(latestEntry4Date.published) : null + + const totalDownloads = searchResult?.totalDownloads ?? 0 + + const owners = searchResult?.owners ?? [] + + const authors = parseAuthors( + searchResult?.authors ?? 
latestEntry4License?.authors, + ) + + const searchVersionMap = new Map() + if (searchResult?.versions) { + for (const v of searchResult.versions) { + searchVersionMap.set(v.version.toLowerCase(), v.downloads) + } + } + + const versions: NormalizedNuGetVersion[] = allEntries.map((entry) => { + const ver = entry.version + const dlCount = searchVersionMap.get(ver.toLowerCase()) ?? null + const { licenses: vLicenses } = parseLicense(entry.licenseExpression, entry.licenseUrl) + return { + number: ver, + publishedAt: entry.published ? new Date(entry.published) : null, + isLatest: ver === latestVersion, + isPrerelease: isPrerelease(ver), + isYanked: entry.listed === false, + licenses: vLicenses, + downloadCount: dlCount, + } + }) + + return { + description, + homepage: homepage || null, + declaredRepositoryUrl, + repositoryUrl, + licenses, + licensesRaw, + keywords, + status, + latestVersion, + versionsCount: allEntries.length, + firstReleaseAt, + latestReleaseAt: latestReleaseAt && !isNaN(latestReleaseAt.getTime()) ? 
latestReleaseAt : null, + totalDownloads, + owners, + authors, + versions, + } +} diff --git a/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts b/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts new file mode 100644 index 0000000000..dba2043595 --- /dev/null +++ b/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts @@ -0,0 +1,235 @@ +import { + IDbNuGetVersionUpsert, + NuGetPackageToSync, + QueryExecutor, + listNuGetPackagesToSync, + logAuditFieldChange, + recordNuGetDownloadSnapshot, + replacePackageMaintainers, + upsertMaintainer, + upsertNuGetPackage, + upsertNuGetVersionsBatch, +} from '@crowd/data-access-layer' +import { getServiceChildLogger } from '@crowd/logging' + +import { getNuGetConfig } from '../config' + +import { fetchRegistration, fetchSearch } from './client' +import { normalizeNuGetPackage } from './normalize' +import { BatchResult, isNuGetFetchError } from './types' + +const log = getServiceChildLogger('nuget') + +type NuGetConfig = ReturnType +type PackageRow = NuGetPackageToSync + +async function withDeadlockRetry(fn: () => Promise, maxAttempts = 4): Promise { + for (let attempt = 1; ; attempt++) { + try { + return await fn() + } catch (err) { + const code = (err as { code?: string }).code + const isDeadlock = + code === '40P01' || /deadlock detected/i.test(String((err as Error)?.message)) + if (isDeadlock && attempt < maxAttempts) { + await new Promise((r) => setTimeout(r, 50 * attempt + Math.random() * 100)) + log.debug({ attempt }, 'Deadlock detected — retrying transaction') + continue + } + throw err + } + } +} + +function nugetRegistryUrl(packageId: string): string { + return `https://www.nuget.org/packages/${packageId}` +} + +type PackageStatus = 'processed' | 'skipped' | 'error' | 'unchanged' + +async function processPackage( + qx: QueryExecutor, + pkg: PackageRow, + config: NuGetConfig, + today: string, +): Promise { + const packageId = pkg.name + + const [searchResult, registrationResult] 
= await Promise.all([ + fetchSearch(packageId, config.userAgent), + fetchRegistration(packageId, config.userAgent), + ]) + + if (isNuGetFetchError(registrationResult)) { + if (registrationResult.kind === 'NOT_FOUND') { + await upsertNuGetPackage(qx, { + purl: pkg.purl, + name: pkg.name, + description: null, + homepage: null, + declaredRepositoryUrl: null, + repositoryUrl: null, + licenses: null, + licensesRaw: null, + keywords: null, + status: null, + latestVersion: pkg.latestVersion, + versionsCount: null, + firstReleaseAt: null, + latestReleaseAt: null, + registryUrl: nugetRegistryUrl(packageId), + ingestionSource: 'nuget_not_found', + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) + log.warn({ purl: pkg.purl }, 'Package not found on NuGet registry — writing minimal record') + return 'skipped' + } + if (registrationResult.kind === 'RATE_LIMIT') { + log.warn({ purl: pkg.purl }, 'Rate limited by NuGet registry — will retry next pass') + return 'error' + } + throw new Error( + `Transient error fetching registration for ${pkg.purl}: ${registrationResult.message}`, + ) + } + + const searchItem = isNuGetFetchError(searchResult) ? null : searchResult + + const normalized = normalizeNuGetPackage(packageId, searchItem, registrationResult) + + await withDeadlockRetry(() => + qx.tx(async (t) => { + const changed = new Set() + + const { id: packageDbId, changedFields: pkgChanged } = await upsertNuGetPackage(t, { + purl: pkg.purl, + name: pkg.name, + description: normalized.description, + homepage: normalized.homepage, + declaredRepositoryUrl: normalized.declaredRepositoryUrl, + repositoryUrl: normalized.repositoryUrl, + licenses: normalized.licenses, + licensesRaw: normalized.licensesRaw, + keywords: normalized.keywords, + status: normalized.status, + latestVersion: normalized.latestVersion, + versionsCount: normalized.versionsCount > 0 ? 
normalized.versionsCount : null, + firstReleaseAt: normalized.firstReleaseAt, + latestReleaseAt: normalized.latestReleaseAt, + registryUrl: nugetRegistryUrl(packageId), + ingestionSource: 'nuget-registry', + dependentPackagesCount: pkg.dependentPackagesCount, + dependentReposCount: pkg.dependentReposCount, + }) + pkgChanged.forEach((f) => changed.add(f)) + + if (normalized.versions.length > 0) { + const versionRows: IDbNuGetVersionUpsert[] = normalized.versions.map((v) => ({ + packageId: packageDbId, + name: pkg.name, + number: v.number, + publishedAt: v.publishedAt, + isLatest: v.isLatest, + isPrerelease: v.isPrerelease, + isYanked: v.isYanked, + licenses: v.licenses, + downloadCount: v.downloadCount !== null ? BigInt(v.downloadCount) : null, + })) + const verChanged = await upsertNuGetVersionsBatch(t, versionRows) + verChanged.forEach((f) => changed.add(f)) + } + + const allPeople = [ + ...normalized.owners.map((username) => ({ username, role: 'maintainer' as const })), + ...normalized.authors + .filter((a) => !normalized.owners.includes(a)) + .map((username) => ({ username, role: 'author' as const })), + ].sort((a, b) => a.username.localeCompare(b.username)) + + const maintainerLinks: Array<{ maintainerId: number; role: 'author' | 'maintainer' }> = [] + for (const person of allPeople) { + if (!person.username) continue + const { id: maintainerId, changedFields: mChanged } = await upsertMaintainer(t, { + ecosystem: 'nuget', + username: person.username, + displayName: person.username, + url: null, + email: null, + }) + mChanged.forEach((f) => changed.add(f)) + maintainerLinks.push({ maintainerId, role: person.role }) + } + + if (maintainerLinks.length > 0) { + const pmChanged = await replacePackageMaintainers(t, packageDbId, maintainerLinks) + pmChanged.forEach((f) => changed.add(f)) + } + + if (normalized.totalDownloads > 0) { + const dlChanged = await recordNuGetDownloadSnapshot(t, { + packageId: packageDbId, + purl: pkg.purl, + totalDownloads: 
normalized.totalDownloads, + today, + }) + dlChanged.forEach((f) => changed.add(f)) + } + + await logAuditFieldChange(t, 'nuget', pkg.purl, Array.from(changed)) + + log.debug( + { + purl: pkg.purl, + versions: normalized.versions.length, + maintainers: maintainerLinks.length, + totalDownloads: normalized.totalDownloads, + }, + 'ok', + ) + }), + ) + + return 'processed' +} + +export async function processBatch(qx: QueryExecutor, config: NuGetConfig, today: string): Promise { + const packages = await listNuGetPackagesToSync(qx, { + limit: config.batchSize, + isCritical: config.isCritical, + }) + + if (packages.length === 0) return { processed: 0, skipped: 0, error: 0, unchanged: 0 } + + log.info({ count: packages.length }, 'Batch started') + + const counts = { processed: 0, skipped: 0, error: 0, unchanged: 0 } + + for (let batchStart = 0; batchStart < packages.length; batchStart += config.concurrency) { + const group = packages.slice(batchStart, batchStart + config.concurrency) + + if (config.groupDelayMs > 0 && batchStart > 0) { + await new Promise((r) => setTimeout(r, config.groupDelayMs)) + } + + await Promise.all( + group.map(async (pkg) => { + try { + const status = await processPackage(qx, pkg, config, today) + counts[status]++ + } catch (err) { + const message = err instanceof Error ? 
err.message : String(err) + log.error({ purl: pkg.purl, error: message }, 'Unexpected error processing package') + counts.error++ + } + }), + ) + + const done = batchStart + group.length + if (done % 1000 === 0 || done === packages.length) { + log.info({ done, total: packages.length, ...counts }, 'Progress') + } + } + + return counts +} diff --git a/services/apps/packages_worker/src/nuget/schedule.ts b/services/apps/packages_worker/src/nuget/schedule.ts new file mode 100644 index 0000000000..a8a5390c33 --- /dev/null +++ b/services/apps/packages_worker/src/nuget/schedule.ts @@ -0,0 +1,41 @@ +import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client' + +import { svc } from '../service' +import { ingestNuGetPackages } from '../workflows' + +export async function scheduleNuGetIngestion(): Promise { + const { temporal } = svc + if (!temporal) throw new Error('Temporal client not initialized') + + try { + await temporal.schedule.create({ + scheduleId: 'nuget-registry-ingest', + spec: { + cronExpressions: ['0 7 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.SKIP, + catchupWindow: '1 hour', + }, + action: { + type: 'startWorkflow', + workflowType: ingestNuGetPackages, + workflowId: 'nuget-daily-enrichment', + taskQueue: 'nuget-worker', + workflowRunTimeout: '24 hours', + retry: { + initialInterval: '30 seconds', + backoffCoefficient: 2, + maximumAttempts: 5, + }, + args: [], + }, + }) + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('Schedule nuget-registry-ingest already exists, skipping creation.') + } else { + throw err + } + } +} diff --git a/services/apps/packages_worker/src/nuget/types.ts b/services/apps/packages_worker/src/nuget/types.ts new file mode 100644 index 0000000000..f527cac023 --- /dev/null +++ b/services/apps/packages_worker/src/nuget/types.ts @@ -0,0 +1,99 @@ +export interface NuGetConfig { + batchSize: number + concurrency: number + groupDelayMs: number + isCritical: boolean + 
userAgent: string | undefined +} + +export interface BatchResult { + processed: number + skipped: number + error: number + unchanged: number +} + +export interface NuGetFetchError { + kind: 'NOT_FOUND' | 'RATE_LIMIT' + status?: number + message: string +} + +export type NuGetFetchResult = T | NuGetFetchError + +export function isNuGetFetchError(r: NuGetFetchResult): r is NuGetFetchError { + return typeof r === 'object' && r !== null && 'kind' in (r as object) +} + +export interface NuGetSearchItem { + id: string + version: string + description?: string + summary?: string + authors?: string | string[] + owners?: string[] + projectUrl?: string + totalDownloads?: number + tags?: string[] + versions?: Array<{ version: string; downloads: number }> +} + +export interface NuGetCatalogEntry { + id: string + version: string + description?: string + authors?: string | string[] + licenseExpression?: string + licenseUrl?: string + listed?: boolean + published?: string + projectUrl?: string + deprecation?: { message?: string; alternatePackage?: unknown } + repository?: { type?: string; url?: string } +} + +export interface NuGetRegistrationLeaf { + catalogEntry: NuGetCatalogEntry +} + +export interface NuGetRegistrationPage { + '@id': string + count: number + lower: string + upper: string + items?: NuGetRegistrationLeaf[] +} + +export interface NuGetRegistrationIndex { + count: number + items: NuGetRegistrationPage[] +} + +export interface NormalizedNuGetPackage { + description: string | null + homepage: string | null + declaredRepositoryUrl: string | null + repositoryUrl: string | null + licenses: string[] | null + licensesRaw: string | null + keywords: string[] | null + status: 'active' | 'deprecated' | 'unpublished' + latestVersion: string | null + versionsCount: number + firstReleaseAt: Date | null + latestReleaseAt: Date | null + totalDownloads: number + owners: string[] + authors: string[] + versions: NormalizedNuGetVersion[] +} + +export interface NormalizedNuGetVersion 
{ + number: string + publishedAt: Date | null + isLatest: boolean + isPrerelease: boolean + isYanked: boolean + licenses: string[] | null + downloadCount: number | null +} diff --git a/services/apps/packages_worker/src/nuget/workflows.ts b/services/apps/packages_worker/src/nuget/workflows.ts new file mode 100644 index 0000000000..b0a57ee372 --- /dev/null +++ b/services/apps/packages_worker/src/nuget/workflows.ts @@ -0,0 +1,21 @@ +import { continueAsNew, log, proxyActivities } from '@temporalio/workflow' + +import type * as activities from './activities' + +const acts = proxyActivities({ + startToCloseTimeout: '30 minutes', + retry: { + initialInterval: '30 seconds', + backoffCoefficient: 2, + maximumAttempts: 5, + }, +}) + +export async function ingestNuGetPackages(): Promise { + const result = await acts.processNuGetBatch() + if (result.processed + result.skipped + result.error + result.unchanged === 0) { + log.info('NuGet ingestion complete — no more work, exiting.', { ...result }) + return + } + await continueAsNew() +} diff --git a/services/apps/packages_worker/src/scripts/triggerNuGetSync.ts b/services/apps/packages_worker/src/scripts/triggerNuGetSync.ts new file mode 100644 index 0000000000..e7ca7fdbde --- /dev/null +++ b/services/apps/packages_worker/src/scripts/triggerNuGetSync.ts @@ -0,0 +1,33 @@ +import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal' + +import { ingestNuGetPackages } from '../nuget/workflows' + +async function main(): Promise { + const cfg = TEMPORAL_CONFIG() + if (!cfg.serverUrl || !cfg.namespace) { + console.error('Missing CROWD_TEMPORAL_SERVER_URL or CROWD_TEMPORAL_NAMESPACE') + process.exit(1) + } + + const client = await getTemporalClient(cfg) + + const workflowId = `nuget-sync-manual-${Date.now()}` + const handle = await client.workflow.start(ingestNuGetPackages, { + taskQueue: 'nuget-worker', + workflowId, + args: [], + }) + + console.log(`Started workflow ${handle.workflowId}`) + console.log('Waiting for 
completion...') + + await handle.result() + console.log('Done') +} + +main() + .then(() => process.exit(0)) + .catch((err) => { + console.error('Failed:', err) + process.exit(1) + }) diff --git a/services/apps/packages_worker/src/scripts/triggerOsvSync.ts b/services/apps/packages_worker/src/scripts/triggerOsvSync.ts new file mode 100644 index 0000000000..26e99a869e --- /dev/null +++ b/services/apps/packages_worker/src/scripts/triggerOsvSync.ts @@ -0,0 +1,36 @@ +import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal' + +import { osvSync } from '../osv/workflows' + +async function main(): Promise { + const raw = process.argv[2] + const ecosystems = raw ? raw.split(',').map((e) => e.trim()) : ['cargo'] + + const cfg = TEMPORAL_CONFIG() + if (!cfg.serverUrl || !cfg.namespace) { + console.error('Missing CROWD_TEMPORAL_SERVER_URL or CROWD_TEMPORAL_NAMESPACE') + process.exit(1) + } + + const client = await getTemporalClient(cfg) + + const workflowId = `osv-sync-manual-${ecosystems.join('-')}-${Date.now()}` + const handle = await client.workflow.start(osvSync, { + taskQueue: 'osv-worker', + workflowId, + args: [{ ecosystems }], + }) + + console.log(`Started workflow ${handle.workflowId} (ecosystems: ${ecosystems.join(', ')})`) + console.log('Waiting for completion...') + + await handle.result() + console.log('Done') +} + +main() + .then(() => process.exit(0)) + .catch((err) => { + console.error('Failed:', err) + process.exit(1) + }) diff --git a/services/apps/packages_worker/src/workflows/index.ts b/services/apps/packages_worker/src/workflows/index.ts index 411ba509f8..219aa742fc 100644 --- a/services/apps/packages_worker/src/workflows/index.ts +++ b/services/apps/packages_worker/src/workflows/index.ts @@ -20,3 +20,4 @@ export { ingestScorecard } from '../scorecard/workflows' export { rankPackagesWorkflow } from '../criticality/workflow' export { cargoSyncWorkflow } from '../cargo/workflows' export { enrichGoVersions, enrichGoStatus } from '../go/workflows' 
+export { ingestNuGetPackages } from '../nuget/workflows' diff --git a/services/libs/data-access-layer/src/index.ts b/services/libs/data-access-layer/src/index.ts index 95b8cba162..2bcbdacc47 100644 --- a/services/libs/data-access-layer/src/index.ts +++ b/services/libs/data-access-layer/src/index.ts @@ -22,4 +22,5 @@ export * from './osspckgs/packages' export * from './osspckgs/repos' export * from './osspckgs/stewardships' export * from './osspckgs/versions' +export * from './osspckgs/nuget' export * from './osspckgs/api' diff --git a/services/libs/data-access-layer/src/osspckgs/nuget.ts b/services/libs/data-access-layer/src/osspckgs/nuget.ts new file mode 100644 index 0000000000..fba26a06fa --- /dev/null +++ b/services/libs/data-access-layer/src/osspckgs/nuget.ts @@ -0,0 +1,314 @@ +import { QueryExecutor } from '../queryExecutor' +import { insertDailyDownloads } from '../packages/downloadsDaily' +import { upsertLast30dDownload } from '../packages/downloadsLast30d' + +// ─── Types ──────────────────────────────────────────────────────────────────── + +export type NuGetPackageToSync = { + id: number + purl: string + name: string + dependentPackagesCount: number | null + dependentReposCount: number | null + latestVersion: string | null +} + +export type IDbNuGetPackageUpsert = { + purl: string + name: string + description: string | null + homepage: string | null + declaredRepositoryUrl: string | null + repositoryUrl: string | null + licenses: string[] | null + licensesRaw: string | null + keywords: string[] | null + status: string | null + latestVersion: string | null + versionsCount: number | null + firstReleaseAt: Date | null + latestReleaseAt: Date | null + registryUrl: string | null + ingestionSource: string + dependentPackagesCount?: number | null + dependentReposCount?: number | null +} + +export type IDbNuGetVersionUpsert = { + packageId: number + name: string + number: string + publishedAt: Date | null + isLatest: boolean + isPrerelease: boolean + isYanked: 
boolean | null + licenses: string[] | null + downloadCount: bigint | null +} + +// ─── Package upsert ─────────────────────────────────────────────────────────── + +export async function upsertNuGetPackage( + qx: QueryExecutor, + item: IDbNuGetPackageUpsert, +): Promise<{ id: number; changedFields: string[] }> { + const row = await qx.selectOne( + ` + WITH old AS ( + SELECT description, homepage, declared_repository_url, repository_url, + licenses, licenses_raw, keywords, status, + latest_version, versions_count, first_release_at, latest_release_at, + registry_url, ingestion_source + FROM packages WHERE purl = $(purl) + ), + ins AS ( + INSERT INTO packages ( + purl, ecosystem, namespace, name, + description, homepage, declared_repository_url, repository_url, + licenses, licenses_raw, keywords, status, + latest_version, versions_count, first_release_at, latest_release_at, + registry_url, ingestion_source, last_synced_at, created_at + ) VALUES ( + $(purl), 'nuget', NULL, $(name), + $(description), $(homepage), $(declaredRepositoryUrl), $(repositoryUrl), + $(licenses)::text[], $(licensesRaw), $(keywords)::text[], $(status), + $(latestVersion), $(versionsCount), $(firstReleaseAt), $(latestReleaseAt), + $(registryUrl), $(ingestionSource), NOW(), NOW() + ) + ON CONFLICT (purl) DO UPDATE SET + description = COALESCE(EXCLUDED.description, packages.description), + homepage = COALESCE(EXCLUDED.homepage, packages.homepage), + declared_repository_url = COALESCE(EXCLUDED.declared_repository_url, packages.declared_repository_url), + repository_url = COALESCE(EXCLUDED.repository_url, packages.repository_url), + licenses = COALESCE(EXCLUDED.licenses, packages.licenses), + licenses_raw = COALESCE(EXCLUDED.licenses_raw, packages.licenses_raw), + keywords = COALESCE(EXCLUDED.keywords, packages.keywords), + status = COALESCE(EXCLUDED.status, packages.status), + latest_version = COALESCE(EXCLUDED.latest_version, packages.latest_version), + versions_count = 
COALESCE(EXCLUDED.versions_count, packages.versions_count), + first_release_at = COALESCE(EXCLUDED.first_release_at, packages.first_release_at), + latest_release_at = COALESCE(EXCLUDED.latest_release_at, packages.latest_release_at), + registry_url = COALESCE(EXCLUDED.registry_url, packages.registry_url), + ingestion_source = EXCLUDED.ingestion_source, + last_synced_at = NOW() + RETURNING id, description, homepage, declared_repository_url, repository_url, + licenses, licenses_raw, keywords, status, + latest_version, versions_count, first_release_at, latest_release_at, + registry_url, ingestion_source + ) + SELECT ins.id, + array_remove(ARRAY[ + CASE WHEN o.description IS DISTINCT FROM ins.description THEN 'packages.description' END, + CASE WHEN o.homepage IS DISTINCT FROM ins.homepage THEN 'packages.homepage' END, + CASE WHEN o.declared_repository_url IS DISTINCT FROM ins.declared_repository_url THEN 'packages.declared_repository_url' END, + CASE WHEN o.repository_url IS DISTINCT FROM ins.repository_url THEN 'packages.repository_url' END, + CASE WHEN o.licenses IS DISTINCT FROM ins.licenses THEN 'packages.licenses' END, + CASE WHEN o.licenses_raw IS DISTINCT FROM ins.licenses_raw THEN 'packages.licenses_raw' END, + CASE WHEN o.keywords IS DISTINCT FROM ins.keywords THEN 'packages.keywords' END, + CASE WHEN o.status IS DISTINCT FROM ins.status THEN 'packages.status' END, + CASE WHEN o.latest_version IS DISTINCT FROM ins.latest_version THEN 'packages.latest_version' END, + CASE WHEN o.versions_count IS DISTINCT FROM ins.versions_count THEN 'packages.versions_count' END, + CASE WHEN o.first_release_at IS DISTINCT FROM ins.first_release_at THEN 'packages.first_release_at' END, + CASE WHEN o.latest_release_at IS DISTINCT FROM ins.latest_release_at THEN 'packages.latest_release_at' END, + CASE WHEN o.registry_url IS DISTINCT FROM ins.registry_url THEN 'packages.registry_url' END, + CASE WHEN o.ingestion_source IS DISTINCT FROM ins.ingestion_source THEN 
'packages.ingestion_source' END + ], NULL) AS changed_fields + FROM ins LEFT JOIN old o ON true + `, + { + purl: item.purl, + name: item.name, + description: item.description ?? null, + homepage: item.homepage ?? null, + declaredRepositoryUrl: item.declaredRepositoryUrl ?? null, + repositoryUrl: item.repositoryUrl ?? null, + licenses: item.licenses ?? null, + licensesRaw: item.licensesRaw ?? null, + keywords: item.keywords ?? null, + status: item.status ?? null, + latestVersion: item.latestVersion ?? null, + versionsCount: item.versionsCount ?? null, + firstReleaseAt: item.firstReleaseAt ?? null, + latestReleaseAt: item.latestReleaseAt ?? null, + registryUrl: item.registryUrl ?? null, + ingestionSource: item.ingestionSource, + }, + ) + return { id: row.id as number, changedFields: row.changed_fields as string[] } +} + +// ─── Versions bulk upsert ───────────────────────────────────────────────────── + +export async function upsertNuGetVersionsBatch( + qx: QueryExecutor, + versions: IDbNuGetVersionUpsert[], +): Promise { + if (versions.length === 0) return [] + + const packageId = versions[0].packageId + if (versions.some((v) => v.packageId !== packageId)) { + throw new Error('upsertNuGetVersionsBatch: all versions must belong to the same package') + } + + const seen = new Set() + const deduped = versions.filter((v) => { + if (seen.has(v.number)) return false + seen.add(v.number) + return true + }) + + const row: { changed_fields: string[] } = await qx.selectOne( + ` + WITH old AS ( + SELECT number, is_latest, is_prerelease, is_yanked, licenses, download_count, published_at + FROM versions + WHERE package_id = $(packageId)::bigint AND number = ANY($(numbers)::text[]) + ), + ins AS ( + INSERT INTO versions (package_id, ecosystem, namespace, name, number, + published_at, is_latest, is_prerelease, is_yanked, + licenses, download_count, + last_synced_at, created_at) + SELECT + $(packageId)::bigint, 'nuget', NULL, t.name, t.number, + t.published_at::timestamptz, 
t.is_latest, t.is_prerelease, + t.is_yanked, + CASE WHEN t.license IS NULL THEN NULL ELSE ARRAY[t.license] END, + t.download_count::bigint, + NOW(), NOW() + FROM UNNEST( + $(names)::text[], + $(numbers)::text[], + $(publishedAts)::text[], + $(isLatests)::bool[], + $(isPreleases)::bool[], + $(isYankeds)::bool[], + $(licenses)::text[], + $(downloadCounts)::bigint[] + ) AS t(name, number, published_at, is_latest, is_prerelease, is_yanked, license, download_count) + ON CONFLICT (package_id, number) DO UPDATE SET + is_latest = EXCLUDED.is_latest, + is_prerelease = EXCLUDED.is_prerelease, + is_yanked = COALESCE(EXCLUDED.is_yanked, versions.is_yanked), + licenses = COALESCE(EXCLUDED.licenses, versions.licenses), + download_count = COALESCE(EXCLUDED.download_count, versions.download_count), + published_at = COALESCE(EXCLUDED.published_at, versions.published_at), + last_synced_at = NOW() + RETURNING number, is_latest, is_prerelease, is_yanked, licenses, download_count, published_at + ) + SELECT array_remove(ARRAY[ + CASE WHEN bool_or(o.number IS NULL) THEN 'versions.number' END, + CASE WHEN bool_or(o.is_latest IS DISTINCT FROM ins.is_latest) THEN 'versions.is_latest' END, + CASE WHEN bool_or(o.is_prerelease IS DISTINCT FROM ins.is_prerelease) THEN 'versions.is_prerelease' END, + CASE WHEN bool_or(o.is_yanked IS DISTINCT FROM ins.is_yanked) THEN 'versions.is_yanked' END, + CASE WHEN bool_or(o.licenses IS DISTINCT FROM ins.licenses) THEN 'versions.licenses' END, + CASE WHEN bool_or(o.download_count IS DISTINCT FROM ins.download_count) THEN 'versions.download_count' END, + CASE WHEN bool_or(o.published_at IS DISTINCT FROM ins.published_at) THEN 'versions.published_at' END + ], NULL) AS changed_fields + FROM ins LEFT JOIN old o ON o.number = ins.number + `, + { + packageId, + names: deduped.map((v) => v.name), + numbers: deduped.map((v) => v.number), + publishedAts: deduped.map((v) => (v.publishedAt ? 
v.publishedAt.toISOString() : null)), + isLatests: deduped.map((v) => v.isLatest), + isPreleases: deduped.map((v) => v.isPrerelease), + isYankeds: deduped.map((v) => v.isYanked ?? null), + licenses: deduped.map((v) => (v.licenses && v.licenses.length > 0 ? v.licenses[0] : null)), + downloadCounts: deduped.map((v) => (v.downloadCount !== null ? Number(v.downloadCount) : null)), + }, + ) + return row.changed_fields +} + +// ─── List packages to sync ──────────────────────────────────────────────────── + +// TODO: once critical packages are defined for NuGet, gate on is_critical when isCritical=true. +const NUGET_WORKER_OUTCOMES = ['nuget-registry', 'nuget_not_found', 'nuget_error'] + +export async function listNuGetPackagesToSync( + qx: QueryExecutor, + options: { limit: number; isCritical: boolean }, +): Promise { + const { limit, isCritical } = options + return qx.select( + ` + SELECT + p.id, + p.purl, + p.name, + p.dependent_count AS "dependentPackagesCount", + p.dependent_repos_count AS "dependentReposCount", + p.latest_version AS "latestVersion" + FROM packages p + WHERE + p.ecosystem = 'nuget' + ${isCritical ? 
'AND p.is_critical' : ''} + AND ( + p.ingestion_source IS NULL + OR p.ingestion_source <> ALL($(workerOutcomes)::text[]) + OR p.last_synced_at < NOW() - INTERVAL '1 day' + ) + ORDER BY + p.dependent_count DESC NULLS LAST, + p.id ASC + LIMIT $(limit) + `, + { limit, workerOutcomes: NUGET_WORKER_OUTCOMES }, + ) +} + +// ─── Download snapshot ──────────────────────────────────────────────────────── + +export async function recordNuGetDownloadSnapshot( + qx: QueryExecutor, + params: { + packageId: number + purl: string + totalDownloads: number + today: string // YYYY-MM-DD + }, +): Promise { + const { packageId, purl, totalDownloads, today } = params + const changed: string[] = [] + + const prevRow = await qx.selectOne( + `SELECT total_downloads FROM packages WHERE id = $(packageId)`, + { packageId }, + ) + const prev: number | null = prevRow?.total_downloads ?? null + + if (prev !== null && totalDownloads > prev) { + const delta = totalDownloads - prev + const dailyChanged = await insertDailyDownloads(qx, String(packageId), [ + { day: today, downloads: delta }, + ]) + dailyChanged.forEach((f) => changed.push(f)) + } + + const updated = await qx.result( + `UPDATE packages SET total_downloads = $(totalDownloads) WHERE id = $(packageId) AND total_downloads IS DISTINCT FROM $(totalDownloads)`, + { totalDownloads, packageId }, + ) + if (updated > 0) changed.push('packages.total_downloads') + + const sumRow = await qx.selectOne( + `SELECT COALESCE(SUM(count), 0)::bigint AS total + FROM downloads_daily + WHERE package_id = $(packageId)::bigint + AND date > $(today)::date - INTERVAL '30 days' + AND date <= $(today)::date`, + { packageId, today }, + ) + const monthlySum = Number(sumRow.total) + + const thirtyDaysAgo = new Date(today) + thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 29) + const startDate = thirtyDaysAgo.toISOString().split('T')[0] + + const monthlyChanged = await upsertLast30dDownload(qx, purl, startDate, today, monthlySum, true) + monthlyChanged.forEach((f) 
=> changed.push(f)) + + return changed +} From 35a02b98d950480d69bc9b2a7ec844caa0c6de51 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Thu, 25 Jun 2026 14:31:01 +0100 Subject: [PATCH 2/3] fix: code review fixes Signed-off-by: Mouad BANI --- services/apps/packages_worker/src/config.ts | 6 ++-- .../apps/packages_worker/src/nuget/client.ts | 7 ++++- .../packages_worker/src/nuget/normalize.ts | 31 +++++++------------ .../src/nuget/runNuGetEnrichmentLoop.ts | 6 +++- .../data-access-layer/src/osspckgs/nuget.ts | 27 +++++++++++----- 5 files changed, 44 insertions(+), 33 deletions(-) diff --git a/services/apps/packages_worker/src/config.ts b/services/apps/packages_worker/src/config.ts index c2c7a7875e..e29c3c9bb2 100644 --- a/services/apps/packages_worker/src/config.ts +++ b/services/apps/packages_worker/src/config.ts @@ -72,9 +72,9 @@ export function getGoConfig() { export function getNuGetConfig() { return { - batchSize: parseInt(process.env.NUGET_FETCHER_BATCH_SIZE ?? '500', 10), - concurrency: parseInt(process.env.NUGET_FETCHER_CONCURRENCY ?? '8', 10), - groupDelayMs: parseInt(process.env.NUGET_FETCHER_GROUP_DELAY_MS ?? '250', 10), + batchSize: parseInt(process.env.NUGET_FETCHER_BATCH_SIZE ?? '1000', 10), + concurrency: parseInt(process.env.NUGET_FETCHER_CONCURRENCY ?? '20', 10), + groupDelayMs: parseInt(process.env.NUGET_FETCHER_GROUP_DELAY_MS ?? '0', 10), isCritical: (process.env.NUGET_FETCHER_IS_CRITICAL ?? 
'false') === 'true', userAgent: process.env.NUGET_USER_AGENT, } diff --git a/services/apps/packages_worker/src/nuget/client.ts b/services/apps/packages_worker/src/nuget/client.ts index 90afef67f0..95d2cffc62 100644 --- a/services/apps/packages_worker/src/nuget/client.ts +++ b/services/apps/packages_worker/src/nuget/client.ts @@ -1,6 +1,11 @@ import axios from 'axios' -import { NuGetFetchError, NuGetRegistrationIndex, NuGetRegistrationPage, NuGetSearchItem } from './types' +import { + NuGetFetchError, + NuGetRegistrationIndex, + NuGetRegistrationPage, + NuGetSearchItem, +} from './types' const SERVICE_INDEX_URL = 'https://api.nuget.org/v3/index.json' diff --git a/services/apps/packages_worker/src/nuget/normalize.ts b/services/apps/packages_worker/src/nuget/normalize.ts index 0126f421c0..7a66ad0ad9 100644 --- a/services/apps/packages_worker/src/nuget/normalize.ts +++ b/services/apps/packages_worker/src/nuget/normalize.ts @@ -65,22 +65,16 @@ export function normalizeNuGetPackage( ) const description = - searchResult?.description || - searchResult?.summary || - latestListedEntry?.description || - null - - const homepage = - searchResult?.projectUrl || latestListedEntry?.projectUrl || null - - const declaredRepositoryUrl = - latestEntry4License?.repository?.url - ? latestEntry4License.repository.url - : null + searchResult?.description || searchResult?.summary || latestListedEntry?.description || null + + const homepage = searchResult?.projectUrl || latestListedEntry?.projectUrl || null + + const declaredRepositoryUrl = latestEntry4License?.repository?.url + ? latestEntry4License.repository.url + : null const repositoryUrl = normalizeRepoUrl(declaredRepositoryUrl) - const keywords = - searchResult?.tags && searchResult.tags.length > 0 ? searchResult.tags : null + const keywords = searchResult?.tags && searchResult.tags.length > 0 ? 
searchResult.tags : null let status: 'active' | 'deprecated' | 'unpublished' if (listedEntries.length === 0) { @@ -93,23 +87,20 @@ export function normalizeNuGetPackage( const publishedDates = allEntries .filter((e) => e.published) - .map((e) => new Date(e.published!)) + .map((e) => new Date(e.published as string)) .filter((d) => !isNaN(d.getTime())) .sort((a, b) => a.getTime() - b.getTime()) const firstReleaseAt = publishedDates.length > 0 ? publishedDates[0] : null const latestEntry4Date = latestListedEntry ?? latestEntry - const latestReleaseAt = - latestEntry4Date?.published ? new Date(latestEntry4Date.published) : null + const latestReleaseAt = latestEntry4Date?.published ? new Date(latestEntry4Date.published) : null const totalDownloads = searchResult?.totalDownloads ?? 0 const owners = searchResult?.owners ?? [] - const authors = parseAuthors( - searchResult?.authors ?? latestEntry4License?.authors, - ) + const authors = parseAuthors(searchResult?.authors ?? latestEntry4License?.authors) const searchVersionMap = new Map() if (searchResult?.versions) { diff --git a/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts b/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts index dba2043595..478bc4bb2b 100644 --- a/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts +++ b/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts @@ -193,7 +193,11 @@ async function processPackage( return 'processed' } -export async function processBatch(qx: QueryExecutor, config: NuGetConfig, today: string): Promise { +export async function processBatch( + qx: QueryExecutor, + config: NuGetConfig, + today: string, +): Promise { const packages = await listNuGetPackagesToSync(qx, { limit: config.batchSize, isCritical: config.isCritical, diff --git a/services/libs/data-access-layer/src/osspckgs/nuget.ts b/services/libs/data-access-layer/src/osspckgs/nuget.ts index fba26a06fa..170a0ee53d 100644 --- 
a/services/libs/data-access-layer/src/osspckgs/nuget.ts +++ b/services/libs/data-access-layer/src/osspckgs/nuget.ts @@ -1,6 +1,6 @@ -import { QueryExecutor } from '../queryExecutor' import { insertDailyDownloads } from '../packages/downloadsDaily' import { upsertLast30dDownload } from '../packages/downloadsLast30d' +import { QueryExecutor } from '../queryExecutor' // ─── Types ──────────────────────────────────────────────────────────────────── @@ -30,8 +30,8 @@ export type IDbNuGetPackageUpsert = { latestReleaseAt: Date | null registryUrl: string | null ingestionSource: string - dependentPackagesCount?: number | null - dependentReposCount?: number | null + dependentPackagesCount: number | null + dependentReposCount: number | null } export type IDbNuGetVersionUpsert = { @@ -67,13 +67,15 @@ export async function upsertNuGetPackage( description, homepage, declared_repository_url, repository_url, licenses, licenses_raw, keywords, status, latest_version, versions_count, first_release_at, latest_release_at, - registry_url, ingestion_source, last_synced_at, created_at + registry_url, ingestion_source, dependent_count, dependent_repos_count, + last_synced_at, created_at ) VALUES ( $(purl), 'nuget', NULL, $(name), $(description), $(homepage), $(declaredRepositoryUrl), $(repositoryUrl), $(licenses)::text[], $(licensesRaw), $(keywords)::text[], $(status), $(latestVersion), $(versionsCount), $(firstReleaseAt), $(latestReleaseAt), - $(registryUrl), $(ingestionSource), NOW(), NOW() + $(registryUrl), $(ingestionSource), $(dependentPackagesCount), $(dependentReposCount), + NOW(), NOW() ) ON CONFLICT (purl) DO UPDATE SET description = COALESCE(EXCLUDED.description, packages.description), @@ -90,6 +92,8 @@ export async function upsertNuGetPackage( latest_release_at = COALESCE(EXCLUDED.latest_release_at, packages.latest_release_at), registry_url = COALESCE(EXCLUDED.registry_url, packages.registry_url), ingestion_source = EXCLUDED.ingestion_source, + dependent_count = 
COALESCE(EXCLUDED.dependent_count, packages.dependent_count), + dependent_repos_count = COALESCE(EXCLUDED.dependent_repos_count, packages.dependent_repos_count), last_synced_at = NOW() RETURNING id, description, homepage, declared_repository_url, repository_url, licenses, licenses_raw, keywords, status, @@ -132,6 +136,8 @@ export async function upsertNuGetPackage( latestReleaseAt: item.latestReleaseAt ?? null, registryUrl: item.registryUrl ?? null, ingestionSource: item.ingestionSource, + dependentPackagesCount: item.dependentPackagesCount ?? null, + dependentReposCount: item.dependentReposCount ?? null, }, ) return { id: row.id as number, changedFields: row.changed_fields as string[] } @@ -216,7 +222,9 @@ export async function upsertNuGetVersionsBatch( isPreleases: deduped.map((v) => v.isPrerelease), isYankeds: deduped.map((v) => v.isYanked ?? null), licenses: deduped.map((v) => (v.licenses && v.licenses.length > 0 ? v.licenses[0] : null)), - downloadCounts: deduped.map((v) => (v.downloadCount !== null ? Number(v.downloadCount) : null)), + downloadCounts: deduped.map((v) => + v.downloadCount !== null ? Number(v.downloadCount) : null, + ), }, ) return row.changed_fields @@ -277,7 +285,8 @@ export async function recordNuGetDownloadSnapshot( `SELECT total_downloads FROM packages WHERE id = $(packageId)`, { packageId }, ) - const prev: number | null = prevRow?.total_downloads ?? null + const prev: number | null = + prevRow?.total_downloads != null ? 
Number(prevRow.total_downloads) : null if (prev !== null && totalDownloads > prev) { const delta = totalDownloads - prev @@ -288,7 +297,9 @@ export async function recordNuGetDownloadSnapshot( } const updated = await qx.result( - `UPDATE packages SET total_downloads = $(totalDownloads) WHERE id = $(packageId) AND total_downloads IS DISTINCT FROM $(totalDownloads)`, + `UPDATE packages SET total_downloads = $(totalDownloads) + WHERE id = $(packageId) + AND (total_downloads IS NULL OR total_downloads < $(totalDownloads))`, { totalDownloads, packageId }, ) if (updated > 0) changed.push('packages.total_downloads') From 8de3017a331d04665d2763ac3f0a84c8bf110f30 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Thu, 25 Jun 2026 14:41:48 +0100 Subject: [PATCH 3/3] fix: remove unnecessary user-agent Signed-off-by: Mouad BANI --- services/apps/packages_worker/src/config.ts | 1 - .../apps/packages_worker/src/nuget/client.ts | 27 +++++-------------- .../src/nuget/runNuGetEnrichmentLoop.ts | 4 +-- 3 files changed, 9 insertions(+), 23 deletions(-) diff --git a/services/apps/packages_worker/src/config.ts b/services/apps/packages_worker/src/config.ts index e29c3c9bb2..ab5de33808 100644 --- a/services/apps/packages_worker/src/config.ts +++ b/services/apps/packages_worker/src/config.ts @@ -76,7 +76,6 @@ export function getNuGetConfig() { concurrency: parseInt(process.env.NUGET_FETCHER_CONCURRENCY ?? '20', 10), groupDelayMs: parseInt(process.env.NUGET_FETCHER_GROUP_DELAY_MS ?? '0', 10), isCritical: (process.env.NUGET_FETCHER_IS_CRITICAL ?? 
'false') === 'true', - userAgent: process.env.NUGET_USER_AGENT, } } diff --git a/services/apps/packages_worker/src/nuget/client.ts b/services/apps/packages_worker/src/nuget/client.ts index 95d2cffc62..55aaadbdef 100644 --- a/services/apps/packages_worker/src/nuget/client.ts +++ b/services/apps/packages_worker/src/nuget/client.ts @@ -25,11 +25,10 @@ interface ResolvedEndpoints { let cachedEndpoints: ResolvedEndpoints | null = null -async function resolveEndpoints(userAgent?: string): Promise { +async function resolveEndpoints(): Promise { if (cachedEndpoints) return cachedEndpoints const resp = await axios.get(SERVICE_INDEX_URL, { - headers: userAgent ? { 'User-Agent': userAgent } : {}, timeout: 10000, }) @@ -68,9 +67,8 @@ function classifyError(err: unknown): NuGetFetchError | null { export async function fetchSearch( packageId: string, - userAgent?: string, ): Promise { - const { searchBaseUrl } = await resolveEndpoints(userAgent) + const { searchBaseUrl } = await resolveEndpoints() const lowerPackageId = packageId.toLowerCase() try { @@ -82,7 +80,6 @@ export async function fetchSearch( take: 20, }, headers: { - ...(userAgent ? { 'User-Agent': userAgent } : {}), 'Accept-Encoding': 'gzip', }, timeout: 15000, @@ -100,15 +97,9 @@ export async function fetchSearch( } } -async function fetchRegistrationPage( - pageId: string, - userAgent?: string, -): Promise { +async function fetchRegistrationPage(pageId: string): Promise { const resp = await axios.get(pageId, { - headers: { - ...(userAgent ? 
{ 'User-Agent': userAgent } : {}), - 'Accept-Encoding': 'gzip', - }, + headers: { 'Accept-Encoding': 'gzip' }, timeout: 15000, }) return resp.data @@ -116,19 +107,15 @@ async function fetchRegistrationPage( export async function fetchRegistration( packageId: string, - userAgent?: string, ): Promise { - const { registrationBaseUrl } = await resolveEndpoints(userAgent) + const { registrationBaseUrl } = await resolveEndpoints() const lowerId = packageId.toLowerCase() try { const resp = await axios.get( `${registrationBaseUrl}${lowerId}/index.json`, { - headers: { - ...(userAgent ? { 'User-Agent': userAgent } : {}), - 'Accept-Encoding': 'gzip', - }, + headers: { 'Accept-Encoding': 'gzip' }, timeout: 15000, }, ) @@ -139,7 +126,7 @@ export async function fetchRegistration( const page = index.items[i] if (!page.items) { try { - const fullPage = await fetchRegistrationPage(page['@id'], userAgent) + const fullPage = await fetchRegistrationPage(page['@id']) index.items[i] = { ...page, items: fullPage.items ?? [] } } catch { index.items[i] = { ...page, items: [] } diff --git a/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts b/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts index 478bc4bb2b..e334f2bede 100644 --- a/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts +++ b/services/apps/packages_worker/src/nuget/runNuGetEnrichmentLoop.ts @@ -56,8 +56,8 @@ async function processPackage( const packageId = pkg.name const [searchResult, registrationResult] = await Promise.all([ - fetchSearch(packageId, config.userAgent), - fetchRegistration(packageId, config.userAgent), + fetchSearch(packageId), + fetchRegistration(packageId), ]) if (isNuGetFetchError(registrationResult)) {