diff --git a/docs/design/cwl-input-sandbox-design.md b/docs/design/cwl-input-sandbox-design.md new file mode 100644 index 00000000..b8fc5d32 --- /dev/null +++ b/docs/design/cwl-input-sandbox-design.md @@ -0,0 +1,121 @@ +# Design: Input Sandbox via Replica Map + +## Context + +The `dirac:Job` hint declares input sandbox files via `input_sandbox` source references: + +```yaml +hints: + - class: dirac:Job + input_sandbox: + - source: helper_script + - source: config_files + path: "conf/" +``` + +These reference CWL inputs whose values are files that need to be available on the worker node. The sandbox store handles upload/download of tar archives. + +## Design + +### Client Side (before submission) + +1. User uploads sandbox files via the sandbox store API → receives a sandbox ID (e.g., `SB:SandboxSE|/S3/diracx-sandbox-store/sha256:abc123.tar.zst`) +2. User references the sandbox in their input YAML using a `SB:` prefixed path: + +```yaml +# input.yaml +helper_script: + class: File + path: "SB:SandboxSE|/S3/diracx-sandbox-store/sha256:abc123.tar.zst:helper.sh" + +config_files: + - class: File + path: "SB:SandboxSE|/S3/diracx-sandbox-store/sha256:abc123.tar.zst:config/app.yaml" + - class: File + path: "SB:SandboxSE|/S3/diracx-sandbox-store/sha256:abc123.tar.zst:config/db.yaml" +``` + +The format is `SB:<sandbox_id>:<relative_path>`. + +### Submission (diracx-logic) + +No changes needed. `submit_cwl_jobs` stores the input YAML as-is in `workflow_params`. The `SB:` paths are opaque strings at this stage. + +### Worker Side (JobWrapper) + +Changes in `JobWrapper.__download_input_sandbox`: + +1. Iterate `input_sandbox` sources from the hint +2. For each source, extract the CWL input value from `workflow_params` +3. Detect `SB:` prefixed paths +4. Parse the sandbox ID and relative path: `SB:<sandbox_id>:<relative_path>` +5. Download the sandbox tar (once per unique sandbox ID — cache across sources) +6. Extract to the job directory, respecting the hint's `path` field for subdirectory placement +7. 
Add a replica map entry mapping the `SB:` path to the local extracted file path + +```python +# Pseudo-code for the worker-side resolution +for ref in job_hint.input_sandbox: + cwl_value = inputs.cwl.get(ref.source) + for file_path in extract_file_paths(cwl_value): + if file_path.startswith("SB:"): + sandbox_id, relative_path = parse_sb_path(file_path) + + # Download + extract sandbox (cached per sandbox_id) + extract_dir = download_and_extract_sandbox(sandbox_id, job_path) + + # Determine local path (respecting hint's path field) + dest_dir = job_path / ref.path if ref.path else job_path + local_path = dest_dir / relative_path + + # Add to replica map for the executor + replica_map[file_path] = local_path +``` + +After this, the `DiracReplicaMapFsAccess` resolves `SB:` paths exactly like `LFN:` paths — the executor doesn't need to know the difference. + +### Replica Map as Universal Resolution Layer + +With this design, the replica map resolves all file types uniformly: + +| Prefix | Source | Resolution | +|--------|--------|------------| +| `LFN:` | Grid storage | `DataManager.getActiveReplicas()` → PFN URLs | +| `SB:` | Sandbox store | Download tar → extract → local file path | +| *(none)* | Local file | Direct path (already on worker) | + +The CWL executor sees only local paths or URLs via the replica map — it never handles sandbox or grid storage directly. + +### Changes Required + +| Location | Change | +|----------|--------| +| `JobWrapper.__download_input_sandbox` | Detect `SB:` paths, download/extract tar, add to replica map | +| `DiracReplicaMapFsAccess._resolve_lfn` | Handle `SB:` prefix in addition to `LFN:` (or rename to `_resolve_path`) | +| `JobWrapper.__build_replica_map` | Accept sandbox entries in addition to LFN entries | + +### Sandbox Caching + +Multiple `input_sandbox` sources may reference the same sandbox tar (same `sandbox_id`, different `relative_path`). 
The download and extraction should happen once per unique sandbox ID: + +```python +extracted_sandboxes: dict[str, Path] = {}  # sandbox_id → extracted directory + +for sandbox_id in unique_sandbox_ids: +    if sandbox_id not in extracted_sandboxes: +        extracted_sandboxes[sandbox_id] = download_and_extract(sandbox_id, job_path) +``` + +### Parametric Jobs + +For parametric jobs (same CWL, different inputs), sandboxes may be shared: +- Same sandbox tar for all jobs → uploaded once by client, same `SB:` ID in all input YAMLs +- Different sandbox per job → different tars, different `SB:` IDs + +The `workflows` table deduplication handles the CWL. The sandbox store handles file deduplication. `workflow_params` carries the per-job `SB:` references. + +### Open Questions + +1. **Sandbox path format** — is `SB:<sandbox_id>:<relative_path>` the right format, or should we use a different separator? The sandbox ID itself may contain colons. +2. **Tar extraction** — does `download_sandbox` already extract, or do we need to handle `zstd` decompression + tar extraction ourselves? +3. **Sandbox assignment** — does the sandbox need to be assigned to the job via `assign_sandbox_to_job` before the worker can download it? 
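A strawman answer to open question 1, keeping the current format: split on the *last* colon, which sidesteps the colons inside the sandbox ID itself (e.g. `sha256:`) as long as relative paths inside the tar are forbidden from containing colons — a rule submission-time validation would have to enforce. The `parse_sb_path` name matches the worker-side pseudo-code above; its exact signature is an assumption:

```python
def parse_sb_path(path: str) -> tuple[str, str]:
    """Split an `SB:`-prefixed path into (sandbox_id, relative_path).

    Uses the last colon as separator, assuming relative paths never
    contain colons (the sandbox ID may, e.g. `sha256:`).
    """
    if not path.startswith("SB:"):
        raise ValueError(f"not a sandbox path: {path!r}")
    sandbox_id, sep, relative_path = path[3:].rpartition(":")
    if not sep or not sandbox_id or not relative_path:
        raise ValueError(f"malformed sandbox path: {path!r}")
    return sandbox_id, relative_path


# The example path from the input YAML above:
sb = "SB:SandboxSE|/S3/diracx-sandbox-store/sha256:abc123.tar.zst:helper.sh"
assert parse_sb_path(sb) == (
    "SandboxSE|/S3/diracx-sandbox-store/sha256:abc123.tar.zst",
    "helper.sh",
)
```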
diff --git a/docs/design/cwl-job-submission-design.md b/docs/design/cwl-job-submission-design.md new file mode 100644 index 00000000..f9bc063f --- /dev/null +++ b/docs/design/cwl-job-submission-design.md @@ -0,0 +1,717 @@ +**Status: Draft for Discussion** + +## Context: The Three-Level CWL Model + +DIRAC CWL workflows operate at three levels, determined by **which hints are present** (not by CWL class type): + +| Level | DIRAC Concept | CWL Hint | Determines | +|-------|---------------|----------|------------| +| **Production** | Production/Request | `dirac:Production` | Input dataset sourcing, orchestrates transformations | +| **Transformation** | Transformation | `dirac:Transformation` | Job template — grouping, input queries | +| **Job** | Job | `dirac:Job` *(new, replaces `dirac:Scheduling` + `dirac:ExecutionHooks`)* | Single execution: scheduling, I/O, hooks | + +A CWL Workflow with `dirac:Production` creates Transformations from its steps. Each Transformation is a job template that creates many Jobs. The `dirac:Job` hint lives at the **Job level** — it tells DIRAC how to schedule and manage a single execution. + +This design focuses on the **Job level**: the `dirac:Job` hint and the diracX submission endpoint. + +## Problem Statement + +Currently, CWL jobs are submitted to DIRAC by: +1. Setting `Executable = "dirac-cwl-exec"` in the JDL +2. Shipping the CWL definition (`job.json`) as an InputSandbox file +3. At runtime on the worker node, `__createCWLJobWrapper` clones the `dirac-cwl` repo, installs pixi, downloads the sandbox, and runs the CWL + +This has several problems: +- The CWL workflow definition is opaque to DIRAC — it's just a sandbox blob, not queryable or inspectable +- `dirac:Scheduling` and `dirac:ExecutionHooks` are separate hints that duplicate concepts already present in JDL (`Site`, `Priority`, `OutputSandbox`, etc.) 
+- The existing `convert_to_jdl()` in `submission_clients.py` only maps ~40% of available JDL fields +- There is no native diracX API for CWL submission — everything goes through the JDL path +- The worker-node shim `git clone`s dirac-cwl and installs pixi on every job + +## Goals + +1. **Unified `dirac:Job` hint** — replace `dirac:Scheduling` and `dirac:ExecutionHooks` with a single versioned hint +2. **Dedicated `workflows` table** — store CWL definitions once, content-addressed by SHA-256; jobs reference a workflow, not embed it +3. **`job_workflow_params` table** — per-job parameters stored separately, lightweight; 10k parametric jobs = 1 workflow row + 10k param rows +4. **New diracX endpoint** — `POST /api/jobs/` accepts CWL + input YAML(s) directly +5. **Models in diracX** — `JobSubmissionModel`, `JobHint`, and related types live in diracX (migrated from dirac-cwl) +6. **No git clone on worker nodes** — dirac-cwl is installed in the diracX environment; job wrapper is accessed via `importlib.resources` +7. **Fail fast** — strict validation of all CWL ID references, types, and hint fields at submission time + +## Design + +### 1. The `dirac:Job` Hint + +#### Design principles + +1. **Use standard CWL where possible** — don't duplicate what CWL already provides natively via `requirements` +2. **Execution hooks are not user-configured** — they are determined automatically by `type`; the submitter doesn't choose them +3. **Derive what you can** — `job_name` from CWL `label`/`id`, processors from `ResourceRequirement`, etc. +4. **Reference CWL I/O by source ID** — instead of duplicating file lists, use `source:` to point to CWL input/output IDs +5. 
**Versioned schema** — the hint carries a `schema_version` to enable forward-compatible evolution + +#### What CWL already provides (via `requirements`) + +These standard CWL constructs map directly to JDL fields without needing a `dirac:Job` field: + +| CWL Requirement | CWL Field | JDL Equivalent | Notes | +|-----------------|-----------|----------------|-------| +| `ResourceRequirement` | `coresMin` | `MinNumberOfProcessors` | | +| `ResourceRequirement` | `coresMax` | `MaxNumberOfProcessors` | | +| `ResourceRequirement` | `ramMin` | `MinRAM` | | +| `ResourceRequirement` | `ramMax` | `MaxRAM` | | +| `ToolTimeLimit` | `timelimit` | — | Wall-clock seconds; see [CPUTime](#cputime-and-cpu-work) | +| `DockerRequirement` | `dockerPull` | — | Container support TBD (unrelated to `Platform`) | +| `CUDARequirement` | *(presence)* | `Tags: ["GPU"]` | Implies GPU tag | +| `MPIRequirement` | *(presence)* | — | **Not supported** — raises `NotImplementedError` | +| *(CWL task)* | `label` or `id` | `JobName` | Derived automatically | + +#### CPUTime and CPU work + +DIRAC's `CPUTime` is **normalized CPU work** in HS06-seconds (`wall_time * CPUNormalizationFactor`), not wall-clock time. CWL's `ToolTimeLimit` is wall-clock seconds. These are fundamentally different units. + +**Approach**: The `dirac:Job` hint provides an explicit `cpu_work` field representing normalized HS06-seconds — the same unit DIRAC uses internally. This avoids ambiguity: + +- `cpu_work` in `dirac:Job` → maps directly to JDL `CPUTime` (HS06-seconds) +- `ToolTimeLimit` (if present) → used by cwltool for local execution; **not** translated to `CPUTime` + +The normalization factor itself can be calculated by DB12 on the worker node. Users who think in wall-clock terms can compute `cpu_work = wall_seconds * estimated_HS06_factor`. 
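The wall-clock conversion the last sentence describes is a plain multiplication; a minimal sketch (the helper name is hypothetical, not part of the design):

```python
def estimate_cpu_work(wall_seconds: float, hs06_per_core: float) -> int:
    """Estimate normalized CPU work (HS06-seconds) from a wall-clock budget.

    `hs06_per_core` is the user's estimate of the slowest core the job
    should still finish on; DIRAC measures the actual normalization
    factor on the worker node (e.g. via DB12).
    """
    return int(wall_seconds * hs06_per_core)


# A 24-hour budget on a ~10 HS06 core — the value used in the example hint:
assert estimate_cpu_work(24 * 3600, 10.0) == 864000
```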
+ +#### What `dirac:Job` adds (DIRAC-specific, no CWL equivalent) + +```yaml +cwlVersion: v1.2 +class: CommandLineTool + +requirements: + - class: ResourceRequirement + coresMin: 1 + coresMax: 4 + ramMin: 2048 # MB + +hints: + - class: dirac:Job + schema_version: "1.0" + + # --- Scheduling --- + priority: 5 + cpu_work: 864000 # HS06-seconds (= CPUTime in JDL) + platform: "x86_64-el9" + sites: + - LCG.CERN.cern + - LCG.IN2P3.fr + banned_sites: + - LCG.RAL.uk + tags: ["GPU"] # additional tags beyond auto-derived ones + + # --- Job metadata --- + type: "User" # determines execution hooks automatically + group: "lhcb_analysis" + log_level: "INFO" + + # --- I/O: reference CWL inputs/outputs by source ID --- + input_sandbox: + - source: helper_script # CWL input ID (type: File) → job root + - source: config_files # CWL input ID (type: File[]) + path: "conf/" # relative to job working directory + input_data: + - source: input_lfns # CWL input ID (type: File[]) + output_sandbox: + - source: stderr_log # CWL output ID + output_data: + - source: result_file # CWL output ID + output_path: "/lhcb/user/r/roneil/output/" + output_se: ["SE-USER"] + - source: histogram # CWL output ID + output_path: "/lhcb/user/r/roneil/histos/" + output_se: ["SE-AUXILIARY"] + +label: "my-analysis-job" # → becomes JobName + +inputs: + - id: helper_script + type: File + - id: config_files + type: File[] + - id: input_lfns + type: File[] + - id: config_param + type: string + +outputs: + - id: result_file + type: File + outputBinding: + glob: "result.root" + - id: histogram + type: File + outputBinding: + glob: "histos.root" + - id: stderr_log + type: File + outputBinding: + glob: "std.err" + +$namespaces: + dirac: "schemas/dirac-metadata.json#/$defs/" +``` + +Note: no `baseCommand` — the executor is always the dirac-cwl runner. The CWL task defines **what** to run; DIRAC handles **how** to run it. 
+#### Key design decisions + +**`type` instead of `job_type`**: Shorter, cleaner, and consistent with CWL's own `class:` convention. Maps to JDL `JobType`. Determines execution hooks automatically (see [Execution hooks](#execution-hooks-automatic-not-user-configured)). + +**Sites: `sites` + `banned_sites`** as flat lists: + +```yaml +# Run only at these sites: +sites: + - LCG.CERN.cern + - LCG.IN2P3.fr + +# Exclude specific sites: +banned_sites: + - LCG.RAL.uk +``` + +DIRAC computes the effective set as `Sites - BannedSites`. If `sites` is omitted or empty, the job can run anywhere (equivalent to `Site = ANY`). Both fields are optional flat lists — simple to read and write, no nesting required. The semantics mirror DIRAC's native model directly. + +**I/O by CWL source ID** using CWL-idiomatic `source:` syntax: + +Each I/O entry uses `source:` to reference a CWL input or output by its `id`, mirroring CWL's own `outputSource` convention: + +- `input_sandbox: [{source: helper_script}]` — the CWL input with `id: helper_script` (must be `type: File` or `File[]`) will be uploaded to the DIRAC sandbox store and delivered to the worker node. An optional `path:` specifies a relative directory within the job working directory (e.g., `path: "conf/"` places the file(s) in `<job_dir>/conf/`). 
If omitted, files land in the job root +- `input_data: [{source: input_lfns}]` — the CWL input with `id: input_lfns` will be resolved as LFN paths and registered as `InputData` in the JDL for data-driven scheduling +- `output_sandbox: [{source: stderr_log}]` — the CWL output with `id: stderr_log` will be uploaded to the sandbox store after execution +- `output_data: [{source: result_file, output_path: "/lhcb/...", output_se: ["SE-USER"]}]` — the CWL output with `id: result_file` will be registered in the file catalog at the given LFN path, on the specified storage element(s) + +The `source:` syntax is: +- **CWL-idiomatic** — consistent with how CWL references inputs/outputs elsewhere +- **Extensible** — per-entry metadata (like `output_se`, `output_path`, `path`) lives alongside the source reference +- **Per-output SE** — each `output_data` entry specifies its own `output_se`, allowing different outputs to go to different storage elements (e.g., large data to tape, small histograms to disk) + +All referenced IDs are **strictly validated** at submission time — the translation layer verifies that each `source` ID exists in the CWL task's inputs/outputs and has a compatible type (`File` or `File[]`). Invalid references fail the submission immediately. + +**Schema versioning**: Every `dirac:Job` hint must carry a `schema_version` field. This enables the system to: +- Reject hints with unsupported versions +- Evolve the schema without breaking existing workflows +- Provide clear error messages when a workflow targets a newer schema version + +#### Field mapping summary + +| Source | Field | JDL Equivalent | In `dirac:Job`? 
| +|--------|-------|----------------|-----------------| +| CWL `label`/`id` | *(auto)* | `JobName` | No — derived | +| `ResourceRequirement.coresMin` | *(auto)* | `MinNumberOfProcessors` | No — CWL native | +| `ResourceRequirement.coresMax` | *(auto)* | `MaxNumberOfProcessors` | No — CWL native | +| `ResourceRequirement.ramMin` | *(auto)* | `MinRAM` | No — CWL native | +| `ResourceRequirement.ramMax` | *(auto)* | `MaxRAM` | No — CWL native | +| `CUDARequirement` | *(auto)* | `Tags += ["GPU"]` | No — CWL native | +| `MPIRequirement` | — | — | **NotImplementedError** | +| `dirac:Job` | `schema_version` | — | **Yes** — required | +| `dirac:Job` | `cpu_work` | `CPUTime` | **Yes** — HS06-seconds | +| `dirac:Job` | `priority` | `Priority` | **Yes** | +| `dirac:Job` | `platform` | `Platform` | **Yes** | +| `dirac:Job` | `sites` | `Site` | **Yes** | +| `dirac:Job` | `banned_sites` | `BannedSites` | **Yes** | +| `dirac:Job` | `tags` | `Tags` (merged with auto) | **Yes** | +| `dirac:Job` | `type` | `JobType` | **Yes** | +| `dirac:Job` | `group` | `JobGroup` | **Yes** | +| `dirac:Job` | `log_level` | `LogLevel` | **Yes** | +| `dirac:Job` | `input_sandbox[].source` | `InputSandbox` | **Yes** — CWL input IDs | +| `dirac:Job` | `input_sandbox[].path` | *(worker-side)* | **Yes** — relative directory | +| `dirac:Job` | `input_data[].source` | `InputData` | **Yes** — CWL input IDs | +| `dirac:Job` | `output_sandbox[].source` | `OutputSandbox` | **Yes** — CWL output IDs | +| `dirac:Job` | `output_data[].source` | `OutputData` | **Yes** — CWL output ID | +| `dirac:Job` | `output_data[].output_path` | `OutputPath` | **Yes** — per-output LFN path | +| `dirac:Job` | `output_data[].output_se` | `OutputSE` | **Yes** — per-output SE list | +| *(system)* | `Executable` | `dirac-cwl-exec` | No — always set | +| *(system)* | `Owner`, `OwnerGroup`, `VO` | *(from auth)* | No — injected | +| *(system)* | `Status`, `MinorStatus` | *(managed)* | No | +| *(system)* | `JobID` | *(auto)* | No | + 
+#### Execution hooks: automatic, not user-configured + +Execution hooks are **derived from `type`**: + +- `type: "User"` → `QueryBasedPlugin` (default) +- `type: "MCSimulation"` → VO-specific simulation plugin +- etc. + +The hook plugin registry (currently in dirac-cwl, eventually migrated to diracX as entrypoints) handles discovery by VO and type. The `dirac:Job` hint does **not** expose `hook_plugin` or `hook_config` — these are internal to the system. + +`output_data` (with per-output `output_se` and `output_path`) and `output_sandbox` remain in `dirac:Job` because they are user-specified data management choices, not hook configuration. + +### 2. Storage Model: `workflows` + `job_workflow_params` + +Instead of embedding CWL into each JDL row (which would duplicate the CWL blob across thousands of parametric jobs), CWL definitions are stored **once** in a dedicated table, and jobs reference them. + +#### Schema + +```sql +CREATE TABLE workflows ( + workflow_id CHAR(64) PRIMARY KEY, -- SHA-256 of the CWL content + cwl MEDIUMTEXT NOT NULL, -- CWL YAML (original, uncompressed) + persistent BOOL NOT NULL DEFAULT FALSE, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- New columns on the existing Jobs table: +ALTER TABLE Jobs + ADD COLUMN workflow_id CHAR(64) DEFAULT NULL, -- FK → workflows.workflow_id + ADD COLUMN workflow_params JSON DEFAULT NULL, -- immutable per-job input parameters + ADD FOREIGN KEY (workflow_id) REFERENCES workflows(workflow_id); +``` + +`workflow_params` is an **immutable** JSON column — once set at job creation, it is never updated. It holds the per-job CWL input parameters (the content of an input YAML). Non-CWL jobs leave both columns `NULL`. + +#### How it works + +1. **Workflow insertion** — on submission, the CWL content is SHA-256 hashed. If the hash already exists in `workflows`, the insert is skipped (content-addressed, immutable). Workflows are never edited — a changed CWL produces a new hash. + +2. 
**Job creation** — each job row in `Jobs` gets a `workflow_id` reference and its own `workflow_params` JSON. This is where per-job variation lives — co-located with the job, no extra join needed. + +3. **Parametric jobs** — submitting 10k jobs with the same CWL but different parameters produces: + - 1 row in `workflows` (insert-if-not-exists) + - 10k rows in `Jobs` with the same `workflow_id` but different `workflow_params` (lightweight JSON) + +4. **`persistent` flag** — controls cleanup behavior: + - `persistent = FALSE` (default): ad-hoc user jobs; workflow row can be cleaned up when no jobs reference it + - `persistent = TRUE`: production/transformation workflows; retained indefinitely + +5. **Worker-side retrieval** — the job wrapper fetches the CWL via diracX API using the `workflow_id` from its `Jobs` row, and reads input parameters from `workflow_params`. No sandbox involved for the workflow definition. + +#### Parameter mapping via `dirac:Job` hint + +The `dirac:Job` hint tells DIRAC which CWL inputs should be promoted to job-level parameters visible to the scheduler: + +```yaml +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + # ... scheduling fields ... + + # Which CWL inputs become job-visible parameters + input_data: + - source: input_lfns # CWL input ID → resolved to InputData for scheduling + input_sandbox: + - source: helper_script # CWL input ID → files uploaded to sandbox +``` + +At submission time, the translation layer reads these mappings to populate JDL fields (for the transition period) or job attributes (post-JDL) from the per-job parameters. + +#### Why not CWL-in-JDL? 
+ +| Concern | CWL-in-JDL (previous) | `workflows` table (current) | +|---------|----------------------|----------------------------| +| 10k parametric jobs | 10k copies of compressed CWL | 1 workflow row + 10k param rows | +| Storage | ~16MB CWL blob per JDL row | CWL stored once | +| Queryability | Opaque base64 blob | CWL stored as readable YAML | +| Immutability | Mutable (JDL can be updated) | Content-addressed, immutable | +| Cleanup | Tied to JDL lifecycle | Independent lifecycle via `persistent` flag | + +### 3. New diracX Endpoint: `POST /api/jobs/` + +#### Request format + +The endpoint accepts a **CWL workflow file** plus one or more **input YAML files**. Each input YAML produces a separate job: + +``` +POST /api/jobs/ +Content-Type: multipart/form-data + +workflow: <wf.cwl>      # CWL workflow/tool definition (YAML) +inputs[]: <input1.yaml> # Input parameters for job 1 +inputs[]: <input2.yaml> # Input parameters for job 2 +``` + +This produces 2 jobs: +- Job 1: run `wf.cwl` with `input1.yaml` +- Job 2: run `wf.cwl` with `input2.yaml` + +If no input files are provided, a single job is created with no inputs (suitable for tools with no required inputs or all defaults). + +#### Translation flow + +``` +POST /api/jobs/ + │ + ▼ +Router (diracx-routers) + │ Parses multipart: CWL YAML + input YAML(s) + │ Validates CWL via JobSubmissionModel (pydantic) + │ Validates schema_version + ▼ +Logic (diracx-logic) + │ SHA-256 hash CWL → INSERT INTO workflows IF NOT EXISTS + │ For each input YAML: + │ Extracts dirac:Job hint from CWL task + │ Extracts ResourceRequirement, CUDARequirement, etc. 
+ │ Derives JobName from CWL label/id + │ Resolves I/O: source IDs → file paths/LFNs (strict validation) + │ Maps all → JDL fields (transition period) + │ Calls existing submit_jdl_jobs() with generated JDL + │ Sets workflow_id + workflow_params (JSON) on Jobs row + ▼ +DB + │ CWL stored once in workflows table + │ JDL stored in JobJDLs (transition period) + │ Job attrs + workflow_id + workflow_params in Jobs table + ▼ +Returns list[InsertedJob] +``` + +During the transition period, JDL is still generated for compatibility with existing DIRAC infrastructure (matcher, optimizer, etc.). The `workflows` table + `workflow_params` column are the source of truth for the CWL definition and per-job parameters. Once JDL is fully retired, the JDL generation step is removed. + +#### Translation logic (new functions in diracx-logic) + +```python +import hashlib +import json + +from cwl_utils.parser import save +from cwl_utils.parser.cwl_v1_2 import ( + ResourceRequirement, CUDARequirement, MPIRequirement, +) + + +SUPPORTED_SCHEMA_VERSIONS = {"1.0"} + + +def compute_workflow_id(cwl_yaml: str) -> str: + """Content-address a CWL workflow by its SHA-256 hash.""" + return hashlib.sha256(cwl_yaml.encode()).hexdigest() + + +async def submit_cwl_jobs( + cwl_yaml: str, + input_yamls: list[str], + db: JobDB, +) -> list[InsertedJob]: + """Submit CWL jobs: store workflow once, create one job per input YAML.""" + workflow_id = compute_workflow_id(cwl_yaml) + + # INSERT IF NOT EXISTS — idempotent, content-addressed + await db.insert_workflow(workflow_id, cwl_yaml, persistent=False) + + task = parse_cwl(cwl_yaml) + job_hint = JobHint.from_cwl(task) + + if job_hint.schema_version not in SUPPORTED_SCHEMA_VERSIONS: + raise ValueError( + f"Unsupported dirac:Job schema_version '{job_hint.schema_version}'. 
" + f"Supported: {SUPPORTED_SCHEMA_VERSIONS}" + ) + + inserted = [] + for input_yaml in input_yamls: + inputs = parse_inputs(input_yaml) if input_yaml else None + workflow_params = json.loads(input_yaml) if input_yaml else None + + # Generate JDL for transition period + jdl = cwl_to_jdl(task, job_hint, inputs) + + # Submit via existing pipeline + jobs = await submit_jdl_jobs([jdl]) + + # Set workflow reference + immutable params on job row + for job in jobs: + await db.set_workflow_ref( + job.job_id, + workflow_id=workflow_id, + workflow_params=workflow_params, + ) + inserted.extend(jobs) + + return inserted + + +def cwl_to_jdl( + task: CommandLineTool | Workflow | ExpressionTool, + job_hint: JobHint, + inputs: JobInputModel | None, +) -> str: + """Convert a CWL task with dirac:Job hint into a JDL string. + + This is a transition-period function — once JDL is retired, + job attributes are populated directly from the hint + CWL. + """ + jdl_fields = { + "Executable": "dirac-cwl-exec", + "JobType": job_hint.type, + "Priority": job_hint.priority, + "LogLevel": job_hint.log_level, + } + + if job_hint.cpu_work: + jdl_fields["CPUTime"] = job_hint.cpu_work + if job_hint.platform: + jdl_fields["Platform"] = job_hint.platform + + # Derive JobName from CWL label/id + task_label = getattr(task, "label", None) + task_id = getattr(task, "id", None) + if task_label: + jdl_fields["JobName"] = task_label + elif task_id and task_id != ".": + jdl_fields["JobName"] = task_id.split("#")[-1].split("/")[-1] + + # Extract from CWL requirements (standard CWL, not dirac:Job) + tags = set(job_hint.tags or []) + for req in (getattr(task, "requirements", None) or []): + if isinstance(req, ResourceRequirement): + if req.coresMin: + jdl_fields["MinNumberOfProcessors"] = int(req.coresMin) + if req.coresMax: + jdl_fields["MaxNumberOfProcessors"] = int(req.coresMax) + if req.ramMin: + jdl_fields["MinRAM"] = int(req.ramMin) + if req.ramMax: + jdl_fields["MaxRAM"] = int(req.ramMax) + elif 
isinstance(req, CUDARequirement): + tags.add("GPU") + elif isinstance(req, MPIRequirement): + raise NotImplementedError( + "MPIRequirement is not yet supported for DIRAC CWL jobs" + ) + + # Auto-derive processor tags + min_proc = jdl_fields.get("MinNumberOfProcessors", 1) + max_proc = jdl_fields.get("MaxNumberOfProcessors") + if min_proc and min_proc > 1: + tags.add("MultiProcessor") + if min_proc and max_proc and min_proc == max_proc: + tags.add(f"{min_proc}Processors") + + if tags: + jdl_fields["Tags"] = list(tags) + + # Sites + if job_hint.sites: + jdl_fields["Site"] = job_hint.sites + if job_hint.banned_sites: + jdl_fields["BannedSites"] = job_hint.banned_sites + + if job_hint.group: + jdl_fields["JobGroup"] = job_hint.group + + # Resolve I/O from CWL input/output source IDs + cwl_input_ids = {_extract_id(inp.id): inp for inp in (task.inputs or [])} + cwl_output_ids = {_extract_id(out.id): out for out in (task.outputs or [])} + + # InputSandbox + if job_hint.input_sandbox: + sandbox_files = [] + for ref in job_hint.input_sandbox: + _validate_cwl_id(ref.source, cwl_input_ids, "input", ["File", "File[]"]) + if inputs and ref.source in inputs.cwl: + sandbox_files.extend(_extract_file_paths(inputs.cwl[ref.source])) + if sandbox_files: + jdl_fields["InputSandbox"] = sandbox_files + + # InputData + if job_hint.input_data: + lfns = [] + for ref in job_hint.input_data: + _validate_cwl_id(ref.source, cwl_input_ids, "input", ["File", "File[]"]) + if inputs and ref.source in inputs.cwl: + lfns.extend(_extract_lfn_paths(inputs.cwl[ref.source])) + if lfns: + jdl_fields["InputData"] = lfns + + # OutputSandbox + if job_hint.output_sandbox: + sandbox_outputs = [] + for ref in job_hint.output_sandbox: + _validate_cwl_id(ref.source, cwl_output_ids, "output", ["File", "File[]"]) + out = cwl_output_ids[ref.source] + if hasattr(out, "outputBinding") and out.outputBinding: + sandbox_outputs.append(out.outputBinding.glob) + if sandbox_outputs: + jdl_fields["OutputSandbox"] = 
sandbox_outputs + + # OutputData (per-output SE and path) + if job_hint.output_data: + output_files = [] + all_ses = set() + for entry in job_hint.output_data: + _validate_cwl_id(entry.source, cwl_output_ids, "output", ["File", "File[]"]) + out = cwl_output_ids[entry.source] + if hasattr(out, "outputBinding") and out.outputBinding: + output_files.append(out.outputBinding.glob) + all_ses.update(entry.output_se) + if output_files: + jdl_fields["OutputData"] = output_files + jdl_fields["OutputPath"] = job_hint.output_data[0].output_path + jdl_fields["OutputSE"] = list(all_ses) + + return format_as_jdl(jdl_fields) + + +def _extract_id(cwl_id: str) -> str: + """Extract short ID from CWL full URI (e.g., 'file.cwl#input1' → 'input1').""" + return cwl_id.split("#")[-1].split("/")[-1] +``` + +### 4. Changes to DIRAC Worker-Side Execution + +#### Current flow (`__createCWLJobWrapper`): +``` +git clone dirac-cwl → install pixi → download sandbox (gets job.json) → run wrapper +``` + +#### New flow: +``` +Fetch CWL from workflows table (via diracX API) → read workflow_params from Jobs row → write job.json → run job wrapper (via importlib.resources) +``` + +Since dirac-cwl is installed as a package in the diracX environment, the job wrapper template is accessed via `importlib.resources` — no git clone or pixi install needed. + +Changes in `__createCWLJobWrapper` in `Utils.py`: + +1. Accept `jobParams` as a parameter (already available in `createJobWrapper`) +2. Fetch CWL definition from diracX API using `workflow_id` from the job +3. Read `workflow_params` (per-job input parameters) from the job +4. Write CWL + params to local files (`task.cwl`, `params.json`) +5. Remove the git clone, pixi install, and `dirac-wms-job-get-input` steps +6. Load the job wrapper template via `importlib.resources.files("dirac_cwl.job")` +7. InputSandbox is still used for actual input **files** — just not for the workflow definition + +### 5. 
Pydantic Models (in diracX) + +These models live in diracX (migrated from dirac-cwl). The old `SchedulingHint` and `ExecutionHooksHint` classes are removed — there is no backward compatibility layer. + +```python +class IOSource(BaseModel): + """Reference to a CWL input or output by its ID.""" + source: str # CWL input/output ID + path: str | None = None # relative path within job working directory (input_sandbox only) + + +class OutputDataEntry(BaseModel): + """Output data entry with per-output SE and LFN path.""" + source: str # CWL output ID + output_path: str # LFN destination path + output_se: list[str] = ["SE-USER"] + + +class JobHint(BaseModel, Hint): + """Unified DIRAC-specific hint for job scheduling and I/O. + + Resource requirements (cores, RAM) are expressed via standard CWL + requirements, not in this hint. + + Execution hooks are determined automatically by `type`, not + configured by the submitter. + + I/O fields reference CWL input/output IDs via `source:` syntax, + consistent with CWL's own referencing conventions. + """ + schema_version: str # required, e.g. 
"1.0" + + # Scheduling (DIRAC-specific, no CWL equivalent) + priority: int = 5 + cpu_work: int | None = None # HS06-seconds → JDL CPUTime + platform: str | None = None + sites: list[str] | None = None + banned_sites: list[str] | None = None + tags: list[str] | None = None # merged with auto-derived tags + + # Job metadata + type: str = "User" + group: str = "" + log_level: str = "INFO" + + # I/O: reference CWL input/output IDs via source: + input_sandbox: list[IOSource] = [] + input_data: list[IOSource] = [] + output_sandbox: list[IOSource] = [] + output_data: list[OutputDataEntry] = [] + + @classmethod + def from_cwl(cls, cwl_object) -> "JobHint": + hints = getattr(cwl_object, "hints", []) or [] + for hint in hints: + if hint.get("class") == "dirac:Job": + data = {k: v for k, v in hint.items() if k != "class"} + return cls(**data) + raise ValueError("CWL task is missing required dirac:Job hint") +``` + +### 6. Summary of Changes by Repository + +#### diracx (primary) +| Change | Location | +|--------|----------| +| `JobHint`, `IOSource`, `OutputDataEntry` models | `diracx-core/src/diracx/core/models/` | +| `JobSubmissionModel`, `JobInputModel` models | `diracx-core/src/diracx/core/models/` | +| `workflows` table schema | `diracx-db/src/diracx/db/sql/job/schema.py` | +| `workflow_id` + `workflow_params` columns on `Jobs` table | `diracx-db/src/diracx/db/sql/job/schema.py` | +| New router `POST /api/jobs/` (multipart CWL + inputs) | `diracx-routers/src/diracx/routers/jobs/submission.py` | +| `GET /api/workflows/{workflow_id}` endpoint | `diracx-routers/src/diracx/routers/jobs/` | +| `submit_cwl_jobs()` + `cwl_to_jdl()` logic | `diracx-logic/src/diracx/logic/jobs/submission.py` | +| Job wrapper template (migrated from dirac-cwl) | `diracx-logic/src/diracx/logic/jobs/` | +| `dirac-cwl` as dependency | `pyproject.toml` | + +#### dirac-cwl +| Change | Location | +|--------|----------| +| Remove `SchedulingHint` + `ExecutionHooksHint` | `execution_hooks/core.py` | +| 
Remove `convert_to_jdl()` | `job/submission_clients.py` | +| Update `JobWrapper.run_job()` to resolve hooks from `type` | `job/job_wrapper.py` | +| Update schema with `Job` definition (versioned) | `schemas/dirac-metadata.json` | +| Models migrated to diracX — remove from dirac-cwl | `submission_models.py` | + +#### DIRAC +| Change | Location | +|--------|----------| +| Modify `__createCWLJobWrapper` to fetch CWL via diracX API + read `workflow_params` | `WorkloadManagementSystem/Utilities/Utils.py` | +| Remove git clone + pixi install from bash wrapper | Same file | +| Load job wrapper via `importlib.resources` | Same file | + +### 7. Migration Path + +**Phase 1** (this work): +- Implement `workflows` table and `workflow_id`/`workflow_params` columns on `Jobs` +- Implement `dirac:Job` hint and models in diracX +- Implement `POST /api/jobs/` endpoint + `GET /api/workflows/{workflow_id}` +- Implement `cwl_to_jdl()` transition shim +- Modify DIRAC `__createCWLJobWrapper` — remove git clone, use importlib.resources, fetch CWL from API +- Remove old hints from dirac-cwl + +**Phase 2** (future, per production-plugin-system.md): +- `dirac:Transformation` hint → transformation submission endpoint +- `dirac:Production` hint → production orchestration endpoint +- Migrate execution hooks plugin registry to diracX entrypoints + +### 8. 
Decisions Made + +| # | Decision | Rationale | +|---|----------|-----------| +| D1 | Models live in diracX, not dirac-cwl | Natural migration path; dirac-cwl components follow | +| D2 | Endpoint accepts raw CWL YAML + input YAMLs | User-friendly; `wf.cwl + input1.yaml + input2.yaml` → N jobs | +| D3 | Endpoint is `POST /api/jobs/` (not `/api/jobs/cwl`) | CWL becomes the primary submission format | +| D4 | `Platform` is CPU architecture (unrelated to containers) | Container support is a separate discussion | +| D5 | No git clone on worker nodes | dirac-cwl installed in diracX; wrapper via `importlib.resources` | +| D6 | Hook registry starts in dirac-cwl, migrates to diracX entrypoints | Incremental migration | +| D7 | `cpu_work` (HS06-seconds) in hint; `ToolTimeLimit` for local cwltool only | Avoids unit ambiguity; DB12 calculates normalization factor | +| D8 | Strict I/O validation — fail fast at submission | Prevents wasting CPU on 10^3-10^5 jobs with bad references | +| D9 | No backward compatibility with old hints | Nothing in production; simplifies implementation | +| D10 | Hints carry `schema_version` | Forward-compatible evolution | +| D11 | CWL stored in `workflows` table, not embedded in JDL | Content-addressed (SHA-256), immutable; 10k parametric jobs share 1 workflow row | +| D12 | Per-job params as immutable JSON column on `Jobs` table | Co-located with job, no extra join; immutable once set | +| D13 | JDL generation is a transition-period shim, not the target | `workflows` + `workflow_params` are source of truth; JDL generated for existing DIRAC infra compatibility | + +### 9. Open Questions + +1. **`ToolTimeLimit` in cwl_utils**: Need to verify cwl_utils parses it. If available, it can be used for local execution wall-clock limits alongside `cpu_work` for DIRAC scheduling. To be investigated. + +2. **Container support**: How to run containerized jobs within DIRAC. Unrelated to `Platform` (which is CPU architecture). Separate design needed. + +3. 
**Multipart API design**: Exact multipart field naming and how to handle optional inputs (no input YAML = single job with defaults). Also: should the endpoint support JSON as an alternative to YAML? + +4. **Input YAML templating**: The endpoint naturally supports `wf.cwl + N input YAMLs → N jobs`. Future extension could support templating (e.g., parameter sweeps) — to be designed separately. + +5. **Workflow cleanup policy**: When `persistent = FALSE`, what triggers cleanup of orphaned workflow rows? Options: periodic GC that checks for referencing jobs, TTL-based expiry, or tied to existing job cleanup routines. diff --git a/src/dirac_cwl/commands/__init__.py b/src/dirac_cwl/commands/__init__.py index 01e8b170..e16d65fb 100644 --- a/src/dirac_cwl/commands/__init__.py +++ b/src/dirac_cwl/commands/__init__.py @@ -1,5 +1,6 @@ """Command classes for workflow pre/post-processing operations.""" from .core import PostProcessCommand, PreProcessCommand +from .store_output import StoreOutputCommand -__all__ = ["PreProcessCommand", "PostProcessCommand"] +__all__ = ["PreProcessCommand", "PostProcessCommand", "StoreOutputCommand"] diff --git a/src/dirac_cwl/commands/core.py b/src/dirac_cwl/commands/core.py index cc2ede3a..98498422 100644 --- a/src/dirac_cwl/commands/core.py +++ b/src/dirac_cwl/commands/core.py @@ -13,7 +13,7 @@ class CommandBase(ABC): """ @abstractmethod - def execute(self, job_path: Path, **kwargs) -> None: + async def execute(self, job_path: Path, **kwargs) -> None: """Execute the command in the given job path. :param job_path: Path to the job working directory. 
diff --git a/src/dirac_cwl/commands/download_config.py b/src/dirac_cwl/commands/download_config.py index 4fa14971..45e84ba6 100644 --- a/src/dirac_cwl/commands/download_config.py +++ b/src/dirac_cwl/commands/download_config.py @@ -8,7 +8,7 @@ class DownloadConfig(PreProcessCommand): """Example command that creates a file with named 'content.cfg'.""" - def execute(self, job_path, **kwargs): + async def execute(self, job_path, **kwargs): """Execute the configuration download. :param job_path: Path to the job working directory. diff --git a/src/dirac_cwl/commands/group_outputs.py b/src/dirac_cwl/commands/group_outputs.py index d58e84ad..120052ae 100644 --- a/src/dirac_cwl/commands/group_outputs.py +++ b/src/dirac_cwl/commands/group_outputs.py @@ -9,7 +9,7 @@ class GroupOutputs(PostProcessCommand): """Example command that merges all of the outputs in a singular file.""" - def execute(self, job_path, **kwargs): + async def execute(self, job_path, **kwargs): """Execute the output file grouping. :param job_path: Path to the job working directory. diff --git a/src/dirac_cwl/commands/store_output.py b/src/dirac_cwl/commands/store_output.py new file mode 100644 index 00000000..d1d8ec35 --- /dev/null +++ b/src/dirac_cwl/commands/store_output.py @@ -0,0 +1,72 @@ +"""Post-processing command that stores output files to grid storage.""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any, Sequence + +from DIRAC.DataManagementSystem.Client.DataManager import DataManager # type: ignore[import-untyped] +from DIRACCommon.Core.Utilities.ReturnValues import returnSingleResult # type: ignore[import-untyped] + +from dirac_cwl.commands import PostProcessCommand +from dirac_cwl.mocks.data_manager import MockDataManager + +logger = logging.getLogger(__name__) + + +class StoreOutputCommand(PostProcessCommand): + """Store output files to grid storage elements via DataManager. 
+ + Replaces the output storage logic previously in ExecutionHooksBasePlugin. + """ + + def __init__( + self, + output_paths: dict[str, str], + output_se: list[str], + ) -> None: + self._output_paths = output_paths + self._output_se = output_se + if os.getenv("DIRAC_PROTO_LOCAL") == "1": + self._datamanager: DataManager = MockDataManager() + else: + self._datamanager = DataManager() + + async def execute(self, job_path: Path, **kwargs: Any) -> None: + """Store output files to grid storage. + + :param job_path: Path to the job working directory. + :param kwargs: Must include 'outputs' dict mapping output names to file paths. + """ + outputs: dict[str, str | Path | Sequence[str | Path]] = kwargs.get("outputs", {}) + + for output_name, src_path in outputs.items(): + if not src_path: + raise RuntimeError( + f"src_path parameter required for filesystem storage of {output_name}" + ) + + lfn = self._output_paths.get(output_name, None) + + if lfn: + logger.info("Storing output %s, with source %s", output_name, src_path) + if isinstance(src_path, (str, Path)): + src_path = [src_path] + for src in src_path: + file_lfn = Path(lfn) / Path(src).name + res = None + for se in self._output_se: + res = returnSingleResult( + self._datamanager.putAndRegister(str(file_lfn), src, se) + ) + if res["OK"]: + logger.info( + "Successfully saved file %s with LFN %s", src, file_lfn + ) + break + if res and not res["OK"]: + raise RuntimeError( + f"Could not save file {src} with LFN {str(lfn)} : {res['Message']}" + ) diff --git a/src/dirac_cwl/job/job_wrapper.py b/src/dirac_cwl/job/job_wrapper.py index b69ff0d9..17c6e1b5 100644 --- a/src/dirac_cwl/job/job_wrapper.py +++ b/src/dirac_cwl/job/job_wrapper.py @@ -28,16 +28,15 @@ from rich.text import Text from ruamel.yaml import YAML -from dirac_cwl.commands import PostProcessCommand, PreProcessCommand +from dirac_cwl.commands import PostProcessCommand, PreProcessCommand, StoreOutputCommand from dirac_cwl.core.exceptions import 
WorkflowProcessingException from dirac_cwl.core.utility import get_lfns -from dirac_cwl.execution_hooks import ExecutionHooksHint -from dirac_cwl.execution_hooks.core import ExecutionHooksBasePlugin from dirac_cwl.job.job_report import JobMinorStatus, JobReport, JobStatus from dirac_cwl.submission_models import ( JobInputModel, JobModel, ) +from diracx.core.models.cwl import JobHint if os.getenv("DIRAC_PROTO_LOCAL") == "1": from dirac_cwl.mocks.sandbox import create_sandbox, download_sandbox # type: ignore[no-redef] @@ -58,7 +57,9 @@ class JobWrapper: def __init__(self, job_id: int) -> None: """Initialize the job wrapper.""" - self._execution_hooks_plugin: ExecutionHooksBasePlugin | None = None + self._preprocess_commands: list[PreProcessCommand] = [] + self._postprocess_commands: list[PostProcessCommand] = [] + self._output_sandbox: list[str] = [] self._job_path: Path = Path() self._job_id = job_id src = "JobWrapper" @@ -78,9 +79,6 @@ async def __download_input_sandbox(self, arguments: JobInputModel, job_path: Pat """ assert arguments.sandbox is not None self._job_report.set_job_status(minor_status=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) - if not self._execution_hooks_plugin: - self._job_report.set_job_status(minor_status=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX) - raise RuntimeError("Could not download sandboxes") for sandbox in arguments.sandbox: await download_sandbox(sandbox, job_path) @@ -88,16 +86,10 @@ async def __upload_output_sandbox( self, outputs: dict[str, str | Path | Sequence[str | Path]], ): - if not self._execution_hooks_plugin: - raise RuntimeError("Could not upload sandbox : Execution hook is not defined.") - outputs_to_sandbox = [] for output_name, src_path in outputs.items(): - if ( - self._execution_hooks_plugin.output_sandbox - and output_name in self._execution_hooks_plugin.output_sandbox - ): - if isinstance(src_path, Path) or isinstance(src_path, str): + if self._output_sandbox and output_name in self._output_sandbox: + if 
isinstance(src_path, (Path, str)): src_path = [Path(src_path)] for path in src_path: outputs_to_sandbox.append(Path(path)) @@ -105,7 +97,7 @@ async def __upload_output_sandbox( self._job_report.set_job_status(JobStatus.COMPLETING, minor_status=JobMinorStatus.UPLOADING_OUTPUT_SANDBOX) sb_path = Path(await create_sandbox(outputs_to_sandbox)) logger.info( - "Successfully stored output %s in Sandbox %s", self._execution_hooks_plugin.output_sandbox, sb_path + "Successfully stored output %s in Sandbox %s", self._output_sandbox, sb_path ) await self._diracx_client.jobs.assign_sandbox_to_job(self._job_id, f'"{sb_path}"') self._job_report.set_job_status(JobStatus.COMPLETING, minor_status=JobMinorStatus.OUTPUT_SANDBOX_UPLOADED) @@ -122,17 +114,22 @@ async def __download_input_data(self, inputs: JobInputModel, job_path: Path) -> A dictionary mapping each input name to the corresponding downloaded file path(s) located in the working directory. """ + from DIRAC.DataManagementSystem.Client.DataManager import DataManager # type: ignore[import-untyped] + from dirac_cwl.mocks.data_manager import MockDataManager + new_paths: dict[str, Path | list[Path]] = {} self._job_report.set_job_status(minor_status=JobMinorStatus.INPUT_DATA_RESOLUTION) - if not self._execution_hooks_plugin: - raise RuntimeWarning("Could not download input data: Execution hook is not defined.") + if os.getenv("DIRAC_PROTO_LOCAL") == "1": + datamanager: DataManager = MockDataManager() + else: + datamanager = DataManager() lfns_inputs = get_lfns(inputs.cwl) if lfns_inputs: for input_name, lfns in lfns_inputs.items(): - res = returnValueOrRaise(self._execution_hooks_plugin._datamanager.getFile(lfns, str(job_path))) + res = returnValueOrRaise(datamanager.getFile(lfns, str(job_path))) if res["Failed"]: raise RuntimeError(f"Could not get files : {res['Failed']}") paths = res["Successful"] @@ -235,8 +232,8 @@ async def pre_process( YAML().dump(parameter_dict, parameter_file) command.append(str(parameter_path.name)) - if 
self._execution_hooks_plugin: - return self.__pre_process_hooks(executable, arguments, self._job_path, command) + if self._preprocess_commands: + await self.__run_preprocess_commands(self._job_path) await self._job_report.commit() return command @@ -263,98 +260,61 @@ async def post_process( success = True - if self._execution_hooks_plugin: - success = await self.__post_process_hooks(self._job_path, outputs=outputs) + if self._postprocess_commands: + success = await self.__run_postprocess_commands(self._job_path, outputs=outputs) await self.__upload_output_sandbox(outputs=outputs) await self._job_report.commit() return success - def __pre_process_hooks( - self, - executable: CommandLineTool | Workflow | ExpressionTool, - arguments: Any | None, - job_path: Path, - command: List[str], - **kwargs: Any, - ) -> List[str]: - """Pre-process job inputs and command before execution. - - :param CommandLineTool | Workflow | ExpressionTool executable: - The CWL tool, workflow, or expression to be executed. - :param JobInputModel arguments: - The job inputs, including CWL and LFN data. - :param Path job_path: - Path to the job working directory. - :param list[str] command: - The command to be executed, which will be modified. - :param Any **kwargs: - Additional parameters, allowing extensions to pass extra context - or configuration options. - - :return list[str]: - The modified command, typically including the serialized CWL - input file path. 
- """ - if not self._execution_hooks_plugin: - raise RuntimeWarning("Could not run pre_process_hooks: Execution hook is not defined.") - - for preprocess_command in self._execution_hooks_plugin.preprocess_commands: - if not issubclass(preprocess_command, PreProcessCommand): - msg = f"The command {preprocess_command} is not a {PreProcessCommand.__name__}" - logger.error(msg) - raise TypeError(msg) - + async def __run_preprocess_commands(self, job_path: Path, **kwargs: Any) -> None: + """Run all pre-process commands.""" + for cmd in self._preprocess_commands: try: - preprocess_command().execute(job_path, **kwargs) + await cmd.execute(job_path, **kwargs) except Exception as e: - msg = f"Command '{preprocess_command.__name__}' failed during the pre-process stage: {e}" + msg = f"Command '{type(cmd).__name__}' failed during the pre-process stage: {e}" logger.exception(msg) raise WorkflowProcessingException(msg) from e - return command - - async def __post_process_hooks( + async def __run_postprocess_commands( self, job_path: Path, outputs: dict[str, str | Path | Sequence[str | Path]] = {}, **kwargs: Any, ) -> bool: - """Post-process job outputs. - - :param Path job_path: - Path to the job working directory. - :param str|None stdout: - cwltool standard output. - :param Any **kwargs: - Additional keyword arguments for extensibility. 
- """ - if not self._execution_hooks_plugin: - raise RuntimeWarning("Could not run post_process_hooks: Execution hook is not defined.") - - for postprocess_command in self._execution_hooks_plugin.postprocess_commands: - if not issubclass(postprocess_command, PostProcessCommand): - msg = f"The command {postprocess_command} is not a {PostProcessCommand.__name__}" - logger.error(msg) - raise TypeError(msg) - + """Run all post-process commands.""" + for cmd in self._postprocess_commands: try: - postprocess_command().execute(job_path, **kwargs) + await cmd.execute(job_path, outputs=outputs, **kwargs) except Exception as e: - msg = f"Command '{postprocess_command.__name__}' failed during the post-process stage: {e}" + msg = f"Command '{type(cmd).__name__}' failed during the post-process stage: {e}" logger.exception(msg) raise WorkflowProcessingException(msg) from e + return True - self._job_report.set_job_status(minor_status=JobMinorStatus.UPLOADING_OUTPUT_DATA) - try: - await self._execution_hooks_plugin.store_output(outputs) - self._job_report.set_job_status( - status=JobStatus.COMPLETING, minor_status=JobMinorStatus.OUTPUT_DATA_UPLOADED + def _build_commands_from_hint(self, job_hint: JobHint) -> None: + """Build pre/post-process commands from the dirac:Job hint. + + The ``type`` field determines which commands are attached. + I/O config from the hint is used to configure commands. 
+ """ + # Extract I/O config from the hint + output_paths = { + entry.source: entry.output_path for entry in job_hint.output_data + } + output_se = [] + for entry in job_hint.output_data: + output_se.extend(entry.output_se) + output_se = list(set(output_se)) + + self._output_sandbox = [ref.source for ref in job_hint.output_sandbox] + + # Build post-process commands — output storage + if output_paths: + self._postprocess_commands.append( + StoreOutputCommand(output_paths=output_paths, output_se=output_se) ) - except RuntimeError as err: - self._job_report.set_job_status(status=JobStatus.FAILED, minor_status=JobMinorStatus.UPLOADING_OUTPUT_DATA) - raise err - return True async def run_job(self, job: JobModel) -> bool: """Execute a given CWL workflow using cwltool. @@ -365,10 +325,10 @@ async def run_job(self, job: JobModel) -> bool: :return: True if the job is executed successfully, False otherwise. """ logger = logging.getLogger("JobWrapper") - # Instantiate runtime metadata from the serializable descriptor and - # the job context so implementations can access task inputs/overrides. 
- job_execution_hooks = ExecutionHooksHint.from_cwl(job.task) - self._execution_hooks_plugin = job_execution_hooks.to_runtime(job) if job_execution_hooks else None + + # Extract dirac:Job hint and build commands from type + I/O config + job_hint = JobHint.from_cwl(job.task) + self._build_commands_from_hint(job_hint) # Isolate the job in a specific directory self._job_path = Path(".") / "workernode" / f"{random.randint(1000, 9999)}" diff --git a/src/dirac_cwl/job/job_wrapper_template.py b/src/dirac_cwl/job/job_wrapper_template.py index 473e2160..368c120d 100755 --- a/src/dirac_cwl/job/job_wrapper_template.py +++ b/src/dirac_cwl/job/job_wrapper_template.py @@ -7,6 +7,7 @@ import os import sys import tempfile +from typing import Any import DIRAC # type: ignore[import-untyped] from cwl_utils.parser import load_document_by_uri @@ -21,31 +22,53 @@ async def main(): - """Execute the job wrapper for a given job model.""" + """Execute the job wrapper for a given job model. + + Fetches the CWL workflow definition and input parameters from the + diracX API using the WorkflowID stored in the job config JSON. 
+ """ if len(sys.argv) != 3: logging.error("2 arguments required, ") sys.exit(1) job_id = int(sys.argv[2]) - job_json_file = sys.argv[1] - job_wrapper = JobWrapper(job_id) - with open(job_json_file, "r") as file: - job_model_dict = json.load(file) + # Fetch workflow_id, CWL, and params from diracX API using the job_id + from diracx.client.aio import AsyncDiracClient + + async with AsyncDiracClient() as client: + # Get workflow_id and params from job attributes + job_attrs = await client.jobs.get_single_job(job_id) + workflow_id = getattr(job_attrs, "workflow_id", None) + workflow_params = getattr(job_attrs, "workflow_params", None) + + if not workflow_id: + logging.error("Job %d has no workflow_id", job_id) + sys.exit(1) - task_dict = job_model_dict["task"] + # Fetch CWL definition + workflow_response = await client.jobs.get_workflow(workflow_id) + cwl_yaml = workflow_response["cwl"] + + # Parse CWL + yaml_doc = YAML() + task_dict = yaml_doc.load(cwl_yaml) with tempfile.NamedTemporaryFile("w+", suffix=".cwl", delete=False) as f: YAML().dump(task_dict, f) f.flush() task_obj = load_document_by_uri(f.name) - if job_model_dict["input"]: - cwl_inputs_obj = load_inputfile(job_model_dict["input"]["cwl"]) - job_model_dict["input"]["cwl"] = cwl_inputs_obj - job_model_dict["task"] = task_obj + # Build job model + job_model_dict: dict[str, Any] = {"task": task_obj, "input": None} + + # If workflow_params were stored, use them as CWL inputs + if workflow_params: + cwl_inputs_obj = load_inputfile(workflow_params) + job_model_dict["input"] = {"sandbox": None, "cwl": cwl_inputs_obj} job = JobModel.model_validate(job_model_dict) + job_wrapper = JobWrapper(job_id) res = await job_wrapper.run_job(job) if res: diff --git a/src/dirac_cwl/job/submission_clients.py b/src/dirac_cwl/job/submission_clients.py index 2a8739e3..a926cee2 100644 --- a/src/dirac_cwl/job/submission_clients.py +++ b/src/dirac_cwl/job/submission_clients.py @@ -13,7 +13,6 @@ from rich.console import Console from 
dirac_cwl.core.utility import get_lfns -from dirac_cwl.execution_hooks import SchedulingHint from dirac_cwl.submission_models import JobModel, JobSubmissionModel console = Console() @@ -127,40 +126,14 @@ async def submit_job(self, job_submission: JobSubmissionModel) -> bool: return True def convert_to_jdl(self, job: JobModel, sandbox_pfn: str) -> str: - """ - Convert job model to jdl. + """Convert job model to jdl. - :param job: The task to execute - :param sandbox_pfn: The sandbox PFN - :return: JDL string + .. deprecated:: + JDL conversion is now handled by ``cwl_to_jdl()`` in diracx-logic. + This method will be removed once the DIRACSubmissionClient is + updated to use the new ``POST /api/jobs/`` CWL endpoint. """ - jdl_lines = [] - jdl_lines.append("Executable = dirac-cwl-exec;") - jdl_lines.append("Arguments = job.json;") - - if job.task.requirements and job.task.requirements[0].coresMin: - jdl_lines.append(f"NumberOfProcessors = {job.task.requirements[0].coresMin};") - - jdl_lines.append("JobName = test;") - jdl_lines.append("OutputSandbox = {std.out, std.err};") - - job_scheduling = SchedulingHint.from_cwl(job.task) - if job_scheduling.priority: - jdl_lines.append(f"Priority = {job_scheduling.priority};") - - if job_scheduling.sites: - jdl_lines.append(f"Site = {job_scheduling.sites};") - - jdl_lines.append(f"InputSandbox = {sandbox_pfn};") - if job.input: - formatted_lfns = [] - lfns_list = get_lfns(job.input.cwl).values() - for lfns in lfns_list: - for lfn in lfns: - formatted_lfns.append(str(lfn).replace("lfn:", "LFN:", 1)) - - lfns_str = ", ".join(formatted_lfns) - if lfns_str: - jdl_lines.append(f"InputData = {lfns_str};") - - return "\n".join(jdl_lines) + raise NotImplementedError( + "JDL conversion has moved to diracx-logic (cwl_to_jdl). " + "Use the POST /api/jobs/ endpoint instead." 
+ ) diff --git a/src/dirac_cwl/submission_models.py b/src/dirac_cwl/submission_models.py index 30336290..5b744716 100644 --- a/src/dirac_cwl/submission_models.py +++ b/src/dirac_cwl/submission_models.py @@ -18,10 +18,9 @@ from pydantic import BaseModel, ConfigDict, field_serializer, model_validator from dirac_cwl.execution_hooks import ( - ExecutionHooksHint, - SchedulingHint, TransformationExecutionHooksHint, ) +from diracx.core.models.cwl import JobHint # ----------------------------------------------------------------------------- # Job models @@ -70,13 +69,13 @@ def serialize_task(self, value): @model_validator(mode="before") def validate_hints(cls, values): - """Validate execution hooks and scheduling hints in the task. + """Validate dirac:Job hint in the task. :param values: Model values dictionary. :return: Validated values dictionary. """ task = values.get("task") - ExecutionHooksHint.from_cwl(task), SchedulingHint.from_cwl(task) + JobHint.from_cwl(task) return values diff --git a/test/conftest.py b/test/conftest.py index 272b99c0..6099e100 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -41,7 +41,13 @@ def __init__(self, **data): def sample_command_line_tool(): """Create a sample CommandLineTool.""" return CommandLineTool( - id=".", inputs=[], outputs=[], requirements=[], cwlVersion="v1.2", baseCommand=["echo", "Hello World"] + id=".", + inputs=[], + outputs=[], + requirements=[], + hints=[{"class": "dirac:Job", "schema_version": "1.0", "type": "User"}], + cwlVersion="v1.2", + baseCommand=["echo", "Hello World"], ) diff --git a/test/test_job_submission_clients.py b/test/test_job_submission_clients.py index 8edeac51..8354cfad 100644 --- a/test/test_job_submission_clients.py +++ b/test/test_job_submission_clients.py @@ -21,74 +21,11 @@ class TestDIRACSubmissionClient: """Test the DIRACSubmissionClient class.""" - @pytest.mark.parametrize( - "cwl_file, cwl_input, expected_jdl", - [ - # --- Hello World example --- - # There is no input expected - ( - 
"test/workflows/helloworld/description_basic.cwl", - None, - """Executable = dirac-cwl-exec; -Arguments = job.json; -JobName = test; -OutputSandbox = {std.out, std.err}; -Priority = 10; -InputSandbox = SB:SandboxSE|/S3/diracx-sandbox-store/sha256:0001.tar.zst;""", - ), - # --- Test metadata example --- - # A string input is passed - ( - "test/workflows/test_meta/test_meta.cwl", - None, - """Executable = dirac-cwl-exec; -Arguments = job.json; -JobName = test; -OutputSandbox = {std.out, std.err}; -Priority = 100; -Site = CTAO.DESY-ZN.de, CTAO.PIC.es; -InputSandbox = SB:SandboxSE|/S3/diracx-sandbox-store/sha256:0001.tar.zst;""", - ), - # Gather only - ( - "test/workflows/pi/pigather.cwl", - "test/workflows/pi/type_dependencies/job/inputs-pi_gather_catalog.yaml", - """Executable = dirac-cwl-exec; -Arguments = job.json; -NumberOfProcessors = 1; -JobName = test; -OutputSandbox = {std.out, std.err}; -Priority = 10; -InputSandbox = SB:SandboxSE|/S3/diracx-sandbox-store/sha256:0001.tar.zst; -InputData = LFN:/pi/100/result_1.sim, LFN:/pi/100/result_2.sim, LFN:/pi/100/result_3.sim, LFN:/pi/100/result_4.sim, \ -LFN:/pi/100/result_5.sim;""", - ), - ], - ) - def test_convert_to_jdl(self, cwl_file, cwl_input, expected_jdl): - """Test convert_to_jdl.""" + def test_convert_to_jdl_deprecated(self): + """Test that convert_to_jdl raises NotImplementedError (moved to diracx-logic).""" submission_client = DIRACSubmissionClient() - - task_path = cwl_file - task = load_document(pack(task_path)) - sandbox_id = "SB:SandboxSE|/S3/diracx-sandbox-store/sha256:0001.tar.zst" - - task_input = None - if cwl_input: - parameter = load_inputfile(cwl_input) - task_input = JobInputModel( - sandbox=[sandbox_id], - cwl=parameter, - ) - - job = JobModel( - task=task, - input=task_input, - ) - - res = submission_client.convert_to_jdl(job, sandbox_id) - - assert res == expected_jdl + with pytest.raises(NotImplementedError, match="cwl_to_jdl"): + submission_client.convert_to_jdl(None, "") class 
TestPrototypeSubmissionClient: diff --git a/test/test_job_wrapper.py b/test/test_job_wrapper.py index 848728d5..bbe5e624 100644 --- a/test/test_job_wrapper.py +++ b/test/test_job_wrapper.py @@ -13,7 +13,6 @@ os.environ["DIRAC_PROTO_LOCAL"] = "1" from dirac_cwl.core.exceptions import WorkflowProcessingException -from dirac_cwl.execution_hooks.core import ExecutionHooksBasePlugin from dirac_cwl.job.job_report import JobMinorStatus, JobStatus from dirac_cwl.job.job_wrapper import JobWrapper from dirac_cwl.mocks.status import STATUS_DIR @@ -24,18 +23,16 @@ class TestJobWrapper: @pytest.mark.asyncio async def test_instantiation(self, sample_job): - """Test that ExecutionHooksBasePlugin can be instantiated directly with default behavior.""" - hook = ExecutionHooksBasePlugin() + """Test that JobWrapper can run with no commands.""" job_wrapper = JobWrapper(job_id=0) - job_wrapper._execution_hooks_plugin = hook - # Test default pre_process behavior + # Test default pre_process behavior (no commands) command = ["cwltool", "--parallel", "task.cwl"] result = await job_wrapper.pre_process(sample_job.task, None) - assert result == command # Should return command unchanged + assert result == command - # Test default post_process behavior - result = await job_wrapper.post_process(0, "{}", "{}") # Should not raise any exception + # Test default post_process behavior (no commands) + result = await job_wrapper.post_process(0, "{}", "{}") assert result # Test default run_job behavior @@ -43,91 +40,55 @@ async def test_instantiation(self, sample_job): assert result @pytest.mark.asyncio - async def test_execute(self, job_type_testing, sample_job, mocker, monkeypatch): - """Test the execution of the preprocess and postprocess commands. - - The fixture "job_type_testing" is the class "JobTypeTestingPlugin". - It uses the plugin class from the fixture, even though the commands will be overwritten. 
- """ + async def test_execute(self, sample_job, mocker): + """Test the execution of pre/post-process commands via the command lists.""" from dirac_cwl.commands import PostProcessCommand, PreProcessCommand - # Initialization class PreProcessCmd(PreProcessCommand): - def execute(job_path, **kwargs): + async def execute(self, job_path, **kwargs): return class PostProcessCmd(PostProcessCommand): - def execute(job_path, **kwargs): - return - - class DualProcessCmd(PreProcessCommand, PostProcessCommand): - def execute(job_path, **kwargs): + async def execute(self, job_path, **kwargs): return - plugin = job_type_testing() job_wrapper = JobWrapper(job_id=0) - job_wrapper._execution_hooks_plugin = plugin - # Mock the "execute" commands to be able to spy them - execute_preprocess_mock = mocker.MagicMock() - execute_postprocess_mock = mocker.MagicMock() - execute_dualprocess_mock = mocker.MagicMock() + pre_cmd = PreProcessCmd() + post_cmd = PostProcessCmd() - monkeypatch.setattr(PreProcessCmd, "execute", execute_preprocess_mock) - monkeypatch.setattr(PostProcessCmd, "execute", execute_postprocess_mock) - monkeypatch.setattr(DualProcessCmd, "execute", execute_dualprocess_mock) + mocker.patch.object(pre_cmd, "execute", new_callable=mocker.AsyncMock) + mocker.patch.object(post_cmd, "execute", new_callable=mocker.AsyncMock) - # Test #1: The commands were set in the correct processing step - # Expected Result: Everything works as expected - plugin.preprocess_commands = [PreProcessCmd, DualProcessCmd] - plugin.postprocess_commands = [PostProcessCmd, DualProcessCmd] + job_wrapper._preprocess_commands = [pre_cmd] + job_wrapper._postprocess_commands = [post_cmd] await job_wrapper.pre_process(sample_job.task, None) - execute_preprocess_mock.assert_called_once() - execute_dualprocess_mock.assert_called_once() - - execute_dualprocess_mock.reset_mock() # Reset the mock to be able to call "assert_called_once" + pre_cmd.execute.assert_called_once() await job_wrapper.post_process(0, "{}", 
"{}") - execute_postprocess_mock.assert_called_once() - execute_dualprocess_mock.assert_called_once() - - # Test #2: The commands were set in the wrong processing step. - # Expected Result: The call raises "TypeError" - plugin.preprocess_commands = [PostProcessCmd, DualProcessCmd] - plugin.postprocess_commands = [PreProcessCmd, DualProcessCmd] - - with pytest.raises(TypeError): - await job_wrapper.pre_process(sample_job.task, None) - - with pytest.raises(TypeError): - await job_wrapper.post_process(0, "{}", "{}") + post_cmd.execute.assert_called_once() @pytest.mark.asyncio - async def test_command_exception(self, job_type_testing, sample_job, mocker, monkeypatch): - """Test exception report when a command fails. - - The fixture "job_type_testing" is the class "JobTypeTestingPlugin". - It uses the plugin class from the fixture, even though the commands will be overwritten. - """ + async def test_command_exception(self, sample_job): + """Test exception report when a command fails.""" from dirac_cwl.commands import PostProcessCommand, PreProcessCommand - # Initialization and set the execute function to raise an exception - class Command(PreProcessCommand, PostProcessCommand): - def execute(job_path, **kwargs): + class FailingPreCmd(PreProcessCommand): + async def execute(self, job_path, **kwargs): raise NotImplementedError() - plugin = job_type_testing() - job_wrapper = JobWrapper(job_id=0) - job_wrapper._execution_hooks_plugin = plugin + class FailingPostCmd(PostProcessCommand): + async def execute(self, job_path, **kwargs): + raise NotImplementedError() - plugin.preprocess_commands = [Command] - plugin.postprocess_commands = [Command] + job_wrapper = JobWrapper(job_id=0) - # The processing steps should raise a "WorkflowProcessingException" + job_wrapper._preprocess_commands = [FailingPreCmd()] with pytest.raises(WorkflowProcessingException): await job_wrapper.pre_process(sample_job.task, None) + job_wrapper._postprocess_commands = [FailingPostCmd()] with 
pytest.raises(WorkflowProcessingException): await job_wrapper.post_process(0, "{}", "{}") diff --git a/test/test_submission_models.py b/test/test_submission_models.py index 364128f9..4a126cd9 100644 --- a/test/test_submission_models.py +++ b/test/test_submission_models.py @@ -120,17 +120,20 @@ def test_dash_to_snake_case_conversion(self): assert hasattr(runtime, "query_root") or runtime.query_root == "/data" assert hasattr(runtime, "data_type") or runtime.data_type == "AOD" - def test_from_cwl(self, mocker): + def test_from_cwl(self): """Test from_cwl class method.""" - mock_cwl = mocker.Mock() - mock_descriptor = ExecutionHooksHint(hook_plugin="QueryBasedPlugin") - mock_from_cwl = mocker.patch("dirac_cwl.submission_models.ExecutionHooksHint.from_cwl") - mock_from_cwl.return_value = mock_descriptor - - result = ExecutionHooksHint.from_cwl(mock_cwl) - + from cwl_utils.parser.cwl_v1_2 import CommandLineTool + + task = CommandLineTool( + id=".", + inputs=[], + outputs=[], + hints=[{"class": "dirac:ExecutionHooks", "hook_plugin": "QueryBasedPlugin"}], + cwlVersion="v1.2", + ) + result = ExecutionHooksHint.from_cwl(task) assert isinstance(result, ExecutionHooksHint) - mock_from_cwl.assert_called_once_with(mock_cwl) + assert result.hook_plugin == "QueryBasedPlugin" class TestSubmissionModelsIntegration: diff --git a/test/workflows/bad_references/reference_circular1.cwl b/test/workflows/bad_references/reference_circular1.cwl index dded4944..875fa17a 100644 --- a/test/workflows/bad_references/reference_circular1.cwl +++ b/test/workflows/bad_references/reference_circular1.cwl @@ -21,3 +21,11 @@ steps: input: input out: [output] run: ./reference_circular2.cwl + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/bad_references/reference_circular2.cwl b/test/workflows/bad_references/reference_circular2.cwl index a3db2fde..aec57251 100644 --- 
a/test/workflows/bad_references/reference_circular2.cwl +++ b/test/workflows/bad_references/reference_circular2.cwl @@ -21,3 +21,11 @@ steps: input: input out: [output] run: ./reference_circular1.cwl + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/bad_references/reference_doesnotexists.cwl b/test/workflows/bad_references/reference_doesnotexists.cwl index 82631519..bed82223 100644 --- a/test/workflows/bad_references/reference_doesnotexists.cwl +++ b/test/workflows/bad_references/reference_doesnotexists.cwl @@ -21,3 +21,11 @@ steps: input: input out: [output] run: ./doesnotexist.cwl + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/bad_references/reference_itself.cwl b/test/workflows/bad_references/reference_itself.cwl index a313a05c..952b48a2 100644 --- a/test/workflows/bad_references/reference_itself.cwl +++ b/test/workflows/bad_references/reference_itself.cwl @@ -21,3 +21,11 @@ steps: input: input out: [output] run: ./reference_itself.cwl + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/crypto/base64.cwl b/test/workflows/crypto/base64.cwl index 4842a1c7..e2498a01 100644 --- a/test/workflows/crypto/base64.cwl +++ b/test/workflows/crypto/base64.cwl @@ -20,3 +20,11 @@ outputs: type: File outputBinding: glob: "base64_result.txt" + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/crypto/caesar.cwl b/test/workflows/crypto/caesar.cwl index e122d65a..044bb649 100644 --- a/test/workflows/crypto/caesar.cwl +++ b/test/workflows/crypto/caesar.cwl @@ -25,3 +25,11 @@ outputs: type: File outputBinding: glob: "caesar_result.txt" + +hints: + - class: dirac:Job + schema_version: "1.0" + 
type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/crypto/description.cwl b/test/workflows/crypto/description.cwl index 4c0dfab6..44f0f31d 100644 --- a/test/workflows/crypto/description.cwl +++ b/test/workflows/crypto/description.cwl @@ -59,3 +59,11 @@ steps: input_string: input_string out: [output] run: ./rot13.cwl + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/crypto/md5.cwl b/test/workflows/crypto/md5.cwl index 31f790d7..ca4ecbf6 100644 --- a/test/workflows/crypto/md5.cwl +++ b/test/workflows/crypto/md5.cwl @@ -20,3 +20,11 @@ outputs: type: File outputBinding: glob: "md5_result.txt" + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/crypto/rot13.cwl b/test/workflows/crypto/rot13.cwl index e6844cba..f77ca339 100644 --- a/test/workflows/crypto/rot13.cwl +++ b/test/workflows/crypto/rot13.cwl @@ -20,3 +20,11 @@ outputs: type: File outputBinding: glob: "rot13_result.txt" + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/helloworld/description_basic.cwl b/test/workflows/helloworld/description_basic.cwl index f87baf02..5242b24d 100644 --- a/test/workflows/helloworld/description_basic.cwl +++ b/test/workflows/helloworld/description_basic.cwl @@ -1,10 +1,15 @@ cwlVersion: v1.2 -# What type of CWL process we have in this document: ComandLineTool or Workflow. class: CommandLineTool -# The inputs for this process: none. +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + inputs: [] -# The outputs for this process: none. 
outputs: [] baseCommand: ["echo", "Hello World"] + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/helloworld/description_with_inputs.cwl b/test/workflows/helloworld/description_with_inputs.cwl index 0d77098a..112cdc6f 100644 --- a/test/workflows/helloworld/description_with_inputs.cwl +++ b/test/workflows/helloworld/description_with_inputs.cwl @@ -1,6 +1,11 @@ cwlVersion: v1.2 class: CommandLineTool +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + inputs: message: type: string @@ -12,3 +17,6 @@ inputs: outputs: [] baseCommand: echo + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/malformed_description/description_malformed_class.cwl b/test/workflows/malformed_description/description_malformed_class.cwl index e98bfcb4..257c998c 100644 --- a/test/workflows/malformed_description/description_malformed_class.cwl +++ b/test/workflows/malformed_description/description_malformed_class.cwl @@ -6,3 +6,11 @@ inputs: [] outputs: [] baseCommand: ["echo", "Hello World"] + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/malformed_description/description_malformed_command.cwl b/test/workflows/malformed_description/description_malformed_command.cwl index e232c9ea..c55ae93f 100644 --- a/test/workflows/malformed_description/description_malformed_command.cwl +++ b/test/workflows/malformed_description/description_malformed_command.cwl @@ -6,3 +6,11 @@ outputs: [] # baseComand instead of baseCommand baseComand: malformedcommand + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/parallel/description.cwl b/test/workflows/parallel/description.cwl index a385606d..28f479f4 100644 --- a/test/workflows/parallel/description.cwl +++ b/test/workflows/parallel/description.cwl @@ -31,13 +31,9 @@ steps: hints: - - class: 
dirac:ExecutionHooks - hook_plugin: "QueryBasedPlugin" - output_sandbox: ["a_out", "b_out"] - + - class: dirac:Job + schema_version: "1.0" + type: User $namespaces: - dirac: "../../schemas/dirac-metadata.json#/$defs/" # Generated schema from Pydantic models - -$schemas: - - "../../schemas/dirac-metadata.json" + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/parallel/sleep.cwl b/test/workflows/parallel/sleep.cwl index 7880a80c..3ac61217 100644 --- a/test/workflows/parallel/sleep.cwl +++ b/test/workflows/parallel/sleep.cwl @@ -14,3 +14,11 @@ arguments: date +%s%N > "$(inputs.label).txt" sleep "$(inputs.seconds)" date +%s%N >> "$(inputs.label).txt" + +hints: + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" diff --git a/test/workflows/pi/description.cwl b/test/workflows/pi/description.cwl index 930d4b33..5033a50c 100644 --- a/test/workflows/pi/description.cwl +++ b/test/workflows/pi/description.cwl @@ -7,10 +7,12 @@ doc: > a unit circle inscribed in the square. 
$namespaces: - dirac: "../../schemas/dirac-metadata.json#/$defs/" + dirac: "https://diracgrid.org/cwl#" -$schemas: - - "../../schemas/dirac-metadata.json" +hints: + - class: dirac:Job + schema_version: "1.0" + type: User # Define the inputs of the workflow inputs: diff --git a/test/workflows/pi/pigather.cwl b/test/workflows/pi/pigather.cwl index 0ecc7f3c..536ecbd2 100644 --- a/test/workflows/pi/pigather.cwl +++ b/test/workflows/pi/pigather.cwl @@ -7,7 +7,12 @@ requirements: ramMin: 1024 hints: - $import: "type_dependencies/transformation/metadata-pi_gather.yaml" + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" inputs: input-data: diff --git a/test/workflows/pi/pisimulate.cwl b/test/workflows/pi/pisimulate.cwl index 444c464c..753250e1 100644 --- a/test/workflows/pi/pisimulate.cwl +++ b/test/workflows/pi/pisimulate.cwl @@ -7,7 +7,12 @@ requirements: ramMin: 1024 hints: - $import: "type_dependencies/transformation/metadata-pi_simulate.yaml" + - class: dirac:Job + schema_version: "1.0" + type: User + +$namespaces: + dirac: "https://diracgrid.org/cwl#" inputs: num-points: diff --git a/test/workflows/test_meta/test_meta.cwl b/test/workflows/test_meta/test_meta.cwl index fbc289ba..8d98958c 100644 --- a/test/workflows/test_meta/test_meta.cwl +++ b/test/workflows/test_meta/test_meta.cwl @@ -1,28 +1,20 @@ cwlVersion: v1.2 -# What type of CWL process we have in this document: ComandLineTool or Workflow. class: CommandLineTool -# The inputs for this process: none. -inputs: [ ] -# The outputs for this process: none. 
-outputs: [ ] +inputs: [] +outputs: [] -baseCommand: [ "echo", "Hello World" ] +baseCommand: ["echo", "Hello World"] $namespaces: - dirac: "../../schemas/dirac-metadata.json#/$defs/" # Generated schema from Pydantic models - -$schemas: - - "../../schemas/dirac-metadata.json" + dirac: "https://diracgrid.org/cwl#" hints: - - class: dirac:ExecutionHooks - hook_plugin: "QueryBasedPlugin" - configuration: - campaign: PROD5 - site: LaPalma - - - class: dirac:Scheduling + - class: dirac:Job + schema_version: "1.0" + type: User platform: x86_64 priority: 100 - sites: CTAO.DESY-ZN.de, CTAO.PIC.es + sites: + - CTAO.DESY-ZN.de + - CTAO.PIC.es diff --git a/test/workflows/test_outputs/test_outputs.cwl b/test/workflows/test_outputs/test_outputs.cwl index a8b27410..04508a3d 100644 --- a/test/workflows/test_outputs/test_outputs.cwl +++ b/test/workflows/test_outputs/test_outputs.cwl @@ -15,13 +15,9 @@ baseCommand: ["touch", "file1"] $namespaces: - dirac: "../../schemas/dirac-metadata.json#/$defs/" # Generated schema from Pydantic models - -$schemas: - - "../../schemas/dirac-metadata.json" + dirac: "https://diracgrid.org/cwl#" hints: - - class: dirac:ExecutionHooks - hook_plugin: "QueryBasedPlugin" - output_paths: - output1: "lfn:/test/output" + - class: dirac:Job + schema_version: "1.0" + type: User diff --git a/test/workflows/test_outputs/test_outputs_sandbox.cwl b/test/workflows/test_outputs/test_outputs_sandbox.cwl index 1999d427..04508a3d 100644 --- a/test/workflows/test_outputs/test_outputs_sandbox.cwl +++ b/test/workflows/test_outputs/test_outputs_sandbox.cwl @@ -15,12 +15,9 @@ baseCommand: ["touch", "file1"] $namespaces: - dirac: "../../schemas/dirac-metadata.json#/$defs/" # Generated schema from Pydantic models - -$schemas: - - "../../schemas/dirac-metadata.json" + dirac: "https://diracgrid.org/cwl#" hints: - - class: dirac:ExecutionHooks - hook_plugin: "QueryBasedPlugin" - output_sandbox: ["output1"] + - class: dirac:Job + schema_version: "1.0" + type: User