diff --git a/docs/API.md b/docs/API.md index afbfb3276..862927df5 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1448,20 +1448,21 @@ starts a free compute job and returns jobId if succesfull #### Parameters -| name | type | required | description | -| ----------------- | ------ | -------- | ----------------------------------------------------------------------------- | -| command | string | v | command name | -| node | string | | if not present it means current node | -| consumerAddress | string | v | consumer address | -| signature | string | v | signature (msg=String(nonce) ) | -| nonce | string | v | nonce for the request | -| datasets | object | | list of ComputeAsset to be used as inputs | -| algorithm | object | | ComputeAlgorithm definition | -| environment | string | v | compute environment to use | -| resources | object | | optional list of required resources | -| metadata | object | | optional metadata for the job, data provided by the user | -| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | -| queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started | +| name | type | required | description | +| --------------------------- | ------ | -------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| command | string | v | command name | +| node | string | | if not present it means current node | +| consumerAddress | string | v | consumer address | +| signature | string | v | signature (msg=String(nonce) ) | +| nonce | string | v | nonce for the request | +| datasets | object | | list of ComputeAsset to be used as inputs | +| algorithm | object | | ComputeAlgorithm definition | +| environment | string | v | compute environment to use | +| resources | object | | optional list of required resources | +| metadata | object | | optional metadata for the job, data provided by the user | +| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | +| queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started | +| encryptedDockerRegistryAuth | string | | Ecies encrypted docker auth schema for image (see [Private Docker Registries with Per-Job Authentication](../env.md#private-docker-registries-with-per-job-authentication)) | #### Request diff --git a/docs/env.md b/docs/env.md index 520502578..2b74ad8c9 100644 --- a/docs/env.md +++ b/docs/env.md @@ -197,6 +197,7 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of - **socketPath**: Path to the Docker socket (e.g., docker.sock). - **imageRetentionDays** - how long docker images are kept, in days. Default: 7 - **imageCleanupInterval** - how often to run cleanup for docker images, in seconds. Min: 3600 (1hour), Default: 86400 (24 hours) +- **paymentClaimInterval** - how often to run payment claiming, in seconds. Default: 3600 (1 hour) - **storageExpiry**: Amount of seconds for storage expiry.(Mandatory) - **maxJobDuration**: Maximum duration in seconds for a job.(Mandatory) - **minJobDuration**: Minimum duration in seconds for a job.(Mandatory) @@ -226,3 +227,205 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of - **total**: Total number of the resource available. - **min**: Minimum number of the resource needed for a job. - **max**: Maximum number of the resource for a job. + +### Docker Registry Authentication + +- `DOCKER_REGISTRY_AUTHS`: JSON object mapping Docker registry URLs to authentication credentials. Used for accessing private Docker/OCI registries when validating and pulling Docker images. Each registry entry must provide either `username`+`password` or `auth`. Example: + +```json +{ + "https://registry-1.docker.io": { + "username": "myuser", + "password": "mypassword" + }, + "https://ghcr.io": { + "username": "myuser", + "password": "ghp_..." + }, + "https://registry.gitlab.com": { + "auth": "glpat-..." + } +} +``` + +**Configuration Options:** + +- **Registry URL** (key): The full registry URL including protocol (e.g., `https://registry-1.docker.io`, `https://ghcr.io`, `https://registry.gitlab.com`) +- **username** (optional): Username for registry authentication. Required if using password-based auth. +- **password** (optional): Password or personal access token for registry authentication. Required if using username-based auth. +- **auth** (optional): Authentication token (alternative to username+password). Required if not using username+password. + +**Notes:** + +- For Docker Hub (`registry-1.docker.io`), you can use your Docker Hub username and password, or a personal access token (PAT) as the password. +- For GitHub Container Registry (GHCR), use your GitHub username with a personal access token (PAT) as the password, or use a token directly. +- For GitLab Container Registry, use a personal access token (PAT) or deploy token. +- The registry URL must match exactly (including protocol) with the registry used in the Docker image reference. +- If no credentials are configured for a registry, the node will attempt unauthenticated access (works for public images only). + +--- + +## Private Docker Registries with Per-Job Authentication + +In addition to node-level registry authentication via `DOCKER_REGISTRY_AUTHS`, you can provide encrypted Docker registry authentication credentials on a per-job basis. This allows different users to use different private registries or credentials for their compute jobs. + +### Overview + +The `encryptedDockerRegistryAuth` parameter allows you to securely provide Docker registry credentials that are: + +- Encrypted using ECIES (Elliptic Curve Integrated Encryption Scheme) with the node's public key +- Validated to ensure proper format (either `auth` string OR `username`+`password`) +- Used only for the specific compute job, overriding node-level configuration if provided + +### Encryption Format + +The `encryptedDockerRegistryAuth` must be: + +1. A JSON object matching the Docker registry auth schema (see below) +2. Encrypted using ECIES with the node's public key +3. Hex-encoded as a string + +**Auth Schema Format:** + +The decrypted JSON must follow this structure: + +```json +{ + "username": "myuser", + "password": "mypassword" +} +``` + +OR + +```json +{ + "auth": "base64-encoded-username:password" +} +``` + +OR (all fields present) + +```json +{ + "username": "myuser", + "password": "mypassword", + "auth": "base64-encoded-username:password" +} +``` + +**Validation Rules:** + +- Either `auth` string must be provided (non-empty), OR +- Both `username` AND `password` must be provided (both non-empty) +- Empty strings are not accepted + +### Usage Examples + +#### 1. Paid Compute Start (`POST /api/services/compute`) + +```json +{ + "command": "startCompute", + "consumerAddress": "0x...", + "signature": "...", + "nonce": "123", + "environment": "0x...", + "algorithm": { + "meta": { + "container": { + "image": "registry.example.com/myorg/myimage:latest" + } + } + }, + "datasets": [], + "payment": { ... }, + "encryptedDockerRegistryAuth": "0xdeadbeef..." // ECIES encrypted hex string +} +``` + +#### 2. Free Compute Start (`POST /api/services/freeCompute`) + +```json +{ + "command": "freeStartCompute", + "consumerAddress": "0x...", + "signature": "...", + "nonce": "123", + "environment": "0x...", + "algorithm": { + "meta": { + "container": { + "image": "ghcr.io/myorg/myimage:latest" + } + } + }, + "datasets": [], + "encryptedDockerRegistryAuth": "0xdeadbeef..." // ECIES encrypted hex string +} +``` + +#### 3. Initialize Compute + +The `initialize` command accepts `encryptedDockerRegistryAuth` as part of the command payload, as it validates the image + +```json +{ + "command": "initialize", + "datasets": [...], + "algorithm": { + "meta": { + "container": { + "image": "registry.gitlab.com/myorg/myimage:latest" + } + } + }, + "environment": "0x...", + "payment": { ... }, + "consumerAddress": "0x...", + "maxJobDuration": 3600, + "encryptedDockerRegistryAuth": "0xdeadbeef..." // ECIES encrypted hex string +} +``` + +### Encryption Process + +To create `encryptedDockerRegistryAuth`, you need to: + +1. **Prepare the auth JSON object:** + + ```json + { + "username": "myuser", + "password": "mypassword" + } + ``` + +2. **Get the node's public key** (available via the node's API or P2P interface) + +3. **Encrypt the JSON string** using ECIES with the node's public key + +4. **Hex-encode the encrypted result** + +### Behavior + +- **Priority**: If `encryptedDockerRegistryAuth` is provided, it takes precedence over node-level `DOCKER_REGISTRY_AUTHS` configuration for that specific job +- **Validation**: The encrypted auth is decrypted and validated before the job starts. Invalid formats will result in an error +- **Scope**: The credentials are used for: + - Validating the Docker image exists (during initialize) + - Pulling the Docker image (during job execution) +- **Security**: Credentials are encrypted and only decrypted by the node using its private key + +### Error Handling + +If `encryptedDockerRegistryAuth` is invalid, you'll receive an error: + +- **Decryption failure**: `Invalid encryptedDockerRegistryAuth: failed to parse JSON - [error message]` +- **Schema validation failure**: `Invalid encryptedDockerRegistryAuth: Either 'auth' must be provided, or both 'username' and 'password' must be provided` + +### Notes + +- The `encryptedDockerRegistryAuth` parameter is optional. If not provided, the node will use `DOCKER_REGISTRY_AUTHS` configuration or attempt unauthenticated access +- The registry URL in the Docker image reference must match the registry you're authenticating to +- For Docker Hub, use `registry-1.docker.io` as the registry URL +- Credentials are stored encrypted in the job record and decrypted only when needed for image operations diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 6a6213a2e..503813268 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -144,6 +144,7 @@ export interface C2DDockerConfig { access: ComputeAccessList imageRetentionDays?: number // Default: 7 days imageCleanupInterval?: number // Default: 86400 seconds (24 hours) + paymentClaimInterval?: number // Default: 3600 seconds (1 hours) } export type ComputeResultType = @@ -264,6 +265,7 @@ export interface DBComputeJob extends ComputeJob { metadata?: DBComputeJobMetadata additionalViewers?: string[] // addresses of additional addresses that can get results algoDuration: number // duration of the job in seconds + encryptedDockerRegistryAuth?: string } // make sure we keep them both in sync @@ -311,7 +313,9 @@ export enum C2DStatusNumber { // eslint-disable-next-line no-unused-vars ResultsUploadFailed = 62, // eslint-disable-next-line no-unused-vars - JobFinished = 70 + JobFinished = 70, + // eslint-disable-next-line no-unused-vars + JobSettle = 71 } export enum C2DStatusText { // eslint-disable-next-line no-unused-vars @@ -357,5 +361,7 @@ export enum C2DStatusText { // eslint-disable-next-line no-unused-vars ResultsUploadFailed = 'Failed to upload results to storage', // eslint-disable-next-line no-unused-vars - JobFinished = 'Job finished' + JobFinished = 'Job finished', + // eslint-disable-next-line no-unused-vars + JobSettle = 'Job settling' } diff --git a/src/@types/Escrow.ts b/src/@types/Escrow.ts index 9e9aa6b16..7f9f25575 100644 --- a/src/@types/Escrow.ts +++ b/src/@types/Escrow.ts @@ -10,7 +10,6 @@ export interface EscrowAuthorization { export interface EscrowLock { jobId: BigInt payer: string - payee: string amount: BigInt expiry: BigInt token: string diff --git a/src/@types/OceanNode.ts b/src/@types/OceanNode.ts index 9cda85acb..15b616dd4 100644 --- a/src/@types/OceanNode.ts +++ b/src/@types/OceanNode.ts @@ -93,8 +93,18 @@ export interface AccessListContract { [chainId: string]: string[] } +export interface dockerRegistryAuth { + username?: string + password?: string + auth?: string +} +export interface dockerRegistrysAuth { + [registry: string]: dockerRegistryAuth +} + export interface OceanNodeConfig { dockerComputeEnvironments: C2DDockerConfig[] + dockerRegistrysAuth: dockerRegistrysAuth authorizedDecrypters: string[] authorizedDecryptersList: AccessListContract | null allowedValidators: string[] diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 4fbe633cc..cf6fc3953 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -207,6 +207,7 @@ export interface ComputeInitializeCommand extends Command { maxJobDuration: number policyServer?: any // object to pass to policy server queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started + encryptedDockerRegistryAuth?: string } export interface FreeComputeStartCommand extends Command { @@ -223,6 +224,7 @@ export interface FreeComputeStartCommand extends Command { metadata?: DBComputeJobMetadata additionalViewers?: string[] // addresses of additional addresses that can get results queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started + encryptedDockerRegistryAuth?: string } export interface PaidComputeStartCommand extends FreeComputeStartCommand { payment: ComputePayment diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 44d51f8e5..0412e369b 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -242,8 +242,24 @@ export class OceanP2P extends EventEmitter { shouldAnnounce(addr: any) { try { const maddr = multiaddr(addr) - // always filter loopback + + const protos = maddr.getComponents() const addressString = maddr.nodeAddress().address + if ( + protos.some( + (entry) => + entry.name === 'dns' || + entry.name === 'dns4' || + entry.name === 'dns6' || + entry.name === 'dnsaddr' + ) + ) { + if (addressString === 'localhost' || addressString === '127.0.0.1') { + return false + } + + return true + } if (!ipaddr.isValid(addressString)) { return false diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 13e49a6a4..b6cb3e1e5 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -21,22 +21,30 @@ import { C2DClusterType } from '../../@types/C2D/C2D.js' import { C2DDatabase } from '../database/C2DDatabase.js' import { Escrow } from '../core/utils/escrow.js' import { KeyManager } from '../KeyManager/index.js' - +import { dockerRegistryAuth, dockerRegistrysAuth } from '../../@types/OceanNode.js' +import { ValidateParams } from '../httpRoutes/validateCommands.js' +import { EncryptMethod } from '../../@types/fileObject.js' +import { CORE_LOGGER } from '../../utils/logging/common.js' +import { DockerRegistryAuthSchema } from '../../utils/config/schemas.js' export abstract class C2DEngine { private clusterConfig: C2DClusterInfo public db: C2DDatabase public escrow: Escrow public keyManager: KeyManager + public dockerRegistryAuths: dockerRegistrysAuth + public constructor( cluster: C2DClusterInfo, db: C2DDatabase, escrow: Escrow, - keyManager: KeyManager + keyManager: KeyManager, + dockerRegistryAuths: dockerRegistrysAuth ) { this.clusterConfig = cluster this.db = db this.escrow = escrow this.keyManager = keyManager + this.dockerRegistryAuths = dockerRegistryAuths } getKeyManager(): KeyManager { @@ -66,6 +74,13 @@ export abstract class C2DEngine { return null } + // eslint-disable-next-line require-await + public abstract checkDockerImage( + image: string, + encryptedDockerRegistryAuth?: string, + platform?: any + ): Promise + public abstract startComputeJob( assets: ComputeAsset[], algorithm: ComputeAlgorithm, @@ -78,7 +93,8 @@ export abstract class C2DEngine { jobId: string, metadata?: DBComputeJobMetadata, additionalViewers?: string[], - queueMaxWaitTime?: number + queueMaxWaitTime?: number, + encryptedDockerRegistryAuth?: string ): Promise public abstract stopComputeJob( @@ -523,4 +539,60 @@ export abstract class C2DEngine { } return cost } + + public getDockerRegistryAuth(registry: string): dockerRegistryAuth | null { + if (!this.dockerRegistryAuths) return null + if (this.dockerRegistryAuths[registry]) { + return this.dockerRegistryAuths[registry] + } + return null + } + + public async checkEncryptedDockerRegistryAuth( + encryptedDockerRegistryAuth: string + ): Promise { + let decryptedDockerRegistryAuth: dockerRegistryAuth + try { + const decryptedDockerRegistryAuthBuffer = await this.keyManager.decrypt( + Uint8Array.from(Buffer.from(encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + + // Convert decrypted buffer to string and parse as JSON + const decryptedDockerRegistryAuthString = + decryptedDockerRegistryAuthBuffer.toString() + + decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) + } catch (error: any) { + const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` + CORE_LOGGER.error(errorMessage) + return { + valid: false, + reason: errorMessage, + status: 400 + } + } + + // Validate using schema - ensures either auth or username+password are provided + const validationResult = DockerRegistryAuthSchema.safeParse( + decryptedDockerRegistryAuth + ) + if (!validationResult.success) { + const errorMessageValidation = validationResult.error.errors + .map((err) => err.message) + .join('; ') + const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` + CORE_LOGGER.error(errorMessage) + return { + valid: false, + reason: errorMessage, + status: 400 + } + } + return { + valid: true, + reason: null, + status: 200 + } + } } diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 7459e7ce2..cd9ee4c60 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -51,6 +51,9 @@ import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.j import { ValidateParams } from '../httpRoutes/validateCommands.js' import { Service } from '@oceanprotocol/ddo-js' import { getOceanTokenAddressForChain } from '../../utils/address.js' +import { dockerRegistrysAuth, dockerRegistryAuth } from '../../@types/OceanNode.js' +import { EncryptMethod } from '../../@types/fileObject.js' +import { ZeroAddress } from 'ethers' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] @@ -61,17 +64,19 @@ export class C2DEngineDocker extends C2DEngine { private jobImageSizes: Map = new Map() private isInternalLoopRunning: boolean = false private imageCleanupTimer: NodeJS.Timeout | null = null + private paymentClaimTimer: NodeJS.Timeout | null = null private static DEFAULT_DOCKER_REGISTRY = 'https://registry-1.docker.io' private retentionDays: number private cleanupInterval: number - + private paymentClaimInterval: number public constructor( clusterConfig: C2DClusterInfo, db: C2DDatabase, escrow: Escrow, - keyManager: KeyManager + keyManager: KeyManager, + dockerRegistryAuths: dockerRegistrysAuth ) { - super(clusterConfig, db, escrow, keyManager) + super(clusterConfig, db, escrow, keyManager, dockerRegistryAuths) this.docker = null if (clusterConfig.connection.socketPath) { @@ -83,6 +88,7 @@ export class C2DEngineDocker extends C2DEngine { } this.retentionDays = clusterConfig.connection.imageRetentionDays || 7 this.cleanupInterval = clusterConfig.connection.imageCleanupInterval || 86400 // 24 hours + this.paymentClaimInterval = clusterConfig.connection.paymentClaimInterval || 3600 // 1 hour if ( clusterConfig.connection.protocol && clusterConfig.connection.host && @@ -286,6 +292,8 @@ export class C2DEngineDocker extends C2DEngine { } // Start image cleanup timer this.startImageCleanupTimer() + // Start claim timer + this.startPaymentTimer() } public override stop(): Promise { @@ -301,6 +309,11 @@ export class C2DEngineDocker extends C2DEngine { this.imageCleanupTimer = null CORE_LOGGER.debug('Image cleanup timer stopped') } + if (this.paymentClaimTimer) { + clearInterval(this.paymentClaimTimer) + this.paymentClaimTimer = null + CORE_LOGGER.debug('Payment claim timer stopped') + } return Promise.resolve() } @@ -312,6 +325,282 @@ export class C2DEngineDocker extends C2DEngine { } } + private async claimPayments(): Promise { + const envs: string[] = [] + for (const env of this.envs) { + envs.push(env.id) + } + // get all jobs that are in the settle status + const jobs = await this.db.getJobs( + envs, + undefined, + undefined, + C2DStatusNumber.JobSettle + ) + + if (jobs.length === 0) { + return + } + + const providerAddress = this.getKeyManager().getEthAddress() + const chains: Set = new Set() + // get all unique chains + for (const job of jobs) { + if (job.payment && job.payment.token) { + chains.add(job.payment.chainId) + } + } + + // Get all locks for all chains + const locks: any[] = [] + for (const chain of chains) { + try { + const contractLocks = await this.escrow.getLocks( + chain, + ZeroAddress, + ZeroAddress, + providerAddress + ) + if (contractLocks) { + locks.push(...contractLocks) + } + } catch (e) { + CORE_LOGGER.error(`Failed to get locks for chain ${chain}: ${e.message}`) + } + } + + const currentTimestamp = BigInt(Math.floor(Date.now() / 1000)) + + // Group jobs by operation type and chain for batch processing + const jobsToClaim: Array<{ + job: DBComputeJob + cost: number + proof: string + }> = [] + const jobsToCancel: DBComputeJob[] = [] + const jobsWithoutLock: DBComputeJob[] = [] + + // Process each job to determine what operation is needed + for (const job of jobs) { + // Calculate algo duration + const algoDuration = + parseFloat(job.algoStopTimestamp) - parseFloat(job.algoStartTimestamp) + job.algoDuration = algoDuration + + // Free jobs or jobs without payment info - mark as finished + if (job.isFree || !job.payment) { + jobsWithoutLock.push(job) + continue + } + + // Find matching lock + const lock = locks.find( + (lock) => BigInt(lock.jobId.toString()) === BigInt(create256Hash(job.jobId)) + ) + + if (!lock) { + // No lock found, mark as finished + jobsWithoutLock.push(job) + continue + } + + // Check if lock is expired + const lockExpiry = BigInt(lock.expiry.toString()) + if (currentTimestamp > lockExpiry) { + // Lock expired, cancel it + jobsToCancel.push(job) + continue + } + + // Get environment to calculate cost + const env = await this.getComputeEnvironment(job.payment.chainId, job.environment) + + if (!env) { + CORE_LOGGER.warn( + `Environment not found for job ${job.jobId}, skipping payment claim` + ) + continue + } + + // Calculate minimum duration + let minDuration = 0 + if (algoDuration < 0) minDuration += algoDuration * -1 + else minDuration += algoDuration + if ( + `minJobDuration` in env && + env.minJobDuration && + minDuration < env.minJobDuration + ) { + minDuration = env.minJobDuration + } + + if (minDuration > 0) { + // We need to claim payment + const fee = env.fees[job.payment.chainId]?.find( + (fee) => fee.feeToken === job.payment.token + ) + + if (!fee) { + CORE_LOGGER.warn( + `Fee not found for job ${job.jobId}, token ${job.payment.token}, skipping` + ) + continue + } + + const cost = this.getTotalCostOfJob(job.resources, minDuration, fee) + const proof = JSON.stringify(omitDBComputeFieldsFromComputeJob(job)) + + jobsToClaim.push({ job, cost, proof }) + } else { + // No payment due, cancel the lock + jobsToCancel.push(job) + } + } + + // Batch process claims by chain + const claimsByChain = new Map< + number, + Array<{ job: DBComputeJob; cost: number; proof: string }> + >() + for (const claim of jobsToClaim) { + const { chainId } = claim.job.payment! + if (!claimsByChain.has(chainId)) { + claimsByChain.set(chainId, []) + } + claimsByChain.get(chainId)!.push(claim) + } + + // Process batch claims + for (const [chainId, claims] of claimsByChain.entries()) { + if (claims.length === 0) continue + + try { + const jobs = claims.map((c) => c.job) + const tokens = jobs.map((j) => j.payment!.token) + const payers = jobs.map((j) => j.owner) + const amounts = claims.map((c) => c.cost) + const proofs = claims.map((c) => c.proof) + + const txId = await this.escrow.claimLocks( + chainId, + jobs.map((j) => j.jobId), + tokens, + payers, + amounts, + proofs + ) + + if (txId) { + // Update all jobs with the transaction ID + for (const claim of claims) { + claim.job.payment!.claimTx = txId + claim.job.payment!.cost = claim.cost + claim.job.status = C2DStatusNumber.JobFinished + claim.job.statusText = C2DStatusText.JobFinished + await this.db.updateJob(claim.job) + } + CORE_LOGGER.info( + `Successfully claimed ${claims.length} locks in batch transaction ${txId}` + ) + } + } catch (e) { + CORE_LOGGER.error( + `Failed to batch claim locks for chain ${chainId}: ${e.message}` + ) + // Fallback to individual processing on batch failure + for (const claim of claims) { + try { + const txId = await this.escrow.claimLock( + chainId, + claim.job.jobId, + claim.job.payment!.token, + claim.job.owner, + claim.cost, + claim.proof + ) + if (txId) { + claim.job.payment!.claimTx = txId + claim.job.payment!.cost = claim.cost + claim.job.status = C2DStatusNumber.JobFinished + claim.job.statusText = C2DStatusText.JobFinished + await this.db.updateJob(claim.job) + } + } catch (err) { + CORE_LOGGER.error( + `Failed to claim lock for job ${claim.job.jobId}: ${err.message}` + ) + } + } + } + } + + // Batch process cancellations by chain + const cancellationsByChain = new Map() + for (const job of jobsToCancel) { + const { chainId } = job.payment! + if (!cancellationsByChain.has(chainId)) { + cancellationsByChain.set(chainId, []) + } + cancellationsByChain.get(chainId)!.push(job) + } + + // Process batch cancellations + for (const [chainId, jobsToCancelBatch] of cancellationsByChain.entries()) { + if (jobsToCancelBatch.length === 0) continue + + try { + const jobIds = jobsToCancelBatch.map((j) => j.jobId) + const tokens = jobsToCancelBatch.map((j) => j.payment!.token) + const payers = jobsToCancelBatch.map((j) => j.owner) + + const txId = await this.escrow.cancelExpiredLocks(chainId, jobIds, tokens, payers) + + if (txId) { + // Update all jobs + for (const job of jobsToCancelBatch) { + job.status = C2DStatusNumber.JobFinished + job.statusText = C2DStatusText.JobFinished + await this.db.updateJob(job) + } + CORE_LOGGER.info( + `Successfully cancelled ${jobsToCancelBatch.length} expired locks in batch transaction ${txId}` + ) + } + } catch (e) { + CORE_LOGGER.error( + `Failed to batch cancel locks for chain ${chainId}: ${e.message}` + ) + // Fallback to individual processing on batch failure + for (const job of jobsToCancelBatch) { + try { + const txId = await this.escrow.cancelExpiredLock( + chainId, + job.jobId, + job.payment!.token, + job.owner + ) + if (txId) { + job.status = C2DStatusNumber.JobFinished + job.statusText = C2DStatusText.JobFinished + await this.db.updateJob(job) + } + } catch (err) { + CORE_LOGGER.error( + `Failed to cancel lock for job ${job.jobId}: ${err.message}` + ) + } + } + } + } + + // Mark jobs without locks as finished + for (const job of jobsWithoutLock) { + job.status = C2DStatusNumber.JobFinished + job.statusText = C2DStatusText.JobFinished + await this.db.updateJob(job) + } + } + private async cleanupOldImages(): Promise { if (!this.docker) return @@ -376,6 +665,30 @@ export class C2DEngineDocker extends C2DEngine { ) } + private startPaymentTimer(): void { + if (this.paymentClaimTimer) { + return // Already running + } + + // Run initial cleanup after a short delay + setTimeout(() => { + this.claimPayments().catch((e) => { + CORE_LOGGER.error(`Initial payments claim failed: ${e.message}`) + }) + }, 60000) // Wait 1 minute after start + + // Set up periodic cleanup + this.paymentClaimTimer = setInterval(() => { + this.claimPayments().catch((e) => { + CORE_LOGGER.error(`Periodic payments claim failed: ${e.message}`) + }) + }, this.paymentClaimInterval * 1000) + + CORE_LOGGER.info( + `Payments claim timer started (interval: ${this.paymentClaimInterval / 60} minutes)` + ) + } + // eslint-disable-next-line require-await public override async getComputeEnvironments( chainId?: number @@ -438,7 +751,7 @@ export class C2DEngineDocker extends C2DEngine { return filteredEnvs } - private static parseImage(image: string) { + private parseImage(image: string) { let registry = C2DEngineDocker.DEFAULT_DOCKER_REGISTRY let name = image let ref = 'latest' @@ -472,13 +785,44 @@ export class C2DEngineDocker extends C2DEngine { return { registry, name, ref } } - public static async getDockerManifest(image: string): Promise { - const { registry, name, ref } = C2DEngineDocker.parseImage(image) + public async getDockerManifest( + image: string, + encryptedDockerRegistryAuth?: string + ): Promise { + const { registry, name, ref } = this.parseImage(image) const url = `${registry}/v2/${name}/manifests/${ref}` + + // Use user provided registry auth or get it from the config + let dockerRegistryAuth: dockerRegistryAuth | null = null + if (encryptedDockerRegistryAuth) { + const decryptedDockerRegistryAuth = await this.keyManager.decrypt( + Uint8Array.from(Buffer.from(encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + dockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuth.toString()) + } else { + dockerRegistryAuth = this.getDockerRegistryAuth(registry) + } + let headers: Record = { Accept: 'application/vnd.docker.distribution.manifest.v2+json, application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.oci.image.index.v1+json' } + + // If we have auth credentials, add Basic auth header to initial request + if (dockerRegistryAuth) { + // Use auth string if available, otherwise encode username:password + const authString = dockerRegistryAuth.auth + ? dockerRegistryAuth.auth + : Buffer.from( + `${dockerRegistryAuth.username}:${dockerRegistryAuth.password}` + ).toString('base64') + headers.Authorization = `Basic ${authString}` + CORE_LOGGER.debug( + `Using docker registry auth for ${registry} to get manifest for image ${image}` + ) + } + let response = await fetch(url, { headers }) if (response.status === 401) { @@ -489,7 +833,22 @@ export class C2DEngineDocker extends C2DEngine { const tokenUrl = new URL(match[1]) tokenUrl.searchParams.set('service', match[2]) tokenUrl.searchParams.set('scope', `repository:${name}:pull`) - const { token } = (await fetch(tokenUrl.toString()).then((r) => r.json())) as { + + // Add Basic auth to token request if we have credentials + const tokenHeaders: Record = {} + if (dockerRegistryAuth) { + // Use auth string if available, otherwise encode username:password + const authString = dockerRegistryAuth.auth + ? dockerRegistryAuth.auth + : Buffer.from( + `${dockerRegistryAuth.username}:${dockerRegistryAuth.password}` + ).toString('base64') + tokenHeaders.Authorization = `Basic ${authString}` + } + + const { token } = (await fetch(tokenUrl.toString(), { + headers: tokenHeaders + }).then((r) => r.json())) as { token: string } headers = { ...headers, Authorization: `Bearer ${token}` } @@ -507,16 +866,67 @@ export class C2DEngineDocker extends C2DEngine { } /** - * Checks the docker image by looking at the manifest + * Checks the docker image by looking at local images first, then remote manifest * @param image name or tag - * @returns boolean + * @param encryptedDockerRegistryAuth optional encrypted auth for remote registry + * @param platform optional platform to validate against + * @returns ValidateParams with valid flag and platform validation result */ - public static async checkDockerImage( + public async checkDockerImage( image: string, + encryptedDockerRegistryAuth?: string, platform?: RunningPlatform ): Promise { + // Step 1: Try to check local image first + if (this.docker) { + try { + const dockerImage = this.docker.getImage(image) + const imageInfo = await dockerImage.inspect() + + // Extract platform information from local image + const localPlatform = { + architecture: imageInfo.Architecture || 'amd64', + os: imageInfo.Os || 'linux' + } + + // Normalize architecture (amd64 -> x86_64 for compatibility) + if (localPlatform.architecture === 'amd64') { + localPlatform.architecture = 'x86_64' + } + + // Validate platform if required + const isValidPlatform = platform + ? checkManifestPlatform(localPlatform, platform) + : true + + if (isValidPlatform) { + CORE_LOGGER.debug(`Image ${image} found locally and platform is valid`) + return { valid: true } + } else { + CORE_LOGGER.warn( + `Image ${image} found locally but platform mismatch: ` + + `local=${localPlatform.architecture}/${localPlatform.os}, ` + + `required=${platform.architecture}/${platform.os}` + ) + return { + valid: false, + status: 400, + reason: + `Platform mismatch: image is ${localPlatform.architecture}/${localPlatform.os}, ` + + `but environment requires ${platform.architecture}/${platform.os}` + } + } + } catch (localErr: any) { + // Image not found locally or error inspecting - fall through to remote check + CORE_LOGGER.debug( + `Image ${image} not found locally (${localErr.message}), checking remote registry` + ) + } + } + + // Step 2: Fall back to remote registry check (existing behavior) try { - const manifest = await C2DEngineDocker.getDockerManifest(image) + const manifest = await this.getDockerManifest(image, encryptedDockerRegistryAuth) const platforms = Array.isArray(manifest.manifests) ? manifest.manifests.map((entry: any) => entry.platform) @@ -552,7 +962,8 @@ export class C2DEngineDocker extends C2DEngine { jobId: string, metadata?: DBComputeJobMetadata, additionalViewers?: string[], - queueMaxWaitTime?: number + queueMaxWaitTime?: number, + encryptedDockerRegistryAuth?: string ): Promise { if (!this.docker) return [] // TO DO - iterate over resources and get default runtime @@ -600,6 +1011,7 @@ export class C2DEngineDocker extends C2DEngine { throw new Error(`additionalDockerFiles cannot be used with queued jobs`) } } + const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, containerImage: image, @@ -636,7 +1048,8 @@ export class C2DEngineDocker extends C2DEngine { additionalViewers, terminationDetails: { exitCode: null, OOMKilled: null }, algoDuration: 0, - queueMaxWaitTime: queueMaxWaitTime || 0 + queueMaxWaitTime: queueMaxWaitTime || 0, + encryptedDockerRegistryAuth // we store the encrypted docker registry auth in the job } if (algorithm.meta.container && algorithm.meta.container.dockerfile) { @@ -647,7 +1060,11 @@ export class C2DEngineDocker extends C2DEngine { } } else { // already built, we need to validate it - const validation = await C2DEngineDocker.checkDockerImage(image, env.platform) + const validation = await this.checkDockerImage( + image, + job.encryptedDockerRegistryAuth, + env.platform + ) if (!validation.valid) throw new Error( `Cannot find image ${image} for ${env.platform.architecture}. Maybe it does not exist or it's build for other arhitectures.` @@ -1319,8 +1736,8 @@ export class C2DEngineDocker extends C2DEngine { } if (job.status === C2DStatusNumber.PublishingResults) { // get output - job.status = C2DStatusNumber.JobFinished - job.statusText = C2DStatusText.JobFinished + job.status = C2DStatusNumber.JobSettle + job.statusText = C2DStatusText.JobSettle let container try { container = await this.docker.getContainer(job.jobId + '-algoritm') @@ -1379,66 +1796,6 @@ export class C2DEngineDocker extends C2DEngine { this.jobImageSizes.delete(job.jobId) - // payments - const algoDuration = - parseFloat(job.algoStopTimestamp) - parseFloat(job.algoStartTimestamp) - - job.algoDuration = algoDuration - await this.db.updateJob(job) - if (!job.isFree && job.payment) { - let txId = null - const env = await this.getComputeEnvironment(job.payment.chainId, job.environment) - let minDuration = 0 - - if (algoDuration < 0) minDuration += algoDuration * -1 - else minDuration += algoDuration - if ( - env && - `minJobDuration` in env && - env.minJobDuration && - minDuration < env.minJobDuration - ) { - minDuration = env.minJobDuration - } - let cost = 0 - if (minDuration > 0) { - // we need to claim - const fee = env.fees[job.payment.chainId].find( - (fee) => fee.feeToken === job.payment.token - ) - cost = this.getTotalCostOfJob(job.resources, minDuration, fee) - const proof = JSON.stringify(omitDBComputeFieldsFromComputeJob(job)) - try { - txId = await this.escrow.claimLock( - job.payment.chainId, - job.jobId, - job.payment.token, - job.owner, - cost, - proof - ) - } catch (e) { - CORE_LOGGER.error('Failed to claim lock: ' + e.message) - } - } else { - // release the lock, we are not getting paid - try { - txId = await this.escrow.cancelExpiredLocks( - job.payment.chainId, - job.jobId, - job.payment.token, - job.owner - ) - } catch (e) { - CORE_LOGGER.error('Failed to release lock: ' + e.message) - } - } - if (txId) { - job.payment.claimTx = txId - job.payment.cost = cost - await this.db.updateJob(job) - } - } try { const container = await this.docker.getContainer(job.jobId + '-algoritm') if (container) { @@ -1644,7 +2001,50 @@ export class C2DEngineDocker extends C2DEngine { const imageLogFile = this.getC2DConfig().tempFolder + '/' + job.jobId + '/data/logs/image.log' try { - const pullStream = await this.docker.pull(job.containerImage) + // Get registry auth for the image + const { registry } = this.parseImage(job.containerImage) + // Use user provided registry auth or get it from the config + let dockerRegistryAuthForPull: any + if (originaljob.encryptedDockerRegistryAuth) { + const decryptedDockerRegistryAuth = await this.keyManager.decrypt( + Uint8Array.from(Buffer.from(originaljob.encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + dockerRegistryAuthForPull = JSON.parse(decryptedDockerRegistryAuth.toString()) + } else { + dockerRegistryAuthForPull = this.getDockerRegistryAuth(registry) + } + + // Prepare authconfig for Dockerode if credentials are available + const pullOptions: any = {} + if (dockerRegistryAuthForPull) { + // Extract hostname from registry URL (remove protocol) + const registryUrl = new URL(registry) + const serveraddress = + registryUrl.hostname + (registryUrl.port ? `:${registryUrl.port}` : '') + + // Use auth string if available, otherwise encode username:password + const authString = dockerRegistryAuthForPull.auth + ? dockerRegistryAuthForPull.auth + : Buffer.from( + `${dockerRegistryAuthForPull.username}:${dockerRegistryAuthForPull.password}` + ).toString('base64') + + pullOptions.authconfig = { + serveraddress, + ...(dockerRegistryAuthForPull.auth + ? { auth: authString } + : { + username: dockerRegistryAuthForPull.username, + password: dockerRegistryAuthForPull.password + }) + } + CORE_LOGGER.debug( + `Using docker registry auth for ${registry} to pull image ${job.containerImage}` + ) + } + + const pullStream = await this.docker.pull(job.containerImage, pullOptions) await new Promise((resolve, reject) => { let wroteStatusBanner = false this.docker.modem.followProgress( diff --git a/src/components/c2d/compute_engines.ts b/src/components/c2d/compute_engines.ts index 24d7cd46b..26ad035f9 100644 --- a/src/components/c2d/compute_engines.ts +++ b/src/components/c2d/compute_engines.ts @@ -5,9 +5,9 @@ import { OceanNodeConfig } from '../../@types/OceanNode.js' import { C2DDatabase } from '../database/C2DDatabase.js' import { Escrow } from '../core/utils/escrow.js' import { KeyManager } from '../KeyManager/index.js' + export class C2DEngines { public engines: C2DEngine[] - public constructor( config: OceanNodeConfig, db: C2DDatabase, @@ -23,7 +23,15 @@ export class C2DEngines { this.engines = [] for (const cluster of config.c2dClusters) { if (cluster.type === C2DClusterType.DOCKER) { - this.engines.push(new C2DEngineDocker(cluster, db, escrow, keyManager)) + this.engines.push( + new C2DEngineDocker( + cluster, + db, + escrow, + keyManager, + config.dockerRegistrysAuth + ) + ) } } } diff --git a/src/components/c2d/index.ts b/src/components/c2d/index.ts index 8f0470c96..db343221e 100644 --- a/src/components/c2d/index.ts +++ b/src/components/c2d/index.ts @@ -54,7 +54,8 @@ export function omitDBComputeFieldsFromComputeJob(dbCompute: DBComputeJob): Comp // 'assets', 'isRunning', 'isStarted', - 'containerImage' + 'containerImage', + 'encryptedDockerRegistryAuth' ]) as ComputeJob return job } diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index e9937d4c7..4ebaf4e0d 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -28,7 +28,7 @@ import { sanitizeServiceFiles } from '../../../utils/util.js' import { FindDdoHandler } from '../handler/ddoHandler.js' import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js' import { getNonceAsNumber } from '../utils/nonceHandler.js' -import { C2DEngineDocker, getAlgorithmImage } from '../../c2d/compute_engine_docker.js' +import { getAlgorithmImage } from '../../c2d/compute_engine_docker.js' import { Credentials, DDOManager } from '@oceanprotocol/ddo-js' import { checkCredentials } from '../../../utils/credentials.js' import { PolicyServer } from '../../policyServer/index.js' @@ -387,8 +387,25 @@ export class ComputeInitializeHandler extends CommandHandler { if (hasDockerImages) { const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task)) if (algoImage) { - const validation: ValidateParams = await C2DEngineDocker.checkDockerImage( + // validate encrypteddockerRegistryAuth + let validation: ValidateParams + if (task.encryptedDockerRegistryAuth) { + validation = await engine.checkEncryptedDockerRegistryAuth( + task.encryptedDockerRegistryAuth + ) + if (!validation.valid) { + return { + stream: null, + status: { + httpStatus: validation.status, + error: `Invalid encryptedDockerRegistryAuth :${validation.reason}` + } + } + } + } + validation = await engine.checkDockerImage( algoImage, + task.encryptedDockerRegistryAuth, env.platform ) if (!validation.valid) { diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index cd625e428..11d317d02 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -151,6 +151,22 @@ export class PaidComputeStartHandler extends CommandHandler { } } } + // validate encrypteddockerRegistryAuth + if (task.encryptedDockerRegistryAuth) { + const validation = await engine.checkEncryptedDockerRegistryAuth( + task.encryptedDockerRegistryAuth + ) + if (!validation.valid) { + return { + stream: null, + status: { + httpStatus: validation.status, + error: `Invalid encryptedDockerRegistryAuth :${validation.reason}` + } + } + } + } + const { algorithm } = task const config = await getConfiguration() @@ -567,7 +583,8 @@ export class PaidComputeStartHandler extends CommandHandler { jobId, task.metadata, task.additionalViewers, - task.queueMaxWaitTime + task.queueMaxWaitTime, + task.encryptedDockerRegistryAuth ) CORE_LOGGER.logMessage( 'ComputeStartCommand Response: ' + JSON.stringify(response, null, 2), @@ -584,7 +601,7 @@ export class PaidComputeStartHandler extends CommandHandler { const errMsg = e?.message || String(e) CORE_LOGGER.error(`Error starting compute job: ${errMsg}`) try { - await engine.escrow.cancelExpiredLocks( + await engine.escrow.cancelExpiredLock( task.payment.chainId, jobId, task.payment.token, @@ -681,6 +698,22 @@ export class FreeComputeStartHandler extends CommandHandler { } } } + // validate encrypteddockerRegistryAuth + if (task.encryptedDockerRegistryAuth) { + const validation = await engine.checkEncryptedDockerRegistryAuth( + task.encryptedDockerRegistryAuth + ) + if (!validation.valid) { + return { + stream: null, + status: { + httpStatus: validation.status, + error: `Invalid encryptedDockerRegistryAuth :${validation.reason}` + } + } + } + } + const policyServer = new PolicyServer() for (const elem of [...[task.algorithm], ...task.datasets]) { if (!('documentId' in elem)) { @@ -913,7 +946,8 @@ export class FreeComputeStartHandler extends CommandHandler { jobId, task.metadata, task.additionalViewers, - task.queueMaxWaitTime + task.queueMaxWaitTime, + task.encryptedDockerRegistryAuth ) CORE_LOGGER.logMessage( diff --git a/src/components/core/utils/escrow.ts b/src/components/core/utils/escrow.ts index 10416a71e..e336f63e1 100644 --- a/src/components/core/utils/escrow.ts +++ b/src/components/core/utils/escrow.ts @@ -9,10 +9,16 @@ import { sleep } from '../../../utils/util.js' import { BlockchainRegistry } from '../../BlockchainRegistry/index.js' import { CORE_LOGGER } from '../../../utils/logging/common.js' +/** Cache key for token decimals: "chainId:tokenAddress" (token lowercased) */ +const DECIMALS_CACHE_KEY = (chainId: number, token: string) => + `${chainId}:${token.toLowerCase()}` + export class Escrow { private networks: RPCS private claimDurationTimeout: number private blockchainRegistry: BlockchainRegistry + /** Cache for token decimals to avoid repeated blockchain calls */ + private decimalsCache: Map = new Map() constructor( supportedNetworks: RPCS, @@ -27,7 +33,6 @@ export class Escrow { // eslint-disable-next-line require-await getEscrowContractAddressForChain(chainId: number): string | null { const addresses = getOceanArtifactsAdressesByChainId(chainId) - if (addresses && addresses.EnterpriseEscrow) return addresses.EnterpriseEscrow if (addresses && addresses.Escrow) return addresses.Escrow return null } @@ -51,22 +56,31 @@ export class Escrow { return blockchain } - async getPaymentAmountInWei(cost: number, chain: number, token: string) { + /** + * Get token decimals with cache to avoid repeated blockchain calls. + */ + private async getDecimals(chain: number, token: string): Promise { + const key = DECIMALS_CACHE_KEY(chain, token) + const cached = this.decimalsCache.get(key) + if (cached !== undefined) { + return cached + } const blockchain = this.getBlockchain(chain) const provider = await blockchain.getProvider() + const decimalBigNumber = await getDatatokenDecimals(token, provider) + const decimals = parseInt(decimalBigNumber.toString()) + this.decimalsCache.set(key, decimals) + return decimals + } - const decimalgBigNumber = await getDatatokenDecimals(token, provider) - const decimals = parseInt(decimalgBigNumber.toString()) - + async getPaymentAmountInWei(cost: number, chain: number, token: string) { + const decimals = await this.getDecimals(chain, token) const roundedCost = Number(cost.toFixed(decimals)).toString() - return parseUnits(roundedCost, decimals).toString() } async getNumberFromWei(wei: string, chain: number, token: string) { - const blockchain = this.getBlockchain(chain) - const provider = await blockchain.getProvider() - const decimals = await getDatatokenDecimals(token, provider) + const decimals = await this.getDecimals(chain, token) return parseFloat(formatUnits(wei, decimals)) } @@ -175,7 +189,6 @@ export class Escrow { } authorizations.` ) } - if ( BigInt(auths[0].currentLockedAmount.toString()) + BigInt(wei) > BigInt(auths[0].maxLockedAmount.toString()) @@ -215,12 +228,9 @@ export class Escrow { const contract = this.getContract(chain, signer) const wei = await this.getPaymentAmountInWei(amount, chain, token) const jobId = create256Hash(job) - CORE_LOGGER.info('ClaimLock function init') - if (!contract) return null try { const locks = await this.getLocks(chain, token, payer, await signer.getAddress()) - CORE_LOGGER.info(`Found ${locks.length} locks for job ${jobId}`) for (const lock of locks) { if (BigInt(lock.jobId.toString()) === BigInt(jobId)) { const gas = await contract.claimLockAndWithdraw.estimateGas( @@ -230,8 +240,6 @@ export class Escrow { wei, ethers.toUtf8Bytes(proof) ) - CORE_LOGGER.info('Claiming lock for job: ' + jobId) - CORE_LOGGER.info('Gas: ' + gas) const gasOptions = await blockchain.getGasOptions(gas, 1.2) const tx = await contract.claimLockAndWithdraw( jobId, @@ -241,7 +249,6 @@ export class Escrow { ethers.toUtf8Bytes(proof), gasOptions ) - CORE_LOGGER.info('Transaction hash: ' + tx?.hash) return tx.hash } } @@ -252,7 +259,7 @@ export class Escrow { } } - async cancelExpiredLocks( + async cancelExpiredLock( chain: number, job: string, token: string, @@ -268,14 +275,14 @@ export class Escrow { const locks = await this.getLocks(chain, token, payer, await signer.getAddress()) for (const lock of locks) { if (BigInt(lock.jobId.toString()) === BigInt(jobId)) { - const gas = await contract.cancelExpiredLocks.estimateGas( + const gas = await contract.cancelExpiredLock.estimateGas( jobId, token, payer, await signer.getAddress() ) const gasOptions = await blockchain.getGasOptions(gas, 1.2) - const tx = await contract.cancelExpiredLocks( + const tx = await contract.cancelExpiredLock( jobId, token, payer, @@ -292,4 +299,102 @@ export class Escrow { throw new Error(String(e.message)) } } + + async claimLocks( + chain: number, + jobs: string[], + tokens: string[], + payers: string[], + amounts: number[], + proofs: string[] + ): Promise { + const blockchain = this.getBlockchain(chain) + const signer = await blockchain.getSigner() + const contract = this.getContract(chain, signer) + if (!contract) return null + const weis: string[] = [] + const jobIds: string[] = [] + const ethProofs: Uint8Array[] = [] + if ( + jobs.length !== tokens.length || + jobs.length !== payers.length || + jobs.length !== amounts.length || + jobs.length !== proofs.length + ) { + throw new Error('Invalid input: all arrays must have the same length') + } + for (let i = 0; i < jobs.length; i++) { + const wei = await this.getPaymentAmountInWei(amounts[i], chain, tokens[i]) + weis.push(wei) + const jobId = create256Hash(jobs[i]) + jobIds.push(jobId) + ethProofs.push(ethers.toUtf8Bytes(proofs[i])) + } + try { + const gas = await contract.claimLocksAndWithdraw.estimateGas( + jobIds, + tokens, + payers, + weis, + ethProofs + ) + const gasOptions = await blockchain.getGasOptions(gas, 1.2) + const tx = await contract.claimLocksAndWithdraw( + jobIds, + tokens, + payers, + weis, + ethProofs, + gasOptions + ) + return tx.hash + } catch (e) { + CORE_LOGGER.error('Failed to claim lock: ' + e.message) + throw new Error(String(e.message)) + } + } + + async cancelExpiredLocks( + chain: number, + jobs: string[], + tokens: string[], + payers: string[] + ): Promise { + const blockchain = this.getBlockchain(chain) + const signer = await blockchain.getSigner() + if (jobs.length !== tokens.length || jobs.length !== payers.length) { + throw new Error('Invalid input: all arrays must have the same length') + } + const jobIds: string[] = [] + const payersAddresses: string[] = [] + for (let i = 0; i < jobs.length; i++) { + const jobId = create256Hash(jobs[i]) + jobIds.push(jobId) + payersAddresses.push(await signer.getAddress()) + } + const contract = this.getContract(chain, signer) + + if (!contract) return null + try { + const gas = await contract.cancelExpiredLocks.estimateGas( + jobIds, + tokens, + payers, + payersAddresses + ) + const gasOptions = await blockchain.getGasOptions(gas, 1.2) + const tx = await contract.cancelExpiredLocks( + jobIds, + tokens, + payers, + payersAddresses, + gasOptions + ) + + return tx.hash + } catch (e) { + CORE_LOGGER.error('Failed to cancel expired locks: ' + e.message) + throw new Error(String(e.message)) + } + } } diff --git a/src/components/database/C2DDatabase.ts b/src/components/database/C2DDatabase.ts index 01eff5a09..4916763ed 100644 --- a/src/components/database/C2DDatabase.ts +++ b/src/components/database/C2DDatabase.ts @@ -1,6 +1,10 @@ import path from 'path' import fs from 'fs' -import { ComputeEnvironment, DBComputeJob } from '../../@types/C2D/C2D.js' +import { + ComputeEnvironment, + DBComputeJob, + C2DStatusNumber +} from '../../@types/C2D/C2D.js' import { SQLiteCompute } from './sqliteCompute.js' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { OceanNodeDBConfig } from '../../@types/OceanNode.js' @@ -79,9 +83,10 @@ export class C2DDatabase extends AbstractDatabase { async getJobs( environments?: string[], fromTimestamp?: string, - consumerAddrs?: string[] + consumerAddrs?: string[], + status?: C2DStatusNumber ): Promise { - return await this.provider.getJobs(environments, fromTimestamp, consumerAddrs) + return await this.provider.getJobs(environments, fromTimestamp, consumerAddrs, status) } async updateImage(image: string): Promise { diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index 0ae9823dc..b73fa85f0 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -424,7 +424,8 @@ export class SQLiteCompute implements ComputeDatabaseProvider { getJobs( environments?: string[], fromTimestamp?: string, - consumerAddrs?: string[] + consumerAddrs?: string[], + status?: C2DStatusNumber ): Promise { let selectSQL = `SELECT * FROM ${this.schema.name}` @@ -448,6 +449,11 @@ export class SQLiteCompute implements ComputeDatabaseProvider { params.push(...consumerAddrs) } + if (status) { + conditions.push(`status = ?`) + params.push(status.toString()) + } + if (conditions.length > 0) { selectSQL += ` WHERE ${conditions.join(' AND ')}` } diff --git a/src/test/integration/algorithmsAccess.test.ts b/src/test/integration/algorithmsAccess.test.ts index e80d1947d..12cc2e114 100644 --- a/src/test/integration/algorithmsAccess.test.ts +++ b/src/test/integration/algorithmsAccess.test.ts @@ -422,7 +422,12 @@ describe('Trusted algorithms Flow', () => { try { await escrowContract .connect(consumerAccount) - .cancelExpiredLocks(lock.jobId, lock.token, lock.payer, lock.payee) + .cancelExpiredLock( + lock.jobId, + lock.token, + lock.payer, + firstEnv.consumerAddress + ) } catch (e) {} } } diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 53d041be2..2dbe382c8 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -20,7 +20,10 @@ import type { import { type ComputeAsset, type ComputeAlgorithm, - type ComputeEnvironment + type ComputeEnvironment, + C2DStatusNumber, + C2DStatusText, + type DBComputeJob } from '../../@types/C2D/C2D.js' import { // DB_TYPES, @@ -75,6 +78,8 @@ import { import { freeComputeStartPayload } from '../data/commands.js' import { DDOManager } from '@oceanprotocol/ddo-js' +import Dockerode from 'dockerode' +import { C2DEngineDocker } from '../../components/c2d/compute_engine_docker.js' const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) @@ -274,25 +279,19 @@ describe('Compute', () => { publishedComputeDataset = await waitToIndex( publishedComputeDataset.ddo.id, EVENTS.METADATA_UPDATED, - DEFAULT_TEST_TIMEOUT * 3, + DEFAULT_TEST_TIMEOUT * 2, true ) - if (!publishedComputeDataset.ddo) { - expect(expectedTimeoutFailure(this.test.title)).to.be.equal( - publishedComputeDataset.wasTimeout - ) - } else { - assert( - publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms - .length > 0, - 'Trusted algorithms not updated' - ) - assert( - publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms[0] - .did === publishedAlgoDataset.ddo.id, - 'Algorithm DID mismatch in trusted algorithms' - ) - } + assert( + publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms + .length > 0, + 'Trusted algorithms not updated' + ) + assert( + publishedComputeDataset?.ddo?.services[0]?.compute?.publisherTrustedAlgorithms[0] + .did === publishedAlgoDataset.ddo.id, + 'Algorithm DID mismatch in trusted algorithms' + ) }) it('Get compute environments', async () => { @@ -344,7 +343,6 @@ describe('Compute', () => { const response = await new ComputeGetEnvironmentsHandler(oceanNode).handle( getEnvironmentsTask ) - console.log('firstEnv', firstEnv) computeEnvironments = await streamToObject(response.stream as Readable) firstEnv = computeEnvironments[0] const initializeComputeTask: ComputeInitializeCommand = { @@ -694,7 +692,12 @@ describe('Compute', () => { try { await escrowContract .connect(consumerAccount) - .cancelExpiredLocks(lock.jobId, lock.token, lock.payer, lock.payee) + .cancelExpiredLock( + lock.jobId, + lock.token, + lock.payer, + firstEnv.consumerAddress + ) } catch (e) {} } locks = await oceanNode.escrow.getLocks( @@ -1382,7 +1385,6 @@ describe('Compute', () => { const datasetDDOTest = ddo const datasetInstance = DDOManager.getDDOClass(datasetDDO) - console.log('datasetDDOTest', datasetDDOTest) if (datasetDDOTest) { const result = await validateAlgoForDataset( algoDDOTest.id, @@ -1401,6 +1403,626 @@ describe('Compute', () => { }) }) + describe('encryptedDockerRegistryAuth integration tests', () => { + /** + * Helper function to encrypt docker registry auth using ECIES + */ + async function encryptDockerRegistryAuth(auth: { + username?: string + password?: string + auth?: string + }): Promise { + const authJson = JSON.stringify(auth) + const authData = Uint8Array.from(Buffer.from(authJson)) + const encrypted = await oceanNode + .getKeyManager() + .encrypt(authData, EncryptMethod.ECIES) + return Buffer.from(encrypted).toString('hex') + } + + it('should initialize compute with valid encryptedDockerRegistryAuth (username/password)', async () => { + const validAuth = { + username: 'testuser', + password: 'testpass' + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (resp.status.httpStatus !== 200) { + expect(resp.status.error).to.not.include('Invalid encryptedDockerRegistryAuth') + } + if (resp.status.httpStatus === 200) { + assert(resp.stream, 'Failed to get stream') + expect(resp.stream).to.be.instanceOf(Readable) + } + }) + + it('should initialize compute with valid encryptedDockerRegistryAuth (auth string)', async () => { + const validAuth = { + auth: Buffer.from('testuser:testpass').toString('base64') + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (resp.status.httpStatus !== 200) { + expect(resp.status.error).to.not.include('Invalid encryptedDockerRegistryAuth') + } + if (resp.status.httpStatus === 200) { + assert(resp.stream, 'Failed to get stream') + expect(resp.stream).to.be.instanceOf(Readable) + } + }) + + it('should fail initialize compute with invalid encryptedDockerRegistryAuth (missing password)', async () => { + const invalidAuth = { + username: 'testuser' + // missing password + } + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should fail with 400 due to validation error + assert( + resp.status.httpStatus === 400, + `Expected 400 but got ${resp.status.httpStatus}: ${resp.status.error}` + ) + expect(resp.status.error).to.include('Invalid encryptedDockerRegistryAuth') + expect(resp.status.error).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + }) + + it('should fail initialize compute with invalid encryptedDockerRegistryAuth (empty object)', async () => { + const invalidAuth = {} + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + assert( + resp.status.httpStatus === 400, + `Expected 400 but got ${resp.status.httpStatus}: ${resp.status.error}` + ) + expect(resp.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + + it('should start paid compute job with valid encryptedDockerRegistryAuth', async () => { + const validAuth = { + username: 'testuser', + password: 'testpass' + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const nonce = Date.now().toString() + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message))] + ) + const messageHashBytes = ethers.toBeArray(consumerMessage) + const signature = await wallet.signMessage(messageHashBytes) + + const startComputeTask: PaidComputeStartCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_START, + consumerAddress: await consumerAccount.getAddress(), + environment: firstEnv.id, + signature, + nonce, + datasets: [ + { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + ], + algorithm: { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new PaidComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (response.status.httpStatus !== 200) { + expect(response.status.error).to.not.include( + 'Invalid encryptedDockerRegistryAuth' + ) + } + if (response.status.httpStatus === 200) { + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + } + }) + + it('should fail paid compute start with invalid encryptedDockerRegistryAuth', async () => { + const invalidAuth = { + username: 'testuser' + // missing password + } + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const nonce = Date.now().toString() + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message))] + ) + const messageHashBytes = ethers.toBeArray(consumerMessage) + const signature = await wallet.signMessage(messageHashBytes) + + const startComputeTask: PaidComputeStartCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_START, + consumerAddress: await consumerAccount.getAddress(), + environment: firstEnv.id, + signature, + nonce, + datasets: [ + { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + ], + algorithm: { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new PaidComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + assert( + response.status.httpStatus === 400, + `Expected 400 but got ${response.status.httpStatus}: ${response.status.error}` + ) + expect(response.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + + it('should start free compute job with valid encryptedDockerRegistryAuth', async () => { + const validAuth = { + username: 'testuser', + password: 'testpass' + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const nonce = Date.now().toString() + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(nonce))] + ) + const signature = await wallet.signMessage(ethers.toBeArray(consumerMessage)) + + const startComputeTask: FreeComputeStartCommand = { + command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + consumerAddress: await wallet.getAddress(), + signature, + nonce, + environment: firstEnv.id, + datasets: [ + { + fileObject: computeAsset.services[0].files.files[0], + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: datasetOrderTxId + } + ], + algorithm: { + fileObject: algoAsset.services[0].files.files[0], + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: algoOrderTxId, + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + output: {}, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new FreeComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (response.status.httpStatus !== 200) { + expect(response.status.error).to.not.include( + 'Invalid encryptedDockerRegistryAuth' + ) + } + if (response.status.httpStatus === 200) { + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + } + }) + + it('should fail free compute start with invalid encryptedDockerRegistryAuth', async () => { + const invalidAuth = { + password: 'testpass' + // missing username + } + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const nonce = Date.now().toString() + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(nonce))] + ) + const signature = await wallet.signMessage(ethers.toBeArray(consumerMessage)) + + const startComputeTask: FreeComputeStartCommand = { + command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + consumerAddress: await wallet.getAddress(), + signature, + nonce, + environment: firstEnv.id, + datasets: [ + { + fileObject: computeAsset.services[0].files.files[0], + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: datasetOrderTxId + } + ], + algorithm: { + fileObject: algoAsset.services[0].files.files[0], + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: algoOrderTxId, + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + output: {}, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new FreeComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + assert( + response.status.httpStatus === 400, + `Expected 400 but got ${response.status.httpStatus}: ${response.status.error}` + ) + expect(response.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + + it('should handle invalid hex-encoded encryptedDockerRegistryAuth gracefully', async () => { + const invalidHex = 'not-a-valid-hex-string' + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: invalidHex + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should fail with 500 due to decryption/parsing error + assert( + resp.status.httpStatus === 400, + `Expected 400 but got ${resp.status.httpStatus}: ${resp.status.error}` + ) + expect(resp.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + }) + + describe('Local Docker image checking', () => { + let docker: Dockerode + let dockerEngine: C2DEngineDocker + const testImage = 'alpine:3.18' + before(async function () { + // Skip if Docker not available + try { + docker = new Dockerode() + await docker.info() + const pullStream = await docker.pull(testImage) + await new Promise((resolve, reject) => { + let wroteStatusBanner = false + docker.modem.followProgress( + pullStream, + (err: any, res: any) => { + // onFinished + if (err) { + console.log(err) + return reject(err) + } + console.log(`Successfully pulled image: ${testImage}`) + resolve(res) + }, + (progress: any) => { + // onProgress + if (!wroteStatusBanner) { + wroteStatusBanner = true + console.log('############# Pull docker image status: ##############') + } + // only write the status banner once, its cleaner + let logText = '' + if (progress.id) logText += progress.id + ' : ' + progress.status + else logText = progress.status + console.log('Pulling image : ' + logText) + } + ) + }) + } catch (e) { + this.skip() + } + + // Get the Docker engine from oceanNode + const c2dEngines = oceanNode.getC2DEngines() + const engines = (c2dEngines as any).engines as C2DEngineDocker[] + dockerEngine = engines.find((e) => e instanceof C2DEngineDocker) + if (!dockerEngine) { + this.skip() + } + }) + + it('should check local image when it exists locally', async function () { + // Check the image - should find it locally + const result = await dockerEngine.checkDockerImage(testImage) + + assert(result, 'Result should exist') + assert(result.valid === true, 'Image should be valid') + }).timeout(30000) + + it('should validate platform for local images', async function () { + // Get the platform from the local image + const imageInfo = await docker.getImage(testImage).inspect() + const localArch = imageInfo.Architecture || 'amd64' + const localOs = imageInfo.Os || 'linux' + + // Check with matching platform + const matchingPlatform = { + architecture: localArch === 'amd64' ? 'x86_64' : localArch, + os: localOs + } + const resultMatching = await dockerEngine.checkDockerImage( + testImage, + undefined, + matchingPlatform + ) + + assert(resultMatching, 'Result should exist') + assert( + resultMatching.valid === true, + 'Image should be valid with matching platform' + ) + }).timeout(30000) + + it('should detect platform mismatch for local images', async function () { + // Check with mismatched platform (assuming local is linux/amd64 or linux/x86_64) + const mismatchedPlatform = { + architecture: 'arm64', // Different architecture + os: 'linux' + } + const resultMismatch = await dockerEngine.checkDockerImage( + testImage, + undefined, + mismatchedPlatform + ) + + assert(resultMismatch, 'Result should exist') + assert( + resultMismatch.valid === false, + 'Image should be invalid with mismatched platform' + ) + assert(resultMismatch.status === 400, 'Status should be 400 for platform mismatch') + assert( + resultMismatch.reason.includes('Platform mismatch'), + 'Reason should include platform mismatch message' + ) + }).timeout(30000) + + it('should fall back to remote registry when local image not found', async function () { + const nonExistentLocalImage = 'nonexistent-local-image:latest' + + // Ensure image doesn't exist locally + try { + const image = docker.getImage(nonExistentLocalImage) + await image.inspect() + // If we get here, image exists - remove it for test + await image.remove({ force: true }) + } catch (e) { + // Image doesn't exist locally, which is what we want + } + + // Check the image - should fall back to remote check + // This will likely fail with 404, but we're testing the fallback behavior + const result = await dockerEngine.checkDockerImage(nonExistentLocalImage) + + assert(result, 'Result should exist') + // Should have attempted remote check (will fail, but that's expected) + assert(result.valid === false, 'Image should be invalid (not found)') + assert(result.status === 404, 'Status should be 404 for not found') + }).timeout(30000) + + it('should work without platform validation when platform not specified', async function () { + // Check without platform - should succeed if image exists + const result = await dockerEngine.checkDockerImage(testImage) + + assert(result, 'Result should exist') + assert(result.valid === true, 'Image should be valid without platform check') + }).timeout(30000) + + after(async function () { + // Clean up test images if needed + try { + await docker.info() + } catch (e) { + // Docker not available, skip cleanup + } + + // Optionally remove test images to save space + // (commented out to avoid breaking other tests that might use these images) + /* + try { + const image = docker.getImage('alpine:3.18') + await image.remove({ force: true }) + } catch (e) { + // Ignore errors during cleanup + } + */ + }) + }) + after(async () => { await tearDownEnvironment(previousConfiguration) indexer.stopAllChainIndexers() @@ -1822,4 +2444,551 @@ describe('Compute Access Restrictions', () => { await tearDownEnvironment(previousConfiguration) }) }) + + describe('Payment Claim Timer and JobSettle Status', () => { + let previousConfiguration: OverrideEnvConfig[] + let config: OceanNodeConfig + let dbconn: Database + let oceanNode: OceanNode + let dockerEngine: C2DEngineDocker + let paymentToken: any + let firstEnv: ComputeEnvironment + let consumerAccount: any + let escrowContract: any + let paymentTokenContract: any + let artifactsAddresses: any + + before(async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 2) + artifactsAddresses = getOceanArtifactsAdresses() + paymentToken = artifactsAddresses.development.Ocean + previousConfiguration = await setupEnvironment( + TEST_ENV_CONFIG_FILE, + buildEnvOverrideConfig( + [ + ENVIRONMENT_VARIABLES.RPCS, + ENVIRONMENT_VARIABLES.INDEXER_NETWORKS, + ENVIRONMENT_VARIABLES.PRIVATE_KEY, + ENVIRONMENT_VARIABLES.ADDRESS_FILE, + ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS + ], + [ + JSON.stringify(mockSupportedNetworks), + JSON.stringify([DEVELOPMENT_CHAIN_ID]), + '0xc594c6e5def4bab63ac29eed19a134c130388f74f019bc74b8f4389df2837a58', + `${homedir}/.ocean/ocean-contracts/artifacts/address.json`, + '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"paymentClaimInterval":60,"fees":{"' + + DEVELOPMENT_CHAIN_ID + + '":[{"feeToken":"' + + paymentToken + + '","prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration":10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]' + ] + ) + ) + config = await getConfiguration(true) + dbconn = await Database.init(config.dbConfig) + oceanNode = await OceanNode.getInstance( + config, + dbconn, + null, + null, + null, + null, + null, + true + ) + const indexer = new OceanIndexer( + dbconn, + config.indexingNetworks, + oceanNode.blockchainRegistry + ) + oceanNode.addIndexer(indexer) + oceanNode.addC2DEngines() + + const provider = new JsonRpcProvider('http://127.0.0.1:8545') + const publisherAccount = (await provider.getSigner(0)) as Signer + consumerAccount = (await provider.getSigner(1)) as Signer + escrowContract = new ethers.Contract( + artifactsAddresses.development.Escrow, + EscrowJson.abi, + consumerAccount + ) + paymentTokenContract = new ethers.Contract( + paymentToken, + OceanToken.abi, + publisherAccount + ) + + // Get the Docker engine + const c2dEngines = oceanNode.getC2DEngines() + const engines = (c2dEngines as any).engines as C2DEngineDocker[] + dockerEngine = engines.find((e) => e instanceof C2DEngineDocker) + if (!dockerEngine) { + this.skip() + } + + // Get compute environments + const getEnvironmentsTask = { + command: PROTOCOL_COMMANDS.COMPUTE_GET_ENVIRONMENTS + } + const resp = await new ComputeGetEnvironmentsHandler(oceanNode).handle( + getEnvironmentsTask + ) + const computeEnvironments = await streamToObject(resp.stream as Readable) + firstEnv = computeEnvironments[0] + }) + + after(async () => { + await tearDownEnvironment(previousConfiguration) + }) + + it('should transition job to JobSettle status when PublishingResults completes', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 2) + + // Create a test job in PublishingResults status + const testJobId = `test-job-settle-${Date.now()}` + const now = Math.floor(Date.now() / 1000).toString() + const testJob: DBComputeJob = { + owner: await consumerAccount.getAddress(), + jobId: testJobId, + dateCreated: now, + status: C2DStatusNumber.PublishingResults, + statusText: C2DStatusText.PublishingResults, + environment: firstEnv.id, + isRunning: false, + isStarted: true, + stopRequested: false, + algoStartTimestamp: String(parseInt(now) - 120), // 2 minutes ago + algoStopTimestamp: now, + algoDuration: 120, + isFree: false, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken, + lockTx: '0x123', + claimTx: '', + cost: 0 + }, + resources: [ + { + id: 'cpu', + amount: 1, + price: 1 + } + ], + clusterHash: 'test-cluster', + configlogURL: '', + publishlogURL: '', + algologURL: '', + outputsURL: '', + algorithm: {} as ComputeAlgorithm, + assets: [] as ComputeAsset[], + containerImage: null, + dateFinished: null, + results: [], + queueMaxWaitTime: 0 + } + + await dbconn.c2d.newJob(testJob) + + // Simulate finishJob being called (which would normally happen after results are published) + // We'll manually update to JobSettle to test the payment claim flow + testJob.status = C2DStatusNumber.JobSettle + testJob.statusText = C2DStatusText.JobSettle + await dbconn.c2d.updateJob(testJob) + + // Verify job is in JobSettle status + const updatedJob = await dbconn.c2d.getJob(testJobId) + assert( + updatedJob[0].status === C2DStatusNumber.JobSettle, + 'Job should be in JobSettle status' + ) + assert( + updatedJob[0].statusText === C2DStatusText.JobSettle, + 'Job statusText should be Job settling' + ) + }) + + it('should process jobs in JobSettle status via claimPayments', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + + // Create a test job with a lock + const testJobId = `test-job-claim-${Date.now()}` + const now = Math.floor(Date.now() / 1000) + const expiry = 3500 + + const providerAddress = await (await oceanNode.getKeyManager()).getEthAddress() + + // Clean up existing locks and authorizations first + const locks = await oceanNode.escrow.getLocks( + DEVELOPMENT_CHAIN_ID, + paymentToken, + await consumerAccount.getAddress(), + providerAddress + ) + if (locks.length > 0) { + // Cancel all existing locks + for (const lock of locks) { + try { + await escrowContract + .connect(consumerAccount) + .cancelExpiredLock(lock.jobId, lock.token, lock.payer, providerAddress) + } catch (e) { + // Ignore errors + } + } + } + + // Clean up existing authorizations + let auth = await oceanNode.escrow.getAuthorizations( + DEVELOPMENT_CHAIN_ID, + paymentToken, + await consumerAccount.getAddress(), + providerAddress + ) + if (auth.length > 0) { + // Remove authorization by setting to 0 + await escrowContract + .connect(consumerAccount) + .authorize(paymentToken, providerAddress, 0, 0, 0) + } + + // Check and withdraw existing funds if any + const funds = await oceanNode.escrow.getUserAvailableFunds( + DEVELOPMENT_CHAIN_ID, + await consumerAccount.getAddress(), + paymentToken + ) + if (BigInt(funds.toString()) > BigInt(0)) { + await escrowContract.connect(consumerAccount).withdraw([paymentToken], [funds]) + } + + // Now set up fresh authorization + const balance = await paymentTokenContract.balanceOf( + await consumerAccount.getAddress() + ) + if (BigInt(balance.toString()) === BigInt(0)) { + const mintAmount = ethers.parseUnits('1000', 18) + const mintTx = await paymentTokenContract.mint( + await consumerAccount.getAddress(), + mintAmount + ) + await mintTx.wait() + } + + const approveTx = await paymentTokenContract + .connect(consumerAccount) + .approve(artifactsAddresses.development.Escrow, balance) + await approveTx.wait() + + const depositTx = await escrowContract + .connect(consumerAccount) + .deposit(paymentToken, balance) + await depositTx.wait() + + const authorizeTx = await escrowContract + .connect(consumerAccount) + .authorize(paymentToken, providerAddress, balance, 3600, 10) + await authorizeTx.wait() + + // Verify authorization is set up correctly + auth = await oceanNode.escrow.getAuthorizations( + DEVELOPMENT_CHAIN_ID, + paymentToken, + await consumerAccount.getAddress(), + providerAddress + ) + assert(auth.length > 0, 'Should have authorization') + assert( + BigInt(auth[0].maxLockedAmount.toString()) > BigInt(0), + 'Should have maxLockedAmount in auth' + ) + + // Create lock + const lockTx = await oceanNode.escrow.createLock( + DEVELOPMENT_CHAIN_ID, + testJobId, + paymentToken, + await consumerAccount.getAddress(), + 100, // amount + expiry + ) + assert(lockTx, 'Lock creation should succeed') + + // Create job in JobSettle status + const testJob: DBComputeJob = { + owner: await consumerAccount.getAddress(), + jobId: testJobId, + dateCreated: now.toString(), + status: C2DStatusNumber.JobSettle, + statusText: C2DStatusText.JobSettle, + environment: firstEnv.id, + isRunning: false, + isStarted: true, + stopRequested: false, + algoStartTimestamp: String(now - 120), + algoStopTimestamp: now.toString(), + algoDuration: 120, + isFree: false, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken, + lockTx: lockTx || '0x123', + claimTx: '', + cost: 0 + }, + resources: [ + { + id: 'cpu', + amount: 1, + price: 1 + } + ], + clusterHash: 'test-cluster', + configlogURL: '', + publishlogURL: '', + algologURL: '', + outputsURL: '', + algorithm: {} as ComputeAlgorithm, + assets: [] as ComputeAsset[], + containerImage: null, + dateFinished: null, + results: [], + queueMaxWaitTime: 0 + } + + await dbconn.c2d.newJob(testJob) + + // Manually trigger claimPayments + const claimPaymentsMethod = (dockerEngine as any).claimPayments.bind(dockerEngine) + await claimPaymentsMethod() + + // Wait a bit for transaction to process + await sleep(5000) + + // Verify job was processed (should be JobFinished if claim succeeded, or still JobSettle if failed) + const updatedJob = await dbconn.c2d.getJob(testJobId) + // Job should be finished + assert( + updatedJob[0].status === C2DStatusNumber.JobFinished, + `Job status should be JobFinished or JobSettle, got ${updatedJob[0].status}` + ) + }) + + it('should handle expired locks by canceling them', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + + const testJobId = `test-job-expired-${Date.now()}` + const now = Math.floor(Date.now() / 1000) + + // Create lock with expired timestamp (we'll need to mock this or use a different approach) + // For this test, we'll create a job and verify it handles expiration correctly + const testJob: DBComputeJob = { + owner: await consumerAccount.getAddress(), + jobId: testJobId, + dateCreated: now.toString(), + status: C2DStatusNumber.JobSettle, + statusText: C2DStatusText.JobSettle, + environment: firstEnv.id, + isRunning: false, + isStarted: true, + stopRequested: false, + algoStartTimestamp: String(now - 120), + algoStopTimestamp: now.toString(), + algoDuration: 120, + isFree: false, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken, + lockTx: '0xexpired', + claimTx: '', + cost: 0 + }, + resources: [ + { + id: 'cpu', + amount: 1, + price: 1 + } + ], + clusterHash: 'test-cluster', + configlogURL: '', + publishlogURL: '', + algologURL: '', + outputsURL: '', + algorithm: {} as ComputeAlgorithm, + assets: [] as ComputeAsset[], + containerImage: null, + dateFinished: null, + results: [], + queueMaxWaitTime: 0 + } + + await dbconn.c2d.newJob(testJob) + + // Trigger claimPayments - if lock is expired, it should cancel it + const claimPaymentsMethod = (dockerEngine as any).claimPayments.bind(dockerEngine) + await claimPaymentsMethod() + + // Wait for processing + await sleep(3000) + + // Verify job was handled (either finished or still settling) + const updatedJob = await dbconn.c2d.getJob(testJobId) + assert( + updatedJob[0].status === C2DStatusNumber.JobFinished, + 'Job should be processed' + ) + }) + + it('should skip payment logic for free jobs', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 2) + + const testJobId = `test-job-free-${Date.now()}` + const now = Math.floor(Date.now() / 1000).toString() + + const testJob: DBComputeJob = { + owner: await consumerAccount.getAddress(), + jobId: testJobId, + dateCreated: now, + status: C2DStatusNumber.JobSettle, + statusText: C2DStatusText.JobSettle, + environment: firstEnv.id, + isRunning: false, + isStarted: true, + stopRequested: false, + algoStartTimestamp: String(parseInt(now) - 120), + algoStopTimestamp: now, + algoDuration: 120, + isFree: true, // Free job + resources: [ + { + id: 'cpu', + amount: 1, + price: 0 + } + ], + clusterHash: 'test-cluster', + configlogURL: '', + publishlogURL: '', + algologURL: '', + outputsURL: '', + algorithm: {} as ComputeAlgorithm, + assets: [] as ComputeAsset[], + containerImage: null, + dateFinished: null, + results: [], + queueMaxWaitTime: 0 + } + + await dbconn.c2d.newJob(testJob) + + // Trigger claimPayments + const claimPaymentsMethod = (dockerEngine as any).claimPayments.bind(dockerEngine) + await claimPaymentsMethod() + + // Wait for processing + await sleep(2000) + + // Free jobs should be marked as finished immediately (no lock to check) + const updatedJob = await dbconn.c2d.getJob(testJobId) + // Free jobs without locks should be marked as finished + assert( + updatedJob[0].status === C2DStatusNumber.JobFinished, + 'Free job should be marked as finished' + ) + }) + + it('should process multiple jobs in batch', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 3) + + const jobIds: string[] = [] + const now = Math.floor(Date.now() / 1000) + + // Create multiple jobs in JobSettle status + for (let i = 0; i < 3; i++) { + const testJobId = `test-job-batch-${i}-${Date.now()}` + jobIds.push(testJobId) + + const testJob: DBComputeJob = { + owner: await consumerAccount.getAddress(), + jobId: testJobId, + dateCreated: now.toString(), + status: C2DStatusNumber.JobSettle, + statusText: C2DStatusText.JobSettle, + environment: firstEnv.id, + isRunning: false, + isStarted: true, + stopRequested: false, + algoStartTimestamp: String(now - 120), + algoStopTimestamp: now.toString(), + algoDuration: 120, + isFree: true, // Use free jobs for simpler testing + resources: [ + { + id: 'cpu', + amount: 1, + price: 0 + } + ], + clusterHash: 'test-cluster', + configlogURL: '', + publishlogURL: '', + algologURL: '', + outputsURL: '', + algorithm: {} as ComputeAlgorithm, + assets: [] as ComputeAsset[], + containerImage: null, + dateFinished: null, + results: [], + queueMaxWaitTime: 0 + } + + await dbconn.c2d.newJob(testJob) + } + + // Verify all jobs are in JobSettle status + const jobsBefore = await dbconn.c2d.getJobs( + [firstEnv.id], + undefined, + undefined, + C2DStatusNumber.JobSettle + ) + const testJobsBefore = jobsBefore.filter((j) => jobIds.includes(j.jobId)) + assert( + testJobsBefore.length === 3, + `Should have 3 jobs in JobSettle status, got ${testJobsBefore.length}` + ) + + // Trigger claimPayments to process all + const claimPaymentsMethod = (dockerEngine as any).claimPayments.bind(dockerEngine) + await claimPaymentsMethod() + + // Wait for processing + await sleep(3000) + + // Verify all jobs were processed + for (const jobId of jobIds) { + const job = await dbconn.c2d.getJob(jobId) + assert( + job[0].status === C2DStatusNumber.JobFinished, + `Job ${jobId} should be processed` + ) + } + }) + + it('should start payment claim timer on engine start', function () { + // Verify timer methods exist + // Timer might be null if not started yet, or a NodeJS.Timeout if started + // We can't easily test the timer directly, but we can verify the method exists + assert( + typeof (dockerEngine as any).startPaymentTimer === 'function', + 'startPaymentTimer method should exist' + ) + assert( + typeof (dockerEngine as any).claimPayments === 'function', + 'claimPayments method should exist' + ) + }) + }) }) diff --git a/src/test/integration/dockerRegistryAuth.test.ts b/src/test/integration/dockerRegistryAuth.test.ts new file mode 100644 index 000000000..394bbd1df --- /dev/null +++ b/src/test/integration/dockerRegistryAuth.test.ts @@ -0,0 +1,398 @@ +/* eslint-disable no-unused-expressions */ +/** + * Integration test for Docker registry authentication functionality. + * + * Tests the getDockerManifest method with: + * - Public images (no credentials) + * - Registry auth configuration (username/password and auth string) + * - Error handling + */ +import { expect, assert } from 'chai' +import { C2DEngineDocker } from '../../components/c2d/compute_engine_docker.js' +import { C2DClusterInfo, C2DClusterType } from '../../@types/C2D/C2D.js' +import { dockerRegistrysAuth } from '../../@types/OceanNode.js' +import { DockerRegistryAuthSchema } from '../../utils/config/schemas.js' + +describe('Docker Registry Authentication Integration Tests', () => { + describe('Public registry access (no credentials)', () => { + it('should successfully fetch manifest for public Docker Hub image', async () => { + // Create minimal engine instance for testing + const clusterConfig: C2DClusterInfo = { + type: C2DClusterType.DOCKER, + hash: 'test-cluster-hash', + connection: { + socketPath: '/var/run/docker.sock' + }, + tempFolder: '/tmp/test-docker' + } + + // Mock minimal dependencies - we only need getDockerManifest + const dockerEngine = new C2DEngineDocker( + clusterConfig, + null as any, + null as any, + null as any, + {} // No auth config + ) + + // Test with a well-known public image + const image = 'library/alpine:latest' + const manifest = await dockerEngine.getDockerManifest(image, null) + + expect(manifest).to.exist + expect(manifest).to.have.property('schemaVersion') + expect(manifest).to.have.property('mediaType') + }).timeout(10000) + + it('should successfully fetch manifest for public image with explicit tag', async () => { + const clusterConfig: C2DClusterInfo = { + type: C2DClusterType.DOCKER, + hash: 'test-cluster-hash-2', + connection: { + socketPath: '/var/run/docker.sock' + }, + tempFolder: '/tmp/test-docker-2' + } + + const dockerEngine = new C2DEngineDocker( + clusterConfig, + null as any, + null as any, + null as any, + {} + ) + + // Use a simple image reference that will default to Docker Hub + const image = 'hello-world:latest' + const manifest = await dockerEngine.getDockerManifest(image, null) + + expect(manifest).to.exist + expect(manifest).to.have.property('schemaVersion') + }).timeout(10000) + }) + + describe('Registry authentication configuration', () => { + it('should store and retrieve username/password credentials', () => { + const testAuth: dockerRegistrysAuth = { + 'https://registry-1.docker.io': { + username: 'testuser', + password: 'testpass', + auth: '' + } + } + + const clusterConfig: C2DClusterInfo = { + type: C2DClusterType.DOCKER, + hash: 'test-cluster-hash-auth', + connection: { + socketPath: '/var/run/docker.sock' + }, + tempFolder: '/tmp/test-docker-auth' + } + + const engineWithAuth = new C2DEngineDocker( + clusterConfig, + null as any, + null as any, + null as any, + testAuth + ) + + // Verify that getDockerRegistryAuth returns the credentials + const auth = (engineWithAuth as any).getDockerRegistryAuth( + 'https://registry-1.docker.io' + ) + expect(auth).to.exist + expect(auth?.username).to.equal('testuser') + expect(auth?.password).to.equal('testpass') + }) + + it('should use auth string when provided', () => { + const preEncodedAuth = Buffer.from('testuser:testpass').toString('base64') + const testAuth: dockerRegistrysAuth = { + 'https://registry-1.docker.io': { + username: 'testuser', + password: 'testpass', + auth: preEncodedAuth + } + } + + const clusterConfig: C2DClusterInfo = { + type: C2DClusterType.DOCKER, + hash: 'test-cluster-hash-auth2', + connection: { + socketPath: '/var/run/docker.sock' + }, + tempFolder: '/tmp/test-docker-auth2' + } + + const engineWithAuth = new C2DEngineDocker( + clusterConfig, + null as any, + null as any, + null as any, + testAuth + ) + + const auth = (engineWithAuth as any).getDockerRegistryAuth( + 'https://registry-1.docker.io' + ) + expect(auth).to.exist + expect(auth?.auth).to.equal(preEncodedAuth) + }) + + it('should return null for non-existent registry auth', () => { + const clusterConfig: C2DClusterInfo = { + type: C2DClusterType.DOCKER, + hash: 'test-cluster-hash-3', + connection: { + socketPath: '/var/run/docker.sock' + }, + tempFolder: '/tmp/test-docker-3' + } + + const dockerEngine = new C2DEngineDocker( + clusterConfig, + null as any, + null as any, + null as any, + {} + ) + + const auth = (dockerEngine as any).getDockerRegistryAuth( + 'https://nonexistent-registry.com' + ) + expect(auth).to.be.null + }) + + it('should handle multiple registry configurations', () => { + const testAuth: dockerRegistrysAuth = { + 'https://registry-1.docker.io': { + username: 'user1', + password: 'pass1', + auth: '' + }, + 'https://ghcr.io': { + username: 'user2', + password: 'pass2', + auth: '' + } + } + + const clusterConfig: C2DClusterInfo = { + type: C2DClusterType.DOCKER, + hash: 'test-cluster-hash-multi', + connection: { + socketPath: '/var/run/docker.sock' + }, + tempFolder: '/tmp/test-docker-multi' + } + + const engineWithAuth = new C2DEngineDocker( + clusterConfig, + null as any, + null as any, + null as any, + testAuth + ) + + const dockerHubAuth = (engineWithAuth as any).getDockerRegistryAuth( + 'https://registry-1.docker.io' + ) + expect(dockerHubAuth).to.exist + expect(dockerHubAuth?.username).to.equal('user1') + + const ghcrAuth = (engineWithAuth as any).getDockerRegistryAuth('https://ghcr.io') + expect(ghcrAuth).to.exist + expect(ghcrAuth?.username).to.equal('user2') + + const unknownAuth = (engineWithAuth as any).getDockerRegistryAuth( + 'https://unknown-registry.com' + ) + expect(unknownAuth).to.be.null + }) + }) + + describe('Error handling', () => { + it('should handle invalid image references gracefully', async () => { + const clusterConfig: C2DClusterInfo = { + type: C2DClusterType.DOCKER, + hash: 'test-cluster-hash-error', + connection: { + socketPath: '/var/run/docker.sock' + }, + tempFolder: '/tmp/test-docker-error' + } + + const dockerEngine = new C2DEngineDocker( + clusterConfig, + null as any, + null as any, + null as any, + {} + ) + + try { + await dockerEngine.getDockerManifest('invalid-image-reference', null) + assert.fail('Should have thrown an error for invalid image') + } catch (error: any) { + expect(error).to.exist + expect(error.message).to.include('Failed to get manifest') + } + }).timeout(10000) + }) + + describe('DockerRegistryAuthSchema validation', () => { + it('should validate schema with only auth string', () => { + const authData = { + auth: Buffer.from('testuser:testpass').toString('base64') + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.true + if (result.success) { + expect(result.data.auth).to.equal(authData.auth) + expect(result.data.username).to.be.undefined + expect(result.data.password).to.be.undefined + } + }) + + it('should validate schema with only username and password', () => { + const authData = { + username: 'testuser', + password: 'testpass' + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.true + if (result.success) { + expect(result.data.username).to.equal('testuser') + expect(result.data.password).to.equal('testpass') + expect(result.data.auth).to.be.undefined + } + }) + + it('should validate schema with all three fields (auth, username, password)', () => { + const authData = { + username: 'testuser', + password: 'testpass', + auth: Buffer.from('testuser:testpass').toString('base64') + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.true + if (result.success) { + expect(result.data.username).to.equal('testuser') + expect(result.data.password).to.equal('testpass') + expect(result.data.auth).to.equal(authData.auth) + } + }) + + it('should reject schema with only username (missing password)', () => { + const authData = { + username: 'testuser' + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.false + if (!result.success) { + expect(result.error.errors).to.exist + expect(result.error.errors[0].message).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + } + }) + + it('should reject schema with only password (missing username)', () => { + const authData = { + password: 'testpass' + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.false + if (!result.success) { + expect(result.error.errors).to.exist + expect(result.error.errors[0].message).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + } + }) + + it('should reject empty object', () => { + const authData = {} + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.false + if (!result.success) { + expect(result.error.errors).to.exist + expect(result.error.errors[0].message).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + } + }) + + it('should reject empty auth string with no username/password', () => { + const authData = { + auth: '' + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.false + if (!result.success) { + expect(result.error.errors).to.exist + expect(result.error.errors[0].message).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + } + }) + + it('should reject empty username with password provided', () => { + const authData = { + username: '', + password: 'testpass' + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.false + if (!result.success) { + expect(result.error.errors).to.exist + expect(result.error.errors[0].message).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + } + }) + + it('should reject empty password with username provided', () => { + const authData = { + username: 'testuser', + password: '' + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.false + if (!result.success) { + expect(result.error.errors).to.exist + expect(result.error.errors[0].message).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + } + }) + + it('should reject all empty strings', () => { + const authData = { + username: '', + password: '', + auth: '' + } + + const result = DockerRegistryAuthSchema.safeParse(authData) + expect(result.success).to.be.false + if (!result.success) { + expect(result.error.errors).to.exist + expect(result.error.errors[0].message).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + } + }) + }) +}) diff --git a/src/test/integration/imageCleanUp.test.ts b/src/test/integration/imageCleanUp.test.ts index 00245525b..ab3b5304c 100644 --- a/src/test/integration/imageCleanUp.test.ts +++ b/src/test/integration/imageCleanUp.test.ts @@ -188,7 +188,7 @@ describe('Docker Image Cleanup Integration Tests', () => { escrow = {} as Escrow keyManager = {} as KeyManager - dockerEngine = new C2DEngineDocker(clusterConfig, db, escrow, keyManager) + dockerEngine = new C2DEngineDocker(clusterConfig, db, escrow, keyManager, {}) }) it('should track image usage when image is pulled', async () => { diff --git a/src/utils/config/constants.ts b/src/utils/config/constants.ts index 8f99b8bb4..5d9c9cdad 100644 --- a/src/utils/config/constants.ts +++ b/src/utils/config/constants.ts @@ -31,6 +31,7 @@ export const ENV_TO_CONFIG_MAPPING = { ALLOWED_ADMINS: 'allowedAdmins', ALLOWED_ADMINS_LIST: 'allowedAdminsList', DOCKER_COMPUTE_ENVIRONMENTS: 'dockerComputeEnvironments', + DOCKER_REGISTRY_AUTHS: 'dockerRegistrysAuth', P2P_BOOTSTRAP_NODES: 'p2pConfig.bootstrapNodes', P2P_BOOTSTRAP_TIMEOUT: 'p2pConfig.bootstrapTimeout', P2P_BOOTSTRAP_TAGNAME: 'p2pConfig.bootstrapTagName', diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 1139b69c5..4e905871a 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -84,6 +84,31 @@ export const OceanNodeDBConfigSchema = z.object({ dbType: z.string().nullable() }) +export const DockerRegistryAuthSchema = z + .object({ + username: z.string().optional(), + password: z.string().optional(), + auth: z.string().optional() + }) + .refine( + (data) => { + // Either 'auth' is provided, OR both 'username' and 'password' are provided + return ( + (data.auth !== undefined && data.auth !== '') || + (data.username !== undefined && + data.username !== '' && + data.password !== undefined && + data.password !== '') + ) + }, + { + message: + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + } + ) + +export const DockerRegistrysSchema = z.record(z.string(), DockerRegistryAuthSchema) + export const ComputeResourceSchema = z.object({ id: z.string(), total: z.number().optional(), @@ -256,6 +281,8 @@ export const OceanNodeConfigSchema = z .optional() .default([]), + dockerRegistrysAuth: jsonFromString(DockerRegistrysSchema).optional().default({}), + authorizedDecrypters: addressArrayFromString.optional().default([]), authorizedDecryptersList: jsonFromString(AccessListContractSchema).optional(), diff --git a/src/utils/constants.ts b/src/utils/constants.ts index 632efb277..028a12d15 100644 --- a/src/utils/constants.ts +++ b/src/utils/constants.ts @@ -400,6 +400,11 @@ export const ENVIRONMENT_VARIABLES: Record = { value: process.env.DOCKER_COMPUTE_ENVIRONMENTS, required: false }, + DOCKER_REGISTRY_AUTHS: { + name: 'DOCKER_REGISTRY_AUTHS', + value: process.env.DOCKER_REGISTRY_AUTHS, + required: false + }, DOCKER_SOCKET_PATH: { name: 'DOCKER_SOCKET_PATH', value: process.env.DOCKER_SOCKET_PATH,