Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE packages ADD COLUMN IF NOT EXISTS total_downloads bigint;
2 changes: 1 addition & 1 deletion scripts/builders/packages.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DOCKERFILE="./services/docker/Dockerfile.packages"
CONTEXT="../"
REPO="sjc.ocir.io/axbydjxa5zuh/packages"
SERVICES="github-repos-enricher bq-dataset-ingest npm-worker maven-worker osv-worker dockerhub-sync cargo-worker go-worker"
SERVICES="github-repos-enricher bq-dataset-ingest npm-worker maven-worker osv-worker dockerhub-sync cargo-worker go-worker nuget-worker"
65 changes: 65 additions & 0 deletions scripts/services/nuget-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Compose definition for the NuGet packages worker: a regular service and a dev variant.
version: '3.1'

# Environment shared by both services below; merged in through the *env-args alias.
x-env-args: &env-args
DOCKER_BUILDKIT: 1
NODE_ENV: docker
SERVICE: nuget-worker
SHELL: /bin/sh
SUPPRESS_NO_CONFIG_WARNING: 'true'
CROWD_TEMPORAL_TASKQUEUE: nuget-worker

services:
# Regular service: runs the start:nuget-worker script and is restarted automatically.
nuget-worker:
build:
context: ../../
dockerfile: ./scripts/services/docker/Dockerfile.packages
command: 'pnpm run start:nuget-worker'
working_dir: /usr/crowd/app/services/apps/packages_worker
# Env files are listed in this order: dist defaults first, then overrides.
env_file:
- ../../backend/.env.dist.local
- ../../backend/.env.dist.composed
- ../../backend/.env.override.local
- ../../backend/.env.override.composed
environment:
<<: *env-args
restart: always
networks:
- crowd-bridge

# Dev variant: runs the dev:nuget-worker script, has no restart policy, and bind-mounts sources.
nuget-worker-dev:
build:
context: ../../
dockerfile: ./scripts/services/docker/Dockerfile.packages
command: 'pnpm run dev:nuget-worker'
working_dir: /usr/crowd/app/services/apps/packages_worker
env_file:
- ../../backend/.env.dist.local
- ../../backend/.env.dist.composed
- ../../backend/.env.override.local
- ../../backend/.env.override.composed
environment:
<<: *env-args
hostname: nuget-worker
networks:
- crowd-bridge
# Host source directories mounted over the paths inside the container
# (presumably so the dev script's file watcher sees local edits; confirm against package.json).
volumes:
- ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src
- ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src
- ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src
- ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src
- ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src
- ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src
- ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src
- ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src
- ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src
- ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src
- ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src
- ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src
- ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src
- ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src
- ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src
- ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src

# The network is declared external, so it must already exist; this file does not create it.
networks:
crowd-bridge:
external: true
3 changes: 3 additions & 0 deletions services/apps/packages_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
"start:go-worker": "CROWD_TEMPORAL_TASKQUEUE=go-worker SERVICE=go-worker tsx src/bin/go-worker.ts",
"dev:go-worker": "CROWD_TEMPORAL_TASKQUEUE=go-worker SERVICE=go-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9241 src/bin/go-worker.ts",
"dev:go-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=go-worker SERVICE=go-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9241 src/bin/go-worker.ts",
"start:nuget-worker": "CROWD_TEMPORAL_TASKQUEUE=nuget-worker SERVICE=nuget-worker tsx src/bin/nuget-worker.ts",
"dev:nuget-worker": "CROWD_TEMPORAL_TASKQUEUE=nuget-worker SERVICE=nuget-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/nuget-worker.ts",
"dev:nuget-worker:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=nuget-worker SERVICE=nuget-worker LOG_LEVEL=trace nodemon --watch src --watch ../../libs --ext ts --exec tsx --inspect=0.0.0.0:9242 src/bin/nuget-worker.ts",
"backfill:maven": "SERVICE=maven tsx src/bin/maven-backfill.ts",
"backfill:maven:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=maven LOG_LEVEL=info tsx src/bin/maven-backfill.ts",
"backfill:stewardship": "SERVICE=stewardship-backfill tsx src/bin/stewardship-backfill.ts",
Expand Down
1 change: 1 addition & 0 deletions services/apps/packages_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ export {
cargoCleanup,
} from './cargo/activities'
export { enrichGoVersionsBatch, enrichGoStatusBatch } from './go/activities'
export { processNuGetBatch } from './nuget/activities'
8 changes: 8 additions & 0 deletions services/apps/packages_worker/src/bin/nuget-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { scheduleNuGetIngestion } from '../nuget/schedule'
import { svc } from '../service'

/**
 * Startup sequence for the nuget-worker process: initialise the shared service,
 * register the NuGet ingestion schedule, then start the service.
 */
const bootstrap = async (): Promise<void> => {
  await svc.init()
  await scheduleNuGetIngestion()
  await svc.start()
}

// Deferred to the next event-loop iteration, once module evaluation has finished.
setImmediate(bootstrap)
9 changes: 9 additions & 0 deletions services/apps/packages_worker/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ export function getGoConfig() {
}
}

/**
 * Reads the NuGet fetcher settings from the environment.
 *
 * Integer settings fall back to their default when the variable is unset, empty, or not a
 * parseable integer, so a malformed value never surfaces as `NaN`.
 *
 * @returns `batchSize` (default 1000), `concurrency` (default 20), `groupDelayMs` (default 0)
 * and `isCritical`, which is true only when `NUGET_FETCHER_IS_CRITICAL` is exactly `'true'`.
 */
export function getNuGetConfig() {
  // `??` alone only covers an unset variable; `VAR=` or `VAR=abc` would parse to NaN.
  const readInt = (name: string, fallback: number): number => {
    const parsed = parseInt(process.env[name] ?? '', 10)
    return Number.isNaN(parsed) ? fallback : parsed
  }

  return {
    batchSize: readInt('NUGET_FETCHER_BATCH_SIZE', 1000),
    concurrency: readInt('NUGET_FETCHER_CONCURRENCY', 20),
    groupDelayMs: readInt('NUGET_FETCHER_GROUP_DELAY_MS', 0),
    isCritical: (process.env.NUGET_FETCHER_IS_CRITICAL ?? 'false') === 'true',
  }
}
Comment on lines +73 to +80

export function getDockerhubConfig() {
return {
hubBaseUrl: requireEnv('DOCKERHUB_API_BASE_URL'),
Expand Down
19 changes: 19 additions & 0 deletions services/apps/packages_worker/src/nuget/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { getServiceChildLogger } from '@crowd/logging'

import { getNuGetConfig } from '../config'
import { getPackagesDb } from '../db'

import { processBatch } from './runNuGetEnrichmentLoop'
import { BatchResult } from './types'

const log = getServiceChildLogger('nuget-activity')

/**
 * Runs one NuGet enrichment batch.
 *
 * Loads the NuGet settings, obtains the packages database handle, and hands both to
 * `processBatch` together with today's date (the `YYYY-MM-DD` part of the current UTC
 * ISO timestamp). The outcome is logged and returned unchanged.
 *
 * @returns the result produced by `processBatch`.
 */
export async function processNuGetBatch(): Promise<BatchResult> {
  const nugetConfig = getNuGetConfig()
  const db = await getPackagesDb()

  // Everything before the 'T' separator of the ISO timestamp is the calendar date.
  const [today] = new Date().toISOString().split('T')

  const outcome = await processBatch(db, nugetConfig, today)
  log.info({ ...outcome }, 'NuGet batch complete')
  return outcome
}
143 changes: 143 additions & 0 deletions services/apps/packages_worker/src/nuget/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import axios from 'axios'

import {
NuGetFetchError,
NuGetRegistrationIndex,
NuGetRegistrationPage,
NuGetSearchItem,
} from './types'

const SERVICE_INDEX_URL = 'https://api.nuget.org/v3/index.json'

/** One entry of the `resources` array in the NuGet service index document. */
interface ServiceIndexResource {
// Used as the endpoint URL once a resource of the wanted type is found.
'@id': string
// Compared for exact equality against names such as 'SearchQueryService/3.5.0'.
'@type': string
}

/** The part of the service index response (fetched from SERVICE_INDEX_URL) that this module reads. */
interface ServiceIndex {
resources: ServiceIndexResource[]
}

/** Endpoint URLs picked out of the service index and cached at module level. */
interface ResolvedEndpoints {
// Requested directly by the search lookup.
searchBaseUrl: string
// Prefixed to `<lowercased id>/index.json` by the registration lookup.
registrationBaseUrl: string
}

// Endpoints from the first successful lookup; kept for the lifetime of the module.
let cachedEndpoints: ResolvedEndpoints | null = null
// Lookup currently in flight, shared so concurrent cold-start callers issue a single request.
let pendingEndpoints: Promise<ResolvedEndpoints> | null = null

/**
 * Resolves the search and registration endpoints from the NuGet service index.
 *
 * The result is cached after the first successful lookup. Callers that arrive while a lookup
 * is still running share that lookup instead of each fetching the service index again. A
 * failed lookup is not cached, so the next call retries.
 *
 * @returns the resolved search and registration base URLs.
 * @throws Error when the service index lacks a search or registration resource (including a
 * body without a `resources` array); request failures from axios propagate unchanged.
 */
async function resolveEndpoints(): Promise<ResolvedEndpoints> {
  if (cachedEndpoints) return cachedEndpoints
  if (pendingEndpoints) return pendingEndpoints

  const lookup = (async (): Promise<ResolvedEndpoints> => {
    const resp = await axios.get<ServiceIndex>(SERVICE_INDEX_URL, {
      timeout: 10000,
    })

    // A body without `resources` would otherwise fail below with an opaque TypeError.
    const resources: ServiceIndexResource[] = Array.isArray(resp.data?.resources)
      ? resp.data.resources
      : []

    // Returns the '@id' of the first resource matching the earliest listed type.
    const findFirst = (...types: string[]): string | undefined => {
      for (const t of types) {
        const found = resources.find((r) => r['@type'] === t)
        if (found) return found['@id']
      }
      return undefined
    }

    const searchBaseUrl = findFirst('SearchQueryService/3.5.0', 'SearchQueryService')
    const registrationBaseUrl = findFirst(
      'RegistrationsBaseUrl/3.6.0',
      'RegistrationsBaseUrl/3.5.0',
      'RegistrationsBaseUrl',
    )

    if (!searchBaseUrl || !registrationBaseUrl) {
      throw new Error('NuGet service index missing required endpoints')
    }

    cachedEndpoints = { searchBaseUrl, registrationBaseUrl }
    return cachedEndpoints
  })()

  pendingEndpoints = lookup
  try {
    return await lookup
  } finally {
    // Cleared on success and on failure, so a failed lookup can be retried by the next call.
    pendingEndpoints = null
  }
}

/**
 * Maps an error thrown by a request onto a `NuGetFetchError`, when it is one of the
 * HTTP outcomes callers handle as data.
 *
 * @param err the value caught from a failed request.
 * @returns a `NOT_FOUND` error for HTTP 404, a `RATE_LIMIT` error for HTTP 429, and `null`
 * for anything else (non-axios errors, other statuses, or no response at all).
 */
function classifyError(err: unknown): NuGetFetchError | null {
  if (axios.isAxiosError(err)) {
    const { message } = err
    const status = err.response?.status

    switch (status) {
      case 404:
        return { kind: 'NOT_FOUND', status, message }
      case 429:
        return { kind: 'RATE_LIMIT', status, message }
      default:
        return null
    }
  }

  return null
}

/**
 * Looks up a single package through the NuGet search endpoint.
 *
 * The query is `packageid:<id>` with prerelease versions included, and the result whose id
 * equals `packageId` case-insensitively is returned.
 *
 * @param packageId the package id to look up; matched case-insensitively.
 * @returns the matching search item, or a `NuGetFetchError`: `NOT_FOUND` when no result has
 * that id or the request returned HTTP 404, `RATE_LIMIT` on HTTP 429.
 * @throws any other request failure, and failures while resolving the endpoints.
 */
export async function fetchSearch(packageId: string): Promise<NuGetSearchItem | NuGetFetchError> {
  const { searchBaseUrl } = await resolveEndpoints()
  const lowerPackageId = packageId.toLowerCase()

  try {
    const resp = await axios.get<{ totalHits: number; data: NuGetSearchItem[] }>(searchBaseUrl, {
      params: {
        q: `packageid:${packageId}`,
        prerelease: 'true',
        semVerLevel: '2.0.0',
        take: 20,
      },
      headers: {
        'Accept-Encoding': 'gzip',
      },
      timeout: 15000,
    })

    // The response can hold several items, so pick the one whose id matches exactly.
    const match = resp.data.data.find((item) => item.id.toLowerCase() === lowerPackageId)
    if (!match) {
      return { kind: 'NOT_FOUND', message: `Package ${packageId} not found in search results` }
    }
    return match
  } catch (err) {
    // 404 and 429 are returned as data; everything else is unexpected and re-thrown.
    const classified = classifyError(err)
    if (classified) return classified
    throw err
  }
}

/**
 * Downloads one registration page.
 *
 * @param pageId the page's `@id`, which is used directly as the request URL.
 * @returns the response body of that request.
 * @throws any request failure from axios; nothing is classified or swallowed here.
 */
async function fetchRegistrationPage(pageId: string): Promise<NuGetRegistrationPage> {
  const requestOptions = {
    headers: { 'Accept-Encoding': 'gzip' },
    timeout: 15000,
  }
  const { data } = await axios.get<NuGetRegistrationPage>(pageId, requestOptions)
  return data
}

/**
 * Fetches the registration index of a package and fills in every page that the index
 * references without inlining its items.
 *
 * Pages are fetched one after another. A page that fails to load is kept with an empty
 * `items` array (best effort), except when the failure is a rate limit: returning a
 * truncated version list as if it were complete would hide the throttling from the caller,
 * so that case is reported as `RATE_LIMIT` for the whole call.
 *
 * @param packageId the package id; it is lowercased and URL-encoded before use in the path.
 * @returns the registration index with all page items present, or a `NuGetFetchError`:
 * `NOT_FOUND` on HTTP 404 for the index, `RATE_LIMIT` on HTTP 429 for the index or any page.
 * @throws any other failure of the index request, and failures while resolving the endpoints.
 */
export async function fetchRegistration(
  packageId: string,
): Promise<NuGetRegistrationIndex | NuGetFetchError> {
  const { registrationBaseUrl } = await resolveEndpoints()
  // Encoded so that an unexpected character in an id cannot change the request path.
  const lowerId = encodeURIComponent(packageId.toLowerCase())

  try {
    const resp = await axios.get<NuGetRegistrationIndex>(
      `${registrationBaseUrl}${lowerId}/index.json`,
      {
        headers: { 'Accept-Encoding': 'gzip' },
        timeout: 15000,
      },
    )

    const index = resp.data

    for (let i = 0; i < index.items.length; i++) {
      const page = index.items[i]
      // Pages that already carry their items need no extra request.
      if (!page.items) {
        try {
          const fullPage = await fetchRegistrationPage(page['@id'])
          index.items[i] = { ...page, items: fullPage.items ?? [] }
        } catch (pageErr) {
          // Re-thrown to the outer handler, which turns it into a RATE_LIMIT result.
          if (classifyError(pageErr)?.kind === 'RATE_LIMIT') throw pageErr
          // Any other page failure stays best effort: keep the page, without items.
          index.items[i] = { ...page, items: [] }
        }
      }
    }

    return index
  } catch (err) {
    // 404 and 429 are returned as data; everything else is unexpected and re-thrown.
    const classified = classifyError(err)
    if (classified) return classified
    throw err
  }
}
Loading
Loading