Add support for multi deployment using workflow cwl#968
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #968 +/- ##
==========================================
+ Coverage 88.22% 88.30% +0.07%
==========================================
Files 88 88
Lines 20569 20755 +186
Branches 2702 2744 +42
==========================================
+ Hits 18148 18327 +179
- Misses 1733 1735 +2
- Partials 688 693 +5 ☔ View full report in Codecov by Sentry. |
fmigneault
left a comment
There was a problem hiding this comment.
Review still in progress. Posting pending comments to stage in the meantime.
| # Since there's no workflow, first tool should be the main process | ||
| main_id = result["processSummary"]["id"] | ||
| assert main_id in [f"{test_id}-tool-1", f"{test_id}-tool-2"] |
There was a problem hiding this comment.
I think this operation should instead be 400.
Based on https://www.commonwl.org/v1.2/CommandLineTool.html#Packed_documents
If the reference to the packed document does not include a fragment identifier, the runner must choose the top-level process object as the entry point. If there is no top-level process object (as in the case of $graph) then the runner must choose the process object with an id of #main. If there is no #main object, the runner must return an error.
Therefore, CWL will execute by default only the first one, which makes the rest of the definition irrelevant. To avoid proliferation / misinterpretation of partial definitions, I think Weaver should detect and refuse this case early on.
| return package | ||
| # type: (CWL) -> Union[CWL, List[CWL]] | ||
| """ | ||
| Resolve CWL $graph into deployable packages. |
There was a problem hiding this comment.
Missing :term: and code quotes for fragments found inside CWL, Python keywords, etc.
Same for other docstrings.
| If an embedded ``executionUnit`` containing the :term:`CWL` is desired, provide ``body={}`` explicitly. | ||
| Can also be a list of :term:`CWL` definitions (dicts, strings, or file paths) for multi-process | ||
| deployment, which will be combined into a multipart request. When deploying multiple CWL files, | ||
| the workflow (if present) should be last in the list, and all tools should come before it. |
There was a problem hiding this comment.
I think the "last in the list" requirement is a side effect of the order the code uses to do validation, which should not impact the deployment content structure. Or it this just an artifact of earlier definition since the code has a sorting/resolution function.
As long as there is 1 Workflow, I think it could be anywhere.
Since we only handle 1 workflow (for now at least), the Content-ID or id reference in the CWL could be used to identify the relevant one, as per https://www.commonwl.org/v1.2/CommandLineTool.html#Packed_documents.
| # ============================================================================= | ||
| # Multipart Deployment Parsing Tests | ||
| # ============================================================================= | ||
| # These tests directly test the internal multipart parsing functions to cover | ||
| # branches that are difficult to reach through integration tests. |
There was a problem hiding this comment.
Move under common class or remove comment
Consider adding a @pytest.mark.multipart marker on class or each test individually.
| assert result.body["id"] == test_id | ||
| assert result.body["processDescriptionURL"] | ||
|
|
||
| def test_deploy_multi_cwl_tools_only(self): |
There was a problem hiding this comment.
Error according to previous comment.
| def test_get_multipart_content_with_bytes(): | ||
| """ | ||
| Test _get_multipart_content with bytes input. | ||
| """ | ||
| content = b"test content" | ||
| result = _get_multipart_content(content, request=None) | ||
| assert result == b"test content" | ||
| assert isinstance(result, bytes) |
There was a problem hiding this comment.
This and other ones similar below should be combined under a single test function with @pytest.mark.parametrized to make supported combinations easier to interpret all at once.
| if not isinstance(part_data, dict): | ||
| return process_description | ||
|
|
||
| if 'class' in part_data and part_data['class'] in ['CommandLineTool', 'Workflow', 'ExpressionTool']: |
There was a problem hiding this comment.
inconsistent string, use double quotes, here and below
| def parse_multipart_deploy(content, content_type, request=None): | ||
| # type: (Union[str, bytes], str, Optional[AnyRequestType]) -> Tuple[List[CWL], Optional[JSON]] | ||
| """ | ||
| Parse multipart/mixed or multipart/related deployment content. |
There was a problem hiding this comment.
A lot of ignored code paths in this function (# pragma: no cover). If those are considered supported error-handling/robustness cases, they should be tested properly.
| # Check if content should be fetched from Content-Location | ||
| # Only fetch if part body is empty AND Content-Location looks like a URL | ||
| if content_location and (not part_content or not part_content.strip()): |
There was a problem hiding this comment.
Move this if block in a separate function to make the main one easier to interpret.
| # Reorder if root workflow specified and validate it | ||
| if root_workflow_cid and root_workflow_cid in parts_by_cid: | ||
| root_pkg = parts_by_cid[root_workflow_cid] | ||
| # Validate that the root is actually a Workflow (per RFC 5621 and multipart/related requirements) | ||
| root_class = root_pkg.get("class", "") | ||
| if root_class != "Workflow": | ||
| raise HTTPBadRequest(json={ | ||
| "title": "Invalid root workflow reference", | ||
| "description": ( | ||
| f"The 'start' parameter references a CWL with class '{root_class}', " | ||
| "but only 'Workflow' is permitted as root document in multipart/related." | ||
| ), | ||
| "cause": {"Content-ID": root_workflow_cid, "class": root_class} | ||
| }) | ||
| cwl_packages = [pkg for pkg in cwl_packages if pkg is not root_pkg] | ||
| cwl_packages.append(root_pkg) | ||
| elif not root_workflow_cid and cwl_packages: | ||
| # No explicit start parameter: validate first element is a Workflow (RFC 5621 §7 default) | ||
| first_pkg = cwl_packages[0] | ||
| first_class = first_pkg.get("class", "") | ||
| if first_class and first_class != "Workflow": | ||
| LOGGER.warning( | ||
| "No 'start' parameter provided in multipart/related. First element has class '%s' " | ||
| "but 'Workflow' is recommended for root document. Proceeding with deployment.", | ||
| first_class | ||
| ) |
There was a problem hiding this comment.
Move this in a separate function
| def parse_multipart_deploy(content, content_type, request=None): | ||
| # type: (Union[str, bytes], str, Optional[AnyRequestType]) -> Tuple[List[CWL], Optional[JSON]] |
There was a problem hiding this comment.
Please refactor in smaller chunks.
In the long run, I plan to support multipart "Deploy+Execute" (like this #834 (comment)).
Therefore, I will need to inject other intermediate/diverging steps, and would like to reuse these functions such as _classify_multipart_part. I would need to have the steps better split out:
- parse multipart => [parts]
- for part => interpret part
- for now, you consider only CWL / process description as you currently do
- I can latch on these functions filter "Execute" later
- filter / sort parts in relevant way
| # Verify child tools were deployed | ||
| if "deployedProcesses" in result: | ||
| assert f"{test_id}-echo-tool" in result["deployedProcesses"] | ||
| assert f"{test_id}-cat-tool" in result["deployedProcesses"] |
| # Verify child tools were deployed | ||
| if "deployedProcesses" in result: | ||
| assert f"{test_id}-echo-tool" in result["deployedProcesses"] | ||
| assert f"{test_id}-cat-tool" in result["deployedProcesses"] | ||
|
|
||
| # Verify we can retrieve the child tools | ||
| desc = self.get_process_description(f"{test_id}-echo-tool", schema=ProcessSchema.OLD) | ||
| assert desc["process"]["id"] == f"{test_id}-echo-tool" | ||
| pkg = self.get_application_package(f"{test_id}-echo-tool") | ||
| assert pkg["class"] == "CommandLineTool" | ||
|
|
||
| desc = self.get_process_description(f"{test_id}-cat-tool", schema=ProcessSchema.OLD) | ||
| assert desc["process"]["id"] == f"{test_id}-cat-tool" | ||
| pkg = self.get_application_package(f"{test_id}-cat-tool") | ||
| assert pkg["class"] == "CommandLineTool" |
There was a problem hiding this comment.
only main proces should be checked then ?
There was a problem hiding this comment.
My bad, I over-selected lines. The deployedProcesses part is not applicable, since it is never returned. Only the "main workflow" process is returned in the response. For validation of the procedure though, yes, check that each process individually was deployed correctly.
Uh oh!
There was an error while loading. Please reload this page.