#800 - Local workflow execution#802
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds StandaloneExecutor and ComposeExecutor, CLI subcommands for standalone and docker-compose, Bazel build/test targets, cookbook filegroup, docs and .gitignore updates, plus comprehensive unit and Docker-gated integration tests. ChangesStandalone & Compose Integration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 52-66: The parsed args.docker_cmd is not forwarded; update the
call in _run_local to pass docker_cmd=args.docker_cmd into
local_executor.run_workflow_locally, add a docker_cmd parameter to
run_workflow_locally in src/utils/local_executor.py, and thread that parameter
into the LocalExecutor instantiation/use so LocalExecutor receives and uses the
docker command (e.g., add docker_cmd to LocalExecutor.__init__ or the specific
method that launches containers and wire it through all intermediate calls).
In `@src/utils/local_executor.py`:
- Around line 273-295: run_workflow_locally currently always removes work_dir on
success which can delete a caller-supplied directory; change it to only remove
the directory when it was created inside the function. Introduce a flag (e.g.,
created_temp_dir) before calling tempfile.mkdtemp and set it true when you
create the temp directory; then only call shutil.rmtree(work_dir, ...) when
success is True, keep_work_dir is False, and created_temp_dir is True. Update
references in run_workflow_locally and any related cleanup logic so
user-provided work_dir is never removed.
- Around line 228-247: The debug log prints the fully expanded docker_args
(including '-e KEY=value' entries), exposing secrets from task_spec.environment;
update the logic that builds or logs docker_args (references: docker_args,
task_spec.environment, _substitute_tokens, logger.debug) to redact environment
values before logging—e.g., replace values for any '-e KEY=...' entries with a
placeholder like 'REDACTED' (or construct a sanitized copy of docker_args that
masks environment values) and log that sanitized list instead of the original;
ensure actual docker_args passed to subprocess remain unchanged so execution is
unaffected.
- Around line 90-113: The method currently treats an empty run as success when
no tasks were executed (e.g., DAG cycle). After the ready loop, check if no
tasks were executed (self._results is empty or a counter is zero) while there
are tasks defined (self._task_nodes not empty); if so, log an error indicating
the workflow is stalled/contains a cycle (include spec.name) and return False.
Use the existing symbols _find_ready_tasks, _results, _task_nodes and spec.name
to implement this early failure path so stalled workflows are reported as
failures.
In `@src/utils/tests/BUILD`:
- Around line 34-42: The py_test target named "test_local_executor" is marked
manual which hides its coverage from CI; remove the tags = ["manual"] attribute
from the "test_local_executor" target so the suite runs in CI, and if you must
keep Docker-only cases manual, split them out of test_local_executor.py into a
separate test file and add a new py_test target (e.g.,
"test_local_executor_docker") that depends on the Docker-only tests and retains
tags = ["manual"] instead of marking the whole "test_local_executor" target
manual.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ff387238-7afc-4e5c-8e25-06298591e48d
📒 Files selected for processing (7)
src/cli/BUILDsrc/cli/local.pysrc/cli/main_parser.pysrc/utils/BUILDsrc/utils/local_executor.pysrc/utils/tests/BUILDsrc/utils/tests/test_local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 4
♻️ Duplicate comments (3)
src/utils/local_executor.py (3)
91-114:⚠️ Potential issue | 🔴 CriticalReport stalled DAGs as failures.
If
readybecomes empty before every node has a result, this falls through to the success log even though some tasks never ran. A cycle is the obvious case, but any permanently blocked dependency hits the same path.Suggested fix
while ready: for task_name in ready: node = self._task_nodes[task_name] logger.info('--- Running task: %s (image: %s) ---', task_name, node.spec.image) result = self._run_task(node, spec) @@ ready = self._find_ready_tasks() + unresolved_tasks = sorted(set(self._task_nodes) - set(self._results)) + if unresolved_tasks: + logger.error( + 'Workflow "%s" stalled with unresolved tasks: %s', + spec.name, + ', '.join(unresolved_tasks), + ) + return False + failed = [name for name, r in self._results.items() if r.exit_code != 0]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 91 - 114, The loop exits as success when no ready tasks remain even if some nodes never ran; update the run loop after ready = self._find_ready_tasks() to detect a stalled DAG by comparing self._results keys against all task names in self._task_nodes (or using a helper) and if there are unreached tasks (e.g., due to a cycle or blocked dependency) log an error listing those task names and return False instead of proceeding to success; reference the methods/attributes _find_ready_tasks, _results, _task_nodes, and leave existing failure handling via _cancel_downstream/_run_task intact.
286-290:⚠️ Potential issue | 🔴 CriticalOnly delete work directories created inside this helper.
A successful run currently
rmtree()s anywork_dir, including a caller-supplied path.--work-dirshould never wipe an existing folder on the normal success path.Suggested fix
def run_workflow_locally(spec_path: str, work_dir: str | None = None, keep_work_dir: bool = False) -> bool: - if work_dir is None: + created_temp_work_dir = work_dir is None + if created_temp_work_dir: work_dir = tempfile.mkdtemp(prefix='osmo-local-') logger.info('Using temporary work directory: %s', work_dir) @@ - if not keep_work_dir and success: + if created_temp_work_dir and not keep_work_dir and success: logger.info('Cleaning up work directory: %s', work_dir) shutil.rmtree(work_dir, ignore_errors=True)Also applies to: 306-308
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 286 - 290, The current run_workflow_locally function always removes work_dir on success which can delete caller-supplied folders; modify run_workflow_locally to track whether the helper created the directory (e.g., set created_temp_dir = True when assigning work_dir = tempfile.mkdtemp(...)) and only call shutil.rmtree(work_dir) at cleanup when created_temp_dir is True and keep_work_dir is False; ensure all cleanup branches (including the success path and exception paths around the existing logic that references work_dir and keep_work_dir) use this flag so externally provided work_dir is never deleted by the helper.
241-260:⚠️ Potential issue | 🟠 MajorRedact env values before logging the Docker command.
By the time this debug log runs,
docker_argsalready contains resolved-e KEY=valuepairs, so any secret passed throughenvironment:is written to logs verbatim.Suggested fix
- logger.debug('Docker command: %s', ' '.join(docker_args)) + redacted_docker_args = list(docker_args) + for index, value in enumerate(redacted_docker_args[:-1]): + if value == '-e': + key = redacted_docker_args[index + 1].split('=', 1)[0] + redacted_docker_args[index + 1] = f'{key}=***' + logger.debug('Docker command: %s', ' '.join(redacted_docker_args))🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 241 - 260, The docker_args list currently includes resolved environment pairs from task_spec.environment and is logged verbatim by logger.debug('Docker command: %s', ' '.join(docker_args)); change this so secrets are redacted before logging by creating a sanitized copy of docker_args that replaces any '-e KEY=value' entries (or items following '-e') with '-e KEY=REDACTED' (or only '-e KEY') and log that sanitized list instead; locate the block that builds docker_args (references: task_spec.environment, docker_args, resolved_value, logger.debug) and use the sanitized copy in the debug call rather than the original docker_args.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 162-173: Local execution currently ignores TaskSpec.privileged and
TaskSpec.hostNetwork causing workflows to run with different semantics; update
the local executor to reject these flags like other unsupported features by
checking task_spec.privileged and task_spec.hostNetwork and appending
descriptive messages to the unsupported_features list (use f'Task
"{task_spec.name}": privileged containers are not supported by local executor'
and f'Task "{task_spec.name}": hostNetwork is not supported by local executor')
so callers see and block workflows that rely on those settings; locate checks
around unsupported_features and task_spec in the local_executor code and add
these two checks.
- Around line 295-300: The current Jinja guard in local_executor.py uses the
wrong block marker '{%%' so blocks like '{% ... %}' slip past; update the
template_markers tuple used where spec_text is checked (the template_markers
variable and the any(marker in spec_text for marker in template_markers) check)
to look for the correct Jinja start markers (use '{%' and '{#' and keep
'default-values'), then run the existing any(...) check and keep the same
ValueError raise path so templated specs are rejected earlier with the clear
message.
- Line 2: Add a local pylint suppression for the long SPDX copyright header by
inserting a single-line comment "# pylint: disable=line-too-long" immediately
after the SPDX header (the long copyright line at the top of
src/utils/local_executor.py) so the file-level long-line rule is silenced for
that header only; ensure the comment is placed directly below the header and
does not alter any other lines or logic.
- Around line 208-213: The _task_gpu_count function currently looks up resources
via spec.resources[task_spec.resource] but TaskSpec exposes resources directly;
change _task_gpu_count to use task_spec.resources (the ResourceSpec object) to
read the gpu attribute (keep returning 0 if missing). Also ensure the Docker GPU
CLI argument construction used in subprocess.run keeps the literal quotes around
the device string (pass a single argv element containing the quoted string,
e.g., '"device=0,1"', so Docker receives the quoted value for --gpus).
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 91-114: The loop exits as success when no ready tasks remain even
if some nodes never ran; update the run loop after ready =
self._find_ready_tasks() to detect a stalled DAG by comparing self._results keys
against all task names in self._task_nodes (or using a helper) and if there are
unreached tasks (e.g., due to a cycle or blocked dependency) log an error
listing those task names and return False instead of proceeding to success;
reference the methods/attributes _find_ready_tasks, _results, _task_nodes, and
leave existing failure handling via _cancel_downstream/_run_task intact.
- Around line 286-290: The current run_workflow_locally function always removes
work_dir on success which can delete caller-supplied folders; modify
run_workflow_locally to track whether the helper created the directory (e.g.,
set created_temp_dir = True when assigning work_dir = tempfile.mkdtemp(...)) and
only call shutil.rmtree(work_dir) at cleanup when created_temp_dir is True and
keep_work_dir is False; ensure all cleanup branches (including the success path
and exception paths around the existing logic that references work_dir and
keep_work_dir) use this flag so externally provided work_dir is never deleted by
the helper.
- Around line 241-260: The docker_args list currently includes resolved
environment pairs from task_spec.environment and is logged verbatim by
logger.debug('Docker command: %s', ' '.join(docker_args)); change this so
secrets are redacted before logging by creating a sanitized copy of docker_args
that replaces any '-e KEY=value' entries (or items following '-e') with '-e
KEY=REDACTED' (or only '-e KEY') and log that sanitized list instead; locate the
block that builds docker_args (references: task_spec.environment, docker_args,
resolved_value, logger.debug) and use the sanitized copy in the debug call
rather than the original docker_args.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: e830cb69-d490-4557-9c39-1898c8b721c2
📒 Files selected for processing (1)
src/utils/local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (5)
src/utils/local_executor.py (5)
91-114:⚠️ Potential issue | 🔴 CriticalStalled/cyclic workflows incorrectly report success.
If the DAG contains a cycle or no tasks are initially ready,
readyis empty on the first pass, the while loop never executes,_resultsstays empty, and the method falls through to the success log at line 113 even though no tasks ran.Proposed fix to detect stalled workflows
while ready: for task_name in ready: node = self._task_nodes[task_name] logger.info('--- Running task: %s (image: %s) ---', task_name, node.spec.image) result = self._run_task(node, spec) self._results[task_name] = result if result.exit_code != 0: logger.error('Task "%s" failed with exit code %d', task_name, result.exit_code) self._cancel_downstream(task_name) return False logger.info('Task "%s" completed successfully', task_name) ready = self._find_ready_tasks() + unresolved_tasks = sorted(set(self._task_nodes) - set(self._results)) + if unresolved_tasks: + logger.error( + 'Workflow "%s" stalled with unresolved tasks (possible cycle): %s', + spec.name, + ', '.join(unresolved_tasks), + ) + return False + failed = [name for name, r in self._results.items() if r.exit_code != 0]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 91 - 114, The run loop can silently succeed when no tasks ever become ready (e.g., cycles) because ready is empty and _results stays empty; modify the logic in the method using _find_ready_tasks, _task_nodes and _results to detect stalled workflows: if ready is empty but there remain tasks not in _results (i.e., pending tasks exist), log an error indicating a stalled/cyclic workflow (include spec.name and the list of pending task names) and return False instead of falling through to success; alternatively detect lack of progress between iterations (no newly completed tasks) and treat that as failure, calling the same error path (and keep existing _cancel_downstream usage where appropriate).
297-302:⚠️ Potential issue | 🟡 MinorFix the Jinja block marker check.
'{%%'never matches a normal{% ... %}block, so templated specs skip this guard and fail later with unclear errors.- template_markers = ('{%%', '{#', 'default-values') + template_markers = ('{%', '{#', 'default-values')🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 297 - 302, The guard that detects Jinja templates is using the wrong marker '{%%' so it never matches normal Jinja block syntax and lets templated specs through; update the template_markers used when scanning spec_text (the tuple named template_markers near the ValueError raise) to include the correct block marker '{%' (and also add '{{' for expression markers and keep '{#' and 'default-values') so the any(marker in spec_text for marker in template_markers) check correctly detects Jinja templates and triggers the existing ValueError path.
162-173:⚠️ Potential issue | 🟠 MajorMissing validation for
privilegedandhostNetworkflags.
TaskSpecexposesprivilegedandhostNetworkfields, but local execution ignores them. Workflows relying on either setting will be accepted but run with different container semantics than the cluster path.Proposed fix
if task_spec.volumeMounts: unsupported_features.append( f'Task "{task_spec.name}": volumeMounts require cluster-level host paths') + + if task_spec.privileged: + unsupported_features.append( + f'Task "{task_spec.name}": privileged mode is not supported in local execution') + + if task_spec.hostNetwork: + unsupported_features.append( + f'Task "{task_spec.name}": hostNetwork is not supported in local execution')🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 162 - 173, local_executor currently ignores TaskSpec.privileged and TaskSpec.hostNetwork, so workflows with those flags are accepted but executed with different semantics; update the validation in local_executor.py where unsupported_features is built (the block handling task_spec.credentials/checkpoint/volumeMounts) to also detect task_spec.privileged and task_spec.hostNetwork and append clear messages like f'Task "{task_spec.name}": privileged requires cluster-level privileges' and f'Task "{task_spec.name}": hostNetwork requires cluster networking' so these workflows are rejected for local execution the same way other unsupported features are.
288-313:⚠️ Potential issue | 🔴 CriticalDon't delete caller-supplied work directories.
A successful run currently
rmtree()s anywork_dir, not just the temporary directory created inside this helper. If the user points--work-dirat an existing folder, it will be wiped after a normal success path.Proposed fix
def run_workflow_locally(spec_path: str, work_dir: str | None = None, keep_work_dir: bool = False) -> bool: + created_temp_work_dir = work_dir is None if work_dir is None: work_dir = tempfile.mkdtemp(prefix='osmo-local-') logger.info('Using temporary work directory: %s', work_dir) with open(spec_path) as f: spec_text = f.read() template_markers = ('{%', '{#', 'default-values') if any(marker in spec_text for marker in template_markers): raise ValueError( 'This spec uses Jinja templates which require server-side expansion.\n' 'Run "osmo workflow submit --dry-run -f <spec>" first to get the expanded spec,\n' 'then save that output and run it locally.') executor = LocalExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir) spec = executor.load_spec(spec_text) success = executor.execute(spec) - if not keep_work_dir and success: + if created_temp_work_dir and not keep_work_dir and success: logger.info('Cleaning up work directory: %s', work_dir) shutil.rmtree(work_dir, ignore_errors=True) elif not success: logger.info('Work directory preserved for debugging: %s', work_dir) return success🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 288 - 313, The helper run_workflow_locally currently always deletes work_dir on success which can wipe a caller-supplied directory; change it to only remove the directory when it was created inside this function (i.e., when you called tempfile.mkdtemp). Add a flag (e.g., created_temp = True/False) set to True right after tempfile.mkdtemp and only call shutil.rmtree(work_dir, ...) when created_temp is True and keep_work_dir is False and execution succeeded; leave existing logging behavior for preserved dirs and ensure created_temp is referenced around the shutil.rmtree and logging so externally provided work_dir is never removed.
262-262:⚠️ Potential issue | 🟠 MajorAvoid logging environment variable values that may contain secrets.
docker_argscontains fully expanded-e KEY=valuepairs from the workflow spec'senvironmentfield. The debug log prints them verbatim, potentially exposing tokens or passwords.Proposed fix to redact environment values
- logger.debug('Docker command: %s', ' '.join(docker_args)) + redacted_args = list(docker_args) + for i, arg in enumerate(redacted_args[:-1]): + if arg == '-e': + key = redacted_args[i + 1].split('=', 1)[0] + redacted_args[i + 1] = f'{key}=***' + logger.debug('Docker command: %s', ' '.join(redacted_args))🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` at line 262, The debug log prints the full docker_args (including expanded '-e KEY=value' entries) which may expose secrets; update the logger.debug call to log a redacted copy instead: create a redacted_args from docker_args by replacing environment-value parts (e.g. tokens matching '-e...=value' or strings containing '-e' and '=') with just the key and a placeholder (e.g. '-e KEY=REDACTED') and then log ' '.join(redacted_args) instead of the original; use the existing docker_args variable and a small regex or token-based transform (and add import re if needed) where logger.debug('Docker command: %s', ' '.join(docker_args)) is currently called.
🧹 Nitpick comments (1)
src/utils/local_executor.py (1)
264-269: Subprocess call with spec-derived arguments is acceptable for local execution.Static analysis flags S603 (subprocess with untrusted input). However, this is a local execution tool where the user explicitly provides the workflow spec. The trust boundary is appropriate - users running
osmo local runare intentionally executing containers defined in their own spec files.Consider adding a brief comment noting this design decision for future maintainers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 264 - 269, Add a short comment above the subprocess.run call in the local_executor (around the docker_args / subprocess.run usage) explaining that docker_args are derived from the user-provided workflow spec and thus this is an intentional local execution design decision (trusted boundary), and append a linter suppression token (e.g., "# nosec") to the subprocess.run line to document/quiet static analysis S603; reference the docker_args variable, the subprocess.run call, and the surrounding try/except that returns TaskResult.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 236-239: The GPU device string is being built with literal double
quotes which become part of the argument passed to Docker; in the GPU handling
branch (see docker_args, _task_gpu_count and logger usage) remove the extra
quotes so the second element appended is device=0,1 (no surrounding ") by
changing the formatted string from including quotes to a plain
f'device={",".join(...) }' so subprocess.run receives the unquoted device list.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 91-114: The run loop can silently succeed when no tasks ever
become ready (e.g., cycles) because ready is empty and _results stays empty;
modify the logic in the method using _find_ready_tasks, _task_nodes and _results
to detect stalled workflows: if ready is empty but there remain tasks not in
_results (i.e., pending tasks exist), log an error indicating a stalled/cyclic
workflow (include spec.name and the list of pending task names) and return False
instead of falling through to success; alternatively detect lack of progress
between iterations (no newly completed tasks) and treat that as failure, calling
the same error path (and keep existing _cancel_downstream usage where
appropriate).
- Around line 297-302: The guard that detects Jinja templates is using the wrong
marker '{%%' so it never matches normal Jinja block syntax and lets templated
specs through; update the template_markers used when scanning spec_text (the
tuple named template_markers near the ValueError raise) to include the correct
block marker '{%' (and also add '{{' for expression markers and keep '{#' and
'default-values') so the any(marker in spec_text for marker in template_markers)
check correctly detects Jinja templates and triggers the existing ValueError
path.
- Around line 162-173: local_executor currently ignores TaskSpec.privileged and
TaskSpec.hostNetwork, so workflows with those flags are accepted but executed
with different semantics; update the validation in local_executor.py where
unsupported_features is built (the block handling
task_spec.credentials/checkpoint/volumeMounts) to also detect
task_spec.privileged and task_spec.hostNetwork and append clear messages like
f'Task "{task_spec.name}": privileged requires cluster-level privileges' and
f'Task "{task_spec.name}": hostNetwork requires cluster networking' so these
workflows are rejected for local execution the same way other unsupported
features are.
- Around line 288-313: The helper run_workflow_locally currently always deletes
work_dir on success which can wipe a caller-supplied directory; change it to
only remove the directory when it was created inside this function (i.e., when
you called tempfile.mkdtemp). Add a flag (e.g., created_temp = True/False) set
to True right after tempfile.mkdtemp and only call shutil.rmtree(work_dir, ...)
when created_temp is True and keep_work_dir is False and execution succeeded;
leave existing logging behavior for preserved dirs and ensure created_temp is
referenced around the shutil.rmtree and logging so externally provided work_dir
is never removed.
- Line 262: The debug log prints the full docker_args (including expanded '-e
KEY=value' entries) which may expose secrets; update the logger.debug call to
log a redacted copy instead: create a redacted_args from docker_args by
replacing environment-value parts (e.g. tokens matching '-e...=value' or strings
containing '-e' and '=') with just the key and a placeholder (e.g. '-e
KEY=REDACTED') and then log ' '.join(redacted_args) instead of the original; use
the existing docker_args variable and a small regex or token-based transform
(and add import re if needed) where logger.debug('Docker command: %s', '
'.join(docker_args)) is currently called.
---
Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 264-269: Add a short comment above the subprocess.run call in the
local_executor (around the docker_args / subprocess.run usage) explaining that
docker_args are derived from the user-provided workflow spec and thus this is an
intentional local execution design decision (trusted boundary), and append a
linter suppression token (e.g., "# nosec") to the subprocess.run line to
document/quiet static analysis S603; reference the docker_args variable, the
subprocess.run call, and the surrounding try/except that returns TaskResult.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: bf7af37f-080d-422d-91a5-301615578ab8
📒 Files selected for processing (1)
src/utils/local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/cli/local.py (1)
73-88: Consider markingservice_clientas unused.The
_run_localfunction receivesservice_clientper the CLI dispatcher contract (seesrc/cli/cli.py:118) but doesn't use it since local execution bypasses the OSMO service. Consider adding an underscore prefix or a brief comment to signal this is intentional:Suggested change
-def _run_local(service_client, args: argparse.Namespace): +def _run_local(_service_client, args: argparse.Namespace): # service_client unused for local runs🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cli/local.py` around lines 73 - 88, The _run_local function accepts a service_client parameter that is intentionally unused; mark it as unused by renaming it to _service_client or _ (or add an inline comment like "# service_client unused for local execution") so linters and readers understand this is deliberate; update the function signature for _run_local and any internal references accordingly to avoid linter warnings while preserving the CLI dispatcher contract.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 1-17: The SPDX header docstring at the top of src/cli/local.py
contains lines exceeding 100 characters; add a module-level pylint suppression
for line-too-long to the file so the SPDX lines are allowed unwrapped. Insert a
single "# pylint: disable=line-too-long" comment near the top of the module
(e.g., immediately above or below the existing triple-quoted SPDX docstring) so
the header is exempt from the linter without modifying the license text.
---
Nitpick comments:
In `@src/cli/local.py`:
- Around line 73-88: The _run_local function accepts a service_client parameter
that is intentionally unused; mark it as unused by renaming it to
_service_client or _ (or add an inline comment like "# service_client unused for
local execution") so linters and readers understand this is deliberate; update
the function signature for _run_local and any internal references accordingly to
avoid linter warnings while preserving the CLI dispatcher contract.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d904529c-e0cd-4ea4-b9f7-ac507838a76d
📒 Files selected for processing (4)
.gitignoresrc/cli/local.pysrc/utils/local_executor.pysrc/utils/tests/BUILD
✅ Files skipped from review due to trivial changes (2)
- src/utils/tests/BUILD
- .gitignore
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/utils/tests/test_local_executor.py (2)
159-160: Prefer specific exception types over bareException.The tests at lines 159, 179, and 189 catch bare
Exception. While this works, asserting a more specific exception type (e.g.,ValueErrororpydantic.ValidationError) would make test failures more informative and verify the correct error path.This is a minor improvement for test quality.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 159 - 160, Replace the three tests that use self.assertRaises(Exception) to assert a more specific error type when calling executor.load_spec; locate the assertions in the test file where executor.load_spec(spec_text) is expected to fail and change the raised type to the appropriate specific exception (e.g., ValueError or pydantic.ValidationError) that the load_spec implementation actually raises so the tests validate the correct error path.
1-17: Consider adding pylint suppression for long copyright header.Similar to the production file, line 2 exceeds 100 characters. If the same linting rules apply to test files, add the suppression comment.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 1 - 17, Add the same pylint suppression for long lines at the top of this test module as in production: insert a module-level pylint disable for "line-too-long" (e.g., the same "# pylint: disable=line-too-long" comment used in the production file) so the long copyright/header string in test_local_executor.py won't trigger the linter.src/utils/local_executor.py (1)
182-183: Specify explicit encoding for file operations.Several
open()calls lack an explicit encoding parameter. While Python 3 defaults to the locale encoding, explicitly specifyingencoding='utf-8'ensures consistent behavior across systems.Affected locations: lines 182, 189, 351, 450.
Example fix
- with open(self._state_file_path, 'w') as f: + with open(self._state_file_path, 'w', encoding='utf-8') as f: json.dump(state, f, indent=2)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 182 - 183, Update all open() calls in src/utils/local_executor.py to specify encoding='utf-8' to ensure consistent file encoding; for example, in the state writer inside the LocalExecutor class where you call with open(self._state_file_path, 'w') as f: json.dump(...), add encoding='utf-8', and similarly update the other open() calls referenced (the ones around the blocks using self._state_file_path, the read/write at the region near line ~351, and the open call near line ~450) so all reads and writes use open(..., encoding='utf-8').
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 759-774: The test_jinja_block_detected test currently uses the
nonstandard marker '{%% ... %%}' which mirrors an existing bug; update the spec
string to use real Jinja block syntax '{% ... %}' so the test detects standard
Jinja tags; locate the test_jinja_block_detected function and the temporary spec
created for run_workflow_locally and replace occurrences of '{%%' and '%%}' with
'{%' and '%}' respectively, keeping the rest of the assertion flow unchanged.
---
Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 182-183: Update all open() calls in src/utils/local_executor.py to
specify encoding='utf-8' to ensure consistent file encoding; for example, in the
state writer inside the LocalExecutor class where you call with
open(self._state_file_path, 'w') as f: json.dump(...), add encoding='utf-8', and
similarly update the other open() calls referenced (the ones around the blocks
using self._state_file_path, the read/write at the region near line ~351, and
the open call near line ~450) so all reads and writes use open(...,
encoding='utf-8').
In `@src/utils/tests/test_local_executor.py`:
- Around line 159-160: Replace the three tests that use
self.assertRaises(Exception) to assert a more specific error type when calling
executor.load_spec; locate the assertions in the test file where
executor.load_spec(spec_text) is expected to fail and change the raised type to
the appropriate specific exception (e.g., ValueError or
pydantic.ValidationError) that the load_spec implementation actually raises so
the tests validate the correct error path.
- Around line 1-17: Add the same pylint suppression for long lines at the top of
this test module as in production: insert a module-level pylint disable for
"line-too-long" (e.g., the same "# pylint: disable=line-too-long" comment used
in the production file) so the long copyright/header string in
test_local_executor.py won't trigger the linter.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 6eefeced-a5c1-410f-95df-3ce162b20399
📒 Files selected for processing (3)
src/cli/local.pysrc/utils/local_executor.pysrc/utils/tests/test_local_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli/local.py
|
@coderabbitai resume |
✅ Actions performedReviews resumed. |
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (4)
src/utils/local_executor.py (3)
280-290:⚠️ Potential issue | 🟠 MajorReject
privilegedandhostNetworkin local mode.Local execution currently accepts these flags but ignores them, so the same workflow can be accepted here with materially different container semantics than on the cluster. They should be added to
unsupported_featuresalongsidevolumeMounts.Suggested fix
if task_spec.volumeMounts: unsupported_features.append( f'Task "{task_spec.name}": volumeMounts require cluster-level host paths') + if task_spec.privileged: + unsupported_features.append( + f'Task "{task_spec.name}": privileged containers are not supported by local executor') + if task_spec.hostNetwork: + unsupported_features.append( + f'Task "{task_spec.name}": hostNetwork is not supported by local executor')🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 280 - 290, Add checks that reject task_spec.privileged and task_spec.hostNetwork in local mode by appending explanatory strings to the existing unsupported_features list (same style as the existing checks for volumeMounts, checkpoint, and credentials). Locate the block that inspects task_spec (the code that appends to unsupported_features for task_spec.volumeMounts / task_spec.checkpoint / task_spec.credentials) and add two entries: one like f'Task "{task_spec.name}": privileged containers are not supported in local mode' for task_spec.privileged, and one like f'Task "{task_spec.name}": hostNetwork is not supported in local mode' for task_spec.hostNetwork, so these flags are explicitly rejected rather than ignored.
446-468:⚠️ Potential issue | 🔴 CriticalDon't delete caller-supplied
work_diron success.This helper still
rmtree()s any successfulwork_dir, even when it came from--work-dir. That can wipe user data after a normal run; only the temporary directory created inside this function should be auto-cleaned. This also means the current success-cleanup test is asserting the wrong contract.Suggested fix
- if work_dir is None: + created_temp_work_dir = work_dir is None + if created_temp_work_dir: work_dir = tempfile.mkdtemp(prefix='osmo-local-') logger.info('Using temporary work directory: %s', work_dir) @@ - if not keep_work_dir and success: + if created_temp_work_dir and not keep_work_dir and success: logger.info('Cleaning up work directory: %s', work_dir) shutil.rmtree(work_dir, ignore_errors=True)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 446 - 468, The code currently always deletes work_dir on success even when the caller supplied it; modify the logic that creates the temp dir to record ownership (e.g., set a flag like created_work_dir = True only when tempfile.mkdtemp(...) is called), and change the cleanup condition to only rmtree when created_work_dir is True (and not keep_work_dir and success). Update references around LocalExecutor(...), spec = executor.load_spec(...), and the final cleanup block to use that ownership flag and adjust the success-cleanup test to assert that caller-supplied work_dir is preserved.
1-17:⚠️ Potential issue | 🟡 MinorAdd the local
pylintsuppression for the SPDX header.Line 2 is still over the repo's 100-character limit, so this new module needs the file-level
line-too-longsuppression instead of reflowing the copyright text.As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 1 - 17, Add a file-level pylint suppression for the SPDX header in this module by inserting a "# pylint: disable=line-too-long" comment at the top of local_executor.py (as a module-level suppression, e.g., immediately before or immediately after the module docstring) so the long copyright line is exempt from the 100-character rule.src/cli/local.py (1)
1-17:⚠️ Potential issue | 🟡 MinorAdd the local
pylintsuppression for the SPDX header.Line 2 is still over the repo's 100-character limit, so this new CLI module needs the file-level
line-too-longsuppression.As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cli/local.py` around lines 1 - 17, The SPDX header exceeds the 100-character pylint limit; add a file-level pylint suppression for line-too-long in this module by placing a "# pylint: disable=line-too-long" comment at the top of src/cli/local.py (immediately after the module docstring or as the first non-docstring line) so the long copyright/SPDX lines are ignored by pylint; ensure the comment targets "line-too-long" specifically and avoid modifying the existing SPDX text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 76-87: The try/except should also catch filesystem-related errors
from local_executor.run_workflow_locally (e.g., FileNotFoundError and
PermissionError) and handle them the same way as ValueError: print the error
message to stderr and sys.exit(1). Update the except block around the call to
local_executor.run_workflow_locally to catch FileNotFoundError and
PermissionError (either as separate except clauses or combined with the existing
ValueError) and ensure the handler prints f'Error: {error}' to sys.stderr and
exits with code 1.
In `@src/utils/local_executor.py`:
- Around line 116-124: Reset the executor state and ensure task output dirs are
cleaned before scheduling: in execute() clear self._results at the start, then
update _setup_directories() (or call a small helper before scheduling) to remove
and re-create per-task output directories when not doing a full resume; when
resume or from_step is set, call _restore_completed_tasks(from_step) first and
then delete output directories for tasks that are not in the restored completed
set so stale files can't be reused. Reference execute(), _setup_directories(),
_restore_completed_tasks(), and the self._results member when making these
changes.
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-17: Add a file-level pylint suppression for the SPDX header in
the test module test_local_executor.py by inserting a module-level comment to
disable the "line-too-long" lint (i.e. the pylint disable for line-too-long)
near the top of the file so the long copyright line is allowed; ensure the
directive is present before or immediately after the file docstring so pylint
skips the long SPDX lines.
---
Duplicate comments:
In `@src/cli/local.py`:
- Around line 1-17: The SPDX header exceeds the 100-character pylint limit; add
a file-level pylint suppression for line-too-long in this module by placing a "#
pylint: disable=line-too-long" comment at the top of src/cli/local.py
(immediately after the module docstring or as the first non-docstring line) so
the long copyright/SPDX lines are ignored by pylint; ensure the comment targets
"line-too-long" specifically and avoid modifying the existing SPDX text.
In `@src/utils/local_executor.py`:
- Around line 280-290: Add checks that reject task_spec.privileged and
task_spec.hostNetwork in local mode by appending explanatory strings to the
existing unsupported_features list (same style as the existing checks for
volumeMounts, checkpoint, and credentials). Locate the block that inspects
task_spec (the code that appends to unsupported_features for
task_spec.volumeMounts / task_spec.checkpoint / task_spec.credentials) and add
two entries: one like f'Task "{task_spec.name}": privileged containers are not
supported in local mode' for task_spec.privileged, and one like f'Task
"{task_spec.name}": hostNetwork is not supported in local mode' for
task_spec.hostNetwork, so these flags are explicitly rejected rather than
ignored.
- Around line 446-468: The code currently always deletes work_dir on success
even when the caller supplied it; modify the logic that creates the temp dir to
record ownership (e.g., set a flag like created_work_dir = True only when
tempfile.mkdtemp(...) is called), and change the cleanup condition to only
rmtree when created_work_dir is True (and not keep_work_dir and success). Update
references around LocalExecutor(...), spec = executor.load_spec(...), and the
final cleanup block to use that ownership flag and adjust the success-cleanup
test to assert that caller-supplied work_dir is preserved.
- Around line 1-17: Add a file-level pylint suppression for the SPDX header in
this module by inserting a "# pylint: disable=line-too-long" comment at the top
of local_executor.py (as a module-level suppression, e.g., immediately before or
immediately after the module docstring) so the long copyright line is exempt
from the 100-character rule.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: cde946a5-8785-4b48-9d4f-4f11c1022077
📒 Files selected for processing (3)
src/cli/local.pysrc/utils/local_executor.pysrc/utils/tests/test_local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/utils/local_executor.py (1)
116-124:⚠️ Potential issue | 🟠 MajorClear
self._resultsat the start ofexecute()to prevent stale state.
execute()builds the DAG and clears_task_nodes, but_resultsis never reset. If an executor instance is reused (e.g., for different specs or repeated runs), stale results from a previous execution could incorrectly mark tasks as completed—especially for tasks with the same name across different workflow specs.Proposed fix
def execute(self, spec: workflow_module.WorkflowSpec, resume: bool = False, from_step: str | None = None) -> bool: """Run all tasks in topological order, returning True if the entire workflow succeeds.""" + self._results.clear() self._build_dag(spec) self._validate_for_local(spec)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 116 - 124, The execute() method can reuse stale state in the instance because self._results is not reset; modify execute (the method named execute in this class) to clear self._results (e.g., set to an empty dict) at the start of the method before DAG execution/restoration logic runs so prior run results don't mark tasks as completed, ensuring this reset happens before calling _build_dag, _validate_for_local, _setup_directories and before _restore_completed_tasks/from_step logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-17: The SPDX header block in the test file has lines over 100
characters and needs the pylint suppression; add a "# pylint:
disable=line-too-long" comment associated with the SPDX header (e.g.,
immediately after the header block or on the SPDX-License-Identifier line) so
pylint ignores the long copyright lines; target the existing SPDX header /
"SPDX-License-Identifier" lines in src/utils/tests/test_local_executor.py when
applying this change.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 116-124: The execute() method can reuse stale state in the
instance because self._results is not reset; modify execute (the method named
execute in this class) to clear self._results (e.g., set to an empty dict) at
the start of the method before DAG execution/restoration logic runs so prior run
results don't mark tasks as completed, ensuring this reset happens before
calling _build_dag, _validate_for_local, _setup_directories and before
_restore_completed_tasks/from_step logic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 6ff38215-d7e5-45ad-a464-77e6c436e296
📒 Files selected for processing (3)
src/cli/local.pysrc/utils/local_executor.pysrc/utils/tests/test_local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/utils/tests/test_local_executor.py (1)
580-746: Cover the remaining_validate_for_local()rejection branches.
src/utils/local_executor.py:262-303also rejectscredentials,checkpoint,volumeMounts,privileged, andhostNetwork, but this suite only exercises dataset/URL inputs/outputs plus the allowedfiles/environmentpaths. A smallsubTesttable for the remaining fields would keep the unsupported-feature matrix from drifting untested.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 580 - 746, Add unit tests that exercise the remaining rejection branches in LocalExecutor._validate_for_local by creating small spec variants that include each unsupported field: credentials, checkpoint, volumeMounts, privileged, and hostNetwork; for each field use a subTest row that loads the spec via LocalExecutor._make_executor()/load_spec(), calls _build_dag(spec) and asserts _validate_for_local(spec) raises ValueError, and verify the raised message mentions the offending feature (e.g., assertIn('credentials', str(exc))) so all branches in _validate_for_local are covered.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 817-818: Move the negative-path tests out from under the
class-level `@unittest.skipUnless`(DOCKER_AVAILABLE, SKIP_DOCKER_MSG) decorator so
they run even when Docker is unavailable: locate the TestDockerExecution class
and extract the test methods test_docker_not_found_graceful_failure,
test_unsupported_spec_*, test_template_hello_world, and
test_nonexistent_file_raises into a non-Docker-gated test class (or place
method-level skips only on tests that actually call containers), since these
tests exercise early logic in run_workflow_locally() and _validate_for_local()
(invoked during executor.execute()) and do not require Docker; ensure the moved
tests still import/instantiate the same helper utilities and keep any
Docker-specific assertions confined to tests that truly start containers.
- Around line 1321-1326: The helper _run_cookbook_spec currently calls
self.skipTest when the cookbook spec file is missing; change this to a fail-fast
assertion by replacing the skip with self.assertTrue(os.path.exists(spec_path),
f'Cookbook file not found: {spec_path}') so missing data files cause a test
failure rather than silently skipping; update the logic inside
_run_cookbook_spec (referenced symbol) to assert existence before calling
run_workflow_locally.
---
Nitpick comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 580-746: Add unit tests that exercise the remaining rejection
branches in LocalExecutor._validate_for_local by creating small spec variants
that include each unsupported field: credentials, checkpoint, volumeMounts,
privileged, and hostNetwork; for each field use a subTest row that loads the
spec via LocalExecutor._make_executor()/load_spec(), calls _build_dag(spec) and
asserts _validate_for_local(spec) raises ValueError, and verify the raised
message mentions the offending feature (e.g., assertIn('credentials', str(exc)))
so all branches in _validate_for_local are covered.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a22922da-486c-48bc-9539-e19174804a2d
📒 Files selected for processing (1)
src/utils/tests/test_local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (4)
src/cli/local.py (1)
1-3:⚠️ Potential issue | 🟡 MinorMove the pylint directive outside the docstring.
# pylint: disable=line-too-longis currently inside the triple-quoted string, so pylint will not treat it as a directive.Suggested fix
+# pylint: disable=line-too-long """ -SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cli/local.py` around lines 1 - 3, The pylint directive is currently inside the module docstring (the top triple-quoted string) so it is ignored; move the `# pylint: disable=line-too-long` comment outside the docstring (immediately after the closing triple quotes or at the top of the file) so pylint recognizes it, leaving the copyright text unchanged and keeping the directive adjacent to the module docstring for clarity.src/utils/tests/test_local_executor.py (1)
1-3:⚠️ Potential issue | 🟡 MinorPlace the pylint long-line suppression as a real comment, not inside the docstring.
Current placement won’t be interpreted by pylint.
Suggested fix
+# pylint: disable=line-too-long """ -SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 1 - 3, The copyright header currently embeds "# pylint: disable=line-too-long" inside the module docstring, which pylint will ignore; remove that token from the docstring and place a real inline comment (e.g., "# pylint: disable=line-too-long") on its own line immediately above or below the docstring so the suppression is recognized; update the module-level string in src/utils/tests/test_local_executor.py accordingly.src/utils/local_executor.py (2)
120-129:⚠️ Potential issue | 🟠 MajorReset execution state and clean pending task outputs before scheduling.
execute()currently reusesself._resultsacross runs and does not purge stale output directories for tasks that will run again. This can incorrectly skip tasks or consume stale artifacts.Suggested fix
def execute(self, spec: workflow_module.WorkflowSpec, resume: bool = False, from_step: str | None = None) -> bool: """Run all tasks in topological order, returning True if the entire workflow succeeds.""" + self._results.clear() self._build_dag(spec) self._validate_for_local(spec) - self._setup_directories() if resume or from_step: self._restore_completed_tasks(from_step) + + self._setup_directories() + for task_name in self._task_nodes: + if task_name in self._results: + continue + task_dir = os.path.join(self._work_dir, task_name) + shutil.rmtree(task_dir, ignore_errors=True) + os.makedirs(os.path.join(task_dir, 'output'), exist_ok=True)Also applies to: 309-314
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 120 - 129, The execute() method is reusing self._results and leaving stale task output directories, causing skipped or stale artifacts; fix by resetting execution state and cleaning pending outputs before scheduling: in execute (and the analogous flow used around the resume/from_step logic), clear or reinitialize self._results, clear any per-run state used to mark tasks as completed, and remove/clean output directories for tasks that will be re-run (i.e., tasks not restored as completed by _restore_completed_tasks) before creating the scheduling dag; ensure these changes integrate with _build_dag, _setup_directories and _restore_completed_tasks so restored tasks keep their outputs but any pending/rerun tasks have their outputs removed.
1-3:⚠️ Potential issue | 🟡 MinorMake the pylint suppression effective by moving it outside the module docstring.
The current inline text is part of the string literal and won’t suppress linting.
Suggested fix
+# pylint: disable=line-too-long """ -SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 1 - 3, The pylint suppression is currently inside the module docstring and therefore not effective; move the "# pylint: disable=line-too-long" token out of the triple-quoted string so it is a real comment. Specifically, edit the top of the module around the module docstring (the triple-quoted string containing the SPDX copyright text) to place the pylint disable comment on its own line either immediately above or immediately below the docstring so that the suppression applies.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cli/local.py`:
- Around line 27-38: Add entries to the Codebase Structure section of AGENTS.md
documenting the new local-execution components: describe the new CLI subcommand
implemented by setup_parser in src/cli/local.py (the 'local' subcommand with its
nested 'run' action) and the utility module src/utils/local_executor.py (explain
its purpose, e.g., orchestrating local Docker workflow execution and any primary
class or function such as LocalExecutor or execute_locally). Keep the
description concise, include where to find usage examples or flags (mention
'run' subcommand), and note any high-level responsibilities so readers know
these files manage local workflow execution.
In `@src/utils/local_executor.py`:
- Around line 359-364: The code joins file_spec.path into files_dir without
sanitizing, allowing path traversal (e.g., ../../) to escape files_dir; update
the materialization logic (the loop over task_spec.files in local_executor.py
that calls self._substitute_tokens and computes host_path) to: 1) normalize and
resolve the target path using os.path.abspath/os.path.realpath for both
files_dir and the final host_path, 2) ensure the resolved host_path is inside
the resolved files_dir using os.path.commonpath (or os.path.commonprefix
carefully) and reject or raise an exception if it would escape, and 3) only
create/write files when the containment check passes; apply the same
sanitization/containment check to the other inline-file materialization site
(the similar block handling file_spec.path later in the file).
---
Duplicate comments:
In `@src/cli/local.py`:
- Around line 1-3: The pylint directive is currently inside the module docstring
(the top triple-quoted string) so it is ignored; move the `# pylint:
disable=line-too-long` comment outside the docstring (immediately after the
closing triple quotes or at the top of the file) so pylint recognizes it,
leaving the copyright text unchanged and keeping the directive adjacent to the
module docstring for clarity.
In `@src/utils/local_executor.py`:
- Around line 120-129: The execute() method is reusing self._results and leaving
stale task output directories, causing skipped or stale artifacts; fix by
resetting execution state and cleaning pending outputs before scheduling: in
execute (and the analogous flow used around the resume/from_step logic), clear
or reinitialize self._results, clear any per-run state used to mark tasks as
completed, and remove/clean output directories for tasks that will be re-run
(i.e., tasks not restored as completed by _restore_completed_tasks) before
creating the scheduling dag; ensure these changes integrate with _build_dag,
_setup_directories and _restore_completed_tasks so restored tasks keep their
outputs but any pending/rerun tasks have their outputs removed.
- Around line 1-3: The pylint suppression is currently inside the module
docstring and therefore not effective; move the "# pylint:
disable=line-too-long" token out of the triple-quoted string so it is a real
comment. Specifically, edit the top of the module around the module docstring
(the triple-quoted string containing the SPDX copyright text) to place the
pylint disable comment on its own line either immediately above or immediately
below the docstring so that the suppression applies.
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-3: The copyright header currently embeds "# pylint:
disable=line-too-long" inside the module docstring, which pylint will ignore;
remove that token from the docstring and place a real inline comment (e.g., "#
pylint: disable=line-too-long") on its own line immediately above or below the
docstring so the suppression is recognized; update the module-level string in
src/utils/tests/test_local_executor.py accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 15fe56f2-ae20-4bf5-a292-03af00b2f491
📒 Files selected for processing (5)
cookbook/tutorials/BUILDsrc/cli/local.pysrc/utils/local_executor.pysrc/utils/tests/BUILDsrc/utils/tests/test_local_executor.py
✅ Files skipped from review due to trivial changes (1)
- cookbook/tutorials/BUILD
🚧 Files skipped from review as they are similar to previous changes (1)
- src/utils/tests/BUILD
There was a problem hiding this comment.
♻️ Duplicate comments (2)
src/utils/local_executor.py (2)
120-128:⚠️ Potential issue | 🟠 MajorClear
_resultsat the start ofexecute()to support executor reuse.If
execute()is called multiple times on the sameLocalExecutorinstance (e.g., running different specs or retrying), stale results from the previous run persist in_results. This can cause tasks to be incorrectly skipped or produce wrong scheduling decisions. The_build_dag()method clears_task_nodesbut_resultsis never reset.Proposed fix
def execute(self, spec: workflow_module.WorkflowSpec, resume: bool = False, from_step: str | None = None) -> bool: """Run all tasks in topological order, returning True if the entire workflow succeeds.""" + self._results.clear() self._build_dag(spec) self._validate_for_local(spec)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 120 - 128, The executor retains stale task outcomes because _results is not reset when execute() runs again; modify LocalExecutor.execute to clear the instance result store (reset self._results) at the start of the method before calling _build_dag/_validate_for_local so previous-run results won't affect scheduling or skipping; ensure this reset happens prior to the resume/from_step logic so _restore_completed_tasks can repopulate _results correctly when resuming.
385-388:⚠️ Potential issue | 🟠 MajorRemove literal quotes from GPU device argument.
When using
subprocess.runwith a list (noshell=True), arguments are passed directly without shell interpretation. The literal double-quotes in the f-string become part of the argument value passed to Docker, e.g.,"device=0,1"instead ofdevice=0,1. Docker's--gpusflag expects the unquoted value when receiving arguments via direct invocation.Proposed fix
elif gpu_count > available: logger.warning( 'Task "%s" requests %d GPU(s) but only %d available — running with %d GPU(s)', node.name, gpu_count, available, available) - docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(available))}"'] + docker_args += ['--gpus', f'device={",".join(str(i) for i in range(available))}'] else: - docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(gpu_count))}"'] + docker_args += ['--gpus', f'device={",".join(str(i) for i in range(gpu_count))}']🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 385 - 388, The GPU device string is being built with literal double-quotes so docker_args gets entries like '"device=0,1"' which pass the quotes into Docker; update the two places that append to docker_args (the branches that use available and gpu_count) to remove the surrounding double-quotes in the f-strings so they produce device=0,1 (e.g., change f'"device={...}"' to f'device={...}'), keeping the rest of the list-based subprocess invocation and logger.info(node.name, gpu_count, available) unchanged.
🧹 Nitpick comments (1)
src/utils/tests/test_local_executor.py (1)
751-817: Consider making the class attribute immutable or documenting it's intentionally shared.The
_UNSUPPORTED_SPECSdictionary is a mutable class attribute (RUF012). While this is safe since the tests only read from it, making it a module-level constant or usingtypes.MappingProxyTypewould better express the intent.Optional: Move to module level as a constant
+# Test data for unsupported feature validation +_UNSUPPORTED_SPECS = { + 'credentials': { + 'yaml': textwrap.dedent('''\ + ... + '''), + 'expected_substring': 'credentials', + }, + # ... rest of specs +} + + class TestValidateForLocalRemainingBranches(unittest.TestCase): """Verify that _validate_for_local rejects credentials, checkpoint, volumeMounts, privileged, and hostNetwork.""" - _UNSUPPORTED_SPECS = { - ... - } - def test_unsupported_fields_rejected(self): """Each unsupported task-level field is detected and rejected with a descriptive error.""" - for feature, case in self._UNSUPPORTED_SPECS.items(): + for feature, case in _UNSUPPORTED_SPECS.items():🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 751 - 817, The _UNSUPPORTED_SPECS class attribute is a mutable dict; make its immutability explicit by either moving it to module-level as a constant (e.g., UNSUPPORTED_SPECS) or wrapping it with types.MappingProxyType before assigning back to _UNSUPPORTED_SPECS so consumers cannot mutate it; update any references to the attribute accordingly and/or add a short comment documenting that this is intentionally shared/read-only.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 120-128: The executor retains stale task outcomes because _results
is not reset when execute() runs again; modify LocalExecutor.execute to clear
the instance result store (reset self._results) at the start of the method
before calling _build_dag/_validate_for_local so previous-run results won't
affect scheduling or skipping; ensure this reset happens prior to the
resume/from_step logic so _restore_completed_tasks can repopulate _results
correctly when resuming.
- Around line 385-388: The GPU device string is being built with literal
double-quotes so docker_args gets entries like '"device=0,1"' which pass the
quotes into Docker; update the two places that append to docker_args (the
branches that use available and gpu_count) to remove the surrounding
double-quotes in the f-strings so they produce device=0,1 (e.g., change
f'"device={...}"' to f'device={...}'), keeping the rest of the list-based
subprocess invocation and logger.info(node.name, gpu_count, available)
unchanged.
---
Nitpick comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 751-817: The _UNSUPPORTED_SPECS class attribute is a mutable dict;
make its immutability explicit by either moving it to module-level as a constant
(e.g., UNSUPPORTED_SPECS) or wrapping it with types.MappingProxyType before
assigning back to _UNSUPPORTED_SPECS so consumers cannot mutate it; update any
references to the attribute accordingly and/or add a short comment documenting
that this is intentionally shared/read-only.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 08e56e8c-6b0c-47d6-a623-97e46ae520b2
📒 Files selected for processing (3)
AGENTS.mdsrc/utils/local_executor.pysrc/utils/tests/test_local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (2)
src/utils/local_executor.py (2)
120-129:⚠️ Potential issue | 🟠 MajorRestore state before creating task output directories.
_setup_directories()recreates every output directory before_restore_completed_tasks()checksos.path.isdir(info['output_dir']), so resume can skip tasks against brand-new empty dirs. It also leaves stale outputs in place for rerun /from_steptasks.Also applies to: 204-209, 310-314
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 120 - 129, The code currently calls _setup_directories() before _restore_completed_tasks(), causing freshly recreated output dirs to prevent detecting completed tasks; change the order in execute (and in the other places noted) so that _restore_completed_tasks(from_step) is invoked before _setup_directories(); search for all call sites where _setup_directories() precedes _restore_completed_tasks() and reorder them (e.g., inside the execute method and the other similar run/restore entrypoints) to ensure restoration of completed-task state happens prior to directory creation.
1-2:⚠️ Potential issue | 🟡 MinorMove the
pylintpragma out of the module string.
# pylint: disable=line-too-longis currently part of the docstring text, so the linter never sees it.Suggested fix
-""" -SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +# pylint: disable=line-too-long +""" +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 1 - 2, The module-level docstring currently contains the pylint pragma "# pylint: disable=line-too-long" as plain text; remove that pragma from inside the triple-quoted string and add it as a standalone comment immediately after the closing docstring (or at top of the file) so pylint will recognize it; locate the module docstring in local_executor.py and move the pragma out of the string into a top-level comment (keeping the copyright text unchanged).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 466-489: The temp work_dir is created before validating spec_text,
causing leaked dirs on early failures; either move the work_dir creation (the
created_work_dir/work_dir logic) to after the Jinja/template check (i.e., after
reading spec_text and verifying template_markers) or wrap the creation +
subsequent processing in a try/finally that will call shutil.rmtree(work_dir,
ignore_errors=True) when created_work_dir is true and keep_work_dir is false,
ensuring cleanup on exceptions; update the code paths around created_work_dir,
work_dir, LocalExecutor(...) and executor.load_spec/executor.execute to use the
new placement or the finally cleanup.
- Around line 180-185: The current state serialization in local_executor.py only
writes task names, exit codes and output dirs into the state dict (constructed
from self._results), which allows stale outputs to be reused across different
workflow revisions; update the state written to .osmo-state.json to include a
workflow-level hash (or per-task signature) derived from the current workflow
spec (e.g., spec hash or task signatures) when creating the state dict, and add
logic in the resume/load path that reads that stored hash and compares it to the
current workflow hash, refusing to accept/resume from the saved state if the
hashes differ and returning a clear error; ensure the change touches the state
construction (where state = {...}) and the corresponding state load/validation
code paths referenced around lines 204-221 so mismatched workflows are rejected.
- Around line 373-392: The current logic only appends '--shm-size' when GPUs are
requested, so a user-specified self._shm_size is ignored for CPU-only tasks;
move the shm size handling out of the GPU branch in LocalExecutor (referencing
docker_args, _task_gpu_count, _detect_available_gpus, self._shm_size and
DEFAULT_SHM_SIZE) so that after GPU detection/argument construction you always
compute shm_size = self._shm_size or self.DEFAULT_SHM_SIZE and append
['--shm-size', shm_size] to docker_args; ensure the value is string-compatible
with the docker args.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 120-129: The code currently calls _setup_directories() before
_restore_completed_tasks(), causing freshly recreated output dirs to prevent
detecting completed tasks; change the order in execute (and in the other places
noted) so that _restore_completed_tasks(from_step) is invoked before
_setup_directories(); search for all call sites where _setup_directories()
precedes _restore_completed_tasks() and reorder them (e.g., inside the execute
method and the other similar run/restore entrypoints) to ensure restoration of
completed-task state happens prior to directory creation.
- Around line 1-2: The module-level docstring currently contains the pylint
pragma "# pylint: disable=line-too-long" as plain text; remove that pragma from
inside the triple-quoted string and add it as a standalone comment immediately
after the closing docstring (or at top of the file) so pylint will recognize it;
locate the module docstring in local_executor.py and move the pragma out of the
string into a top-level comment (keeping the copyright text unchanged).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b1e87e8f-5942-4678-9825-714d120cd138
📒 Files selected for processing (1)
src/utils/local_executor.py
|
@coderabbitai resume |
✅ Actions performedReviews resumed. |
There was a problem hiding this comment.
♻️ Duplicate comments (3)
src/utils/local_executor.py (3)
477-490:⚠️ Potential issue | 🟡 MinorClean up auto-created temp dirs on exception paths too.
The temp directory is only removed when
successis returned. Ifload_spec()orexecute()raises after Line 479,osmo-local-*is leaked even withkeep_work_dir=False. Wrap the executor calls intry/finallyand delete auto-created dirs whenever the run aborts.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 477 - 490, The auto-created temp dir cleanup is only done on success; wrap the calls to executor.load_spec(spec_text) and executor.execute(...) in a try/finally so the auto-created work_dir (tracked by created_work_dir) is removed on all exit paths when keep_work_dir is False; specifically, create the LocalExecutor(work_dir=work_dir, ...) then call load_spec and execute inside a try block and in the finally check if created_work_dir and not keep_work_dir and then shutil.rmtree(work_dir, ignore_errors=True), re-raising any exception after cleanup so errors are not swallowed.
120-129:⚠️ Potential issue | 🟠 MajorClean non-resumed task directories before scheduling.
_setup_directories()only creates<task>/output, and it runs before_restore_completed_tasks(). Reusing awork_dirtherefore preserves stale files for tasks that will rerun, and a deleted output directory gets recreated before the resume check, so a task can be skipped with empty outputs. Restore first, then wipe/recreate dirs only for tasks that are not resumed.Also applies to: 310-314
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 120 - 129, The execute method currently calls _setup_directories() before _restore_completed_tasks(), which recreates task output dirs and preserves stale files for tasks that will be re-run; change the flow to call _restore_completed_tasks(from_step) immediately after _build_dag(spec) and _validate_for_local(spec) so resume state is known first, then call _setup_directories() with information about which tasks were restored (or add a parameter/flag to _setup_directories to accept a set of resumed task names) and modify _setup_directories() to only wipe/recreate work_dir/output for tasks that are NOT resumed; ensure _restore_completed_tasks and any helpers return or expose the resumed task list so _setup_directories/_schedule logic can skip cleaning/recreation for those tasks (refer to execute, _restore_completed_tasks, _setup_directories, work_dir, and output).
178-221:⚠️ Potential issue | 🟠 MajorBind resume state to the current workflow revision.
.osmo-state.jsonstill only records task names and output paths. If the spec changes but task names stay stable,--resume/--from-stepcan skip incompatible work and reuse stale outputs. Persist a workflow/spec hash and reject mismatches during restore.
🧹 Nitpick comments (1)
src/utils/tests/test_local_executor.py (1)
1619-1635: This doesn't actually test the override path.Passing
docker_cmd='docker'hits the default value, so the test still passes if the override is ignored. Use a sentinel command withsubprocess.runmocked and assertargv[0]matches it; that also removes the Docker dependency for this case.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 1619 - 1635, The test test_custom_docker_command is not verifying the override path because passing docker_cmd='docker' equals the default; change the test to supply a sentinel command (e.g. 'my-sentinel-docker') to LocalExecutor(docker_cmd='my-sentinel-docker'), mock subprocess.run, call executor.execute(spec) as before, and assert the mocked subprocess.run was called once and that its first argument (argv[0]) equals the sentinel; reference LocalExecutor, docker_cmd, test_custom_docker_command, and subprocess.run to locate and update the test and remove the real Docker dependency.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 477-490: The auto-created temp dir cleanup is only done on
success; wrap the calls to executor.load_spec(spec_text) and
executor.execute(...) in a try/finally so the auto-created work_dir (tracked by
created_work_dir) is removed on all exit paths when keep_work_dir is False;
specifically, create the LocalExecutor(work_dir=work_dir, ...) then call
load_spec and execute inside a try block and in the finally check if
created_work_dir and not keep_work_dir and then shutil.rmtree(work_dir,
ignore_errors=True), re-raising any exception after cleanup so errors are not
swallowed.
- Around line 120-129: The execute method currently calls _setup_directories()
before _restore_completed_tasks(), which recreates task output dirs and
preserves stale files for tasks that will be re-run; change the flow to call
_restore_completed_tasks(from_step) immediately after _build_dag(spec) and
_validate_for_local(spec) so resume state is known first, then call
_setup_directories() with information about which tasks were restored (or add a
parameter/flag to _setup_directories to accept a set of resumed task names) and
modify _setup_directories() to only wipe/recreate work_dir/output for tasks that
are NOT resumed; ensure _restore_completed_tasks and any helpers return or
expose the resumed task list so _setup_directories/_schedule logic can skip
cleaning/recreation for those tasks (refer to execute, _restore_completed_tasks,
_setup_directories, work_dir, and output).
---
Nitpick comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 1619-1635: The test test_custom_docker_command is not verifying
the override path because passing docker_cmd='docker' equals the default; change
the test to supply a sentinel command (e.g. 'my-sentinel-docker') to
LocalExecutor(docker_cmd='my-sentinel-docker'), mock subprocess.run, call
executor.execute(spec) as before, and assert the mocked subprocess.run was
called once and that its first argument (argv[0]) equals the sentinel; reference
LocalExecutor, docker_cmd, test_custom_docker_command, and subprocess.run to
locate and update the test and remove the real Docker dependency.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3d64adbb-41f6-4218-a518-ebe2310388f0
📒 Files selected for processing (2)
src/utils/local_executor.pysrc/utils/tests/test_local_executor.py
|
workflow hash in state file: A feature request to detect spec changes across resume. The user explicitly chooses --resume/--from-step and controls the spec. Defining a proper "workflow hash" (images? args? env vars?) is a design decision, not a bug fix. Not fixing. _setup_directories before _restore_completed_tasks: The reviewer claims directory creation prevents detecting completed tasks. But _restore_completed_tasks gates on info['exit_code'] == 0 from the state file -- the os.path.isdir check is secondary validation, not a detection mechanism. The directories already exist from the previous run anyway (--resume requires the same --work-dir). Not fixing -- the concern is incorrect. |
vvnpn-nv
left a comment
There was a problem hiding this comment.
Thanks for the contribution, the motivation is solid and we should support it. However, there are some architectural concerns worth discussing before merging.
Key Issues
-
Container paths diverge from on-cluster behavior. On-cluster,
{{output}}resolves to/osmo/data/outputand{{input:0}}to/osmo/data/input/0. This PR mounts at host-absolute paths (/tmp/osmo-local-abc123/task1/output), so scripts that work locally may break on-cluster (or vice versa). -
"Parallel" tasks run sequentially.
_find_ready_tasks()returns all independent tasks, but theforloop executes them one at a time and bails on first failure. This masks race conditions and makes execution order non-deterministic. -
No cycle detection. Circular dependencies produce zero ready tasks. The loop never enters,
_resultsis empty, andexecute()returns True — reporting success when nothing ran. -
Task group semantics are lost. Groups are flattened to individual tasks, dropping barrier synchronization, lead-task failure policy (
ignoreNonleadStatus), and inter-task DNS ({{host:taskname}}). -
--dockerflag is silently ignored. Accepted in argparse but never passed torun_workflow_locally()orLocalExecutor. -
Unresolved Jinja variables pass through silently. Template detection checks for
{%%,{#, default-values but misses bare{{ variable }}without adefault-values:section.
Suggestion on an alternative solution to solve this problem:
Rather than a custom executor reimplementing scheduling, consider generating a docker-compose.yml and running docker compose up behind the scenes. This gives you:
- Correct paths: bind-mount at
/osmo/data/outputand/osmo/data/input/N:ro, matching on-cluster behavior - Real parallel execution and native dependency ordering via
depends_on: { condition: service_completed_successfully } - Cycle detection for free (Compose validates the DAG)
- Group DNS — Docker Compose service names are DNS-addressable, so
{{host:taskname}}just works - No custom scheduler to maintain — Compose handles lifecycle, logs, cleanup, and failure propagation
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/utils/local_executor.py (1)
477-493:⚠️ Potential issue | 🟡 MinorTemp directory leaked on exceptions after creation.
If
executor.load_spec()orexecutor.execute()raises an exception, the auto-created temp directory at line 479 is not cleaned up. The cleanup logic at lines 488-490 only runs on the normal return path.Suggested fix using try/finally
created_work_dir = work_dir is None if work_dir is None: work_dir = tempfile.mkdtemp(prefix='osmo-local-') logger.info('Using temporary work directory: %s', work_dir) - executor = LocalExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir, - docker_cmd=docker_cmd, shm_size=shm_size) - spec = executor.load_spec(spec_text) - success = executor.execute(spec, resume=resume or from_step is not None, - from_step=from_step) - - if created_work_dir and not keep_work_dir and success: - logger.info('Cleaning up work directory: %s', work_dir) - shutil.rmtree(work_dir, ignore_errors=True) - elif not success: - logger.info('Work directory preserved for debugging: %s', work_dir) - - return success + try: + executor = LocalExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir, + docker_cmd=docker_cmd, shm_size=shm_size) + spec = executor.load_spec(spec_text) + success = executor.execute(spec, resume=resume or from_step is not None, + from_step=from_step) + + if created_work_dir and not keep_work_dir and success: + logger.info('Cleaning up work directory: %s', work_dir) + shutil.rmtree(work_dir, ignore_errors=True) + elif not success: + logger.info('Work directory preserved for debugging: %s', work_dir) + + return success + except Exception: + if created_work_dir and not keep_work_dir: + shutil.rmtree(work_dir, ignore_errors=True) + raise🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 477 - 493, The temp work_dir created when work_dir is None can leak if executor.load_spec() or executor.execute() raises; wrap the creation-to-execution block in a try/finally so that if created_work_dir and not keep_work_dir the directory is removed in the finally block. Concretely, after setting created_work_dir and creating work_dir, instantiate LocalExecutor and call executor.load_spec(...) and executor.execute(...) inside a try, set/record success as now, and in the finally check the same condition (created_work_dir and not keep_work_dir) and call shutil.rmtree(work_dir, ignore_errors=True) to guarantee cleanup on exceptions as well as on success. Ensure references to created_work_dir, work_dir, LocalExecutor, load_spec, execute, and keep_work_dir are preserved.
🧹 Nitpick comments (1)
src/utils/local_executor.py (1)
407-409: Path traversal check missing for file mount volume arguments.The path traversal check at lines 363-366 validates the file path when writing contents, but the volume mount construction at line 408 performs a separate
os.path.realpathwithout re-validating that the result stays withinfiles_dir. While an attacker would need to bypass the earlier check first, it's safer to use the already-validatedhost_pathcomputed earlier rather than recomputing it.However, looking more carefully: the second block at lines 407-409 iterates over the same
task_spec.filesand computes the same path. If the first check passed, the second would compute the same value. This is not a bug but is redundant and slightly inconsistent.Consider caching validated paths to avoid redundant computation
+ validated_file_paths: Dict[str, str] = {} for file_spec in task_spec.files: resolved_contents = self._substitute_tokens(file_spec.contents, token_map) host_path = os.path.realpath(os.path.join(files_dir, file_spec.path.lstrip('/'))) if not host_path.startswith(os.path.realpath(files_dir) + os.sep): raise ValueError( f'Task "{node.name}": file path "{file_spec.path}" escapes the task directory') os.makedirs(os.path.dirname(host_path), exist_ok=True) with open(host_path, 'w', encoding='utf-8') as f: f.write(resolved_contents) + validated_file_paths[file_spec.path] = host_path ... for file_spec in task_spec.files: - host_path = os.path.realpath(os.path.join(files_dir, file_spec.path.lstrip('/'))) + host_path = validated_file_paths[file_spec.path] docker_args += ['-v', f'{host_path}:{file_spec.path}:ro']🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 407 - 409, The volume-mount creation recomputes host_path via os.path.realpath for each entry in task_spec.files, which is redundant and risks inconsistency with the earlier path-traversal validation; instead, reuse the already-validated host paths computed during the validation phase (cache them when validating files in the earlier loop) and append mounts using that cached host_path to docker_args in the mount loop (referencing task_spec.files and docker_args), avoiding a second os.path.realpath and ensuring the mounted path remains the validated one.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 121-131: The problem is that _setup_directories() creates output
dirs before _restore_completed_tasks() determines which tasks will be skipped,
leaving stale files for tasks that will be re-run; change execute() so when
resume or from_step is set you call self._restore_completed_tasks(from_step)
before self._setup_directories(), and update _setup_directories() to only
create/prepare output directories for tasks that will actually run (or to clear
existing output files for tasks that are going to be re-executed). Adjust
references in execute(), and ensure any helper logic that relies on
self._build_dag() or self._results remains consistent with this ordering.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 477-493: The temp work_dir created when work_dir is None can leak
if executor.load_spec() or executor.execute() raises; wrap the
creation-to-execution block in a try/finally so that if created_work_dir and not
keep_work_dir the directory is removed in the finally block. Concretely, after
setting created_work_dir and creating work_dir, instantiate LocalExecutor and
call executor.load_spec(...) and executor.execute(...) inside a try, set/record
success as now, and in the finally check the same condition (created_work_dir
and not keep_work_dir) and call shutil.rmtree(work_dir, ignore_errors=True) to
guarantee cleanup on exceptions as well as on success. Ensure references to
created_work_dir, work_dir, LocalExecutor, load_spec, execute, and keep_work_dir
are preserved.
---
Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 407-409: The volume-mount creation recomputes host_path via
os.path.realpath for each entry in task_spec.files, which is redundant and risks
inconsistency with the earlier path-traversal validation; instead, reuse the
already-validated host paths computed during the validation phase (cache them
when validating files in the earlier loop) and append mounts using that cached
host_path to docker_args in the mount loop (referencing task_spec.files and
docker_args), avoiding a second os.path.realpath and ensuring the mounted path
remains the validated one.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 054f9e06-260a-411c-b742-7f7a9eaeb059
📒 Files selected for processing (2)
src/utils/local_executor.pysrc/utils/tests/test_local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
src/utils/local_executor.py (1)
374-380:⚠️ Potential issue | 🟠 MajorHonor inline
task.resources.gputoo.
_task_gpu_count()only checks the namedresource:indirection fromworkflow.resources. A task that setsresources: {gpu: ...}directly will silently run without--gpus, even thoughTaskSpecexposes that field.Proposed fix
def _task_gpu_count(self, task_spec: task_module.TaskSpec, spec: workflow_module.WorkflowSpec) -> int: """Return the number of GPUs requested by a task's resource spec, defaulting to 0.""" + if task_spec.resources and task_spec.resources.gpu: + return task_spec.resources.gpu + resource_spec = spec.resources.get(task_spec.resource) if resource_spec and resource_spec.gpu: return resource_spec.gpu return 0🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 374 - 380, The _task_gpu_count function currently only looks up GPU counts via spec.resources.get(task_spec.resource) and ignores an inline GPU declaration on the TaskSpec itself; update _task_gpu_count to first check task_spec.resources.gpu (or use it as a fallback) and then fall back to resolving spec.resources.get(task_spec.resource) so tasks that set resources: { gpu: ... } are honored; reference the function _task_gpu_count, the TaskSpec field task_spec.resources.gpu, the WorkflowSpec mapping spec.resources, and the existing local variable resource_spec when implementing the check.
🧹 Nitpick comments (2)
src/utils/tests/test_local_executor.py (2)
1047-1147: Cover inlineresources.gpuin the GPU/shm tests.These cases only exercise the named
resource:lookup. Add one mocked task with inlineresources: {gpu: 1}and assert--gpus/--shm-sizeare still emitted, otherwise the task-level GPU path can drift unnoticed.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 1047 - 1147, Add a new unit within test_local_executor.py that mirrors the GPU/shm tests but uses inline task-level resources (resources: {gpu: 1}) instead of a named resource block to ensure the task-level GPU path is covered; instantiate LocalExecutor and run the task via executor._run_task on the task node from executor._task_nodes (same pattern as test_gpu_task_gets_default_shm_size/test_gpu_task_gets_custom_shm_size), then assert the generated docker command contains the expected '--gpus' flag and the correct '--shm-size' value (default or custom) to verify both GPU and shm handling for inline resources.
147-190: Add a regression for empty/scalar YAML here.These invalid-spec cases still miss the
yaml.safe_load()→ non-mapping path. Onceload_spec()raisesValueErrorfor empty or scalar YAML, add a test in this section so the loader does not regress back toTypeError.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 147 - 190, Add a regression test that ensures LocalExecutor.load_spec raises ValueError (not TypeError) when yaml.safe_load returns a non-mapping (empty/null/scalar) value: add a new test method (e.g., test_empty_or_scalar_yaml_rejected) in src/utils/tests/test_local_executor.py that constructs an empty string and a scalar YAML (or "null") as spec_text, instantiates LocalExecutor, and asserts that executor.load_spec(spec_text) raises ValueError; reference LocalExecutor.load_spec and the yaml.safe_load → non-mapping path to catch regressions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 508-524: The temp workdir created via tempfile.mkdtemp (when
work_dir is None) can leak if LocalExecutor(...) construction,
executor.load_spec(spec_text) or executor.execute(...) raises; wrap the executor
creation, spec loading and execution (the code using LocalExecutor, load_spec
and execute) in a try/finally so that if created_work_dir is True and
keep_work_dir is False you always call shutil.rmtree(work_dir,
ignore_errors=True) on the error path before re-raising the exception, and keep
the existing conditional cleanup logic for the normal-success/keep_work_dir
cases.
- Around line 115-119: The load_spec method currently passes the result of
yaml.safe_load() directly into workflow_module.VersionedWorkflowSpec(**raw),
which crashes with TypeError for non-mapping YAML (None, int, list) instead of
producing a clean validation error; update load_spec to validate that raw is a
mapping (isinstance(raw, dict)) after safe_load and before constructing
VersionedWorkflowSpec, and if not, raise a clear ValueError (e.g. "workflow spec
must be a mapping/object") so run_workflow_locally() can handle it via the
normal validation/error path.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 374-380: The _task_gpu_count function currently only looks up GPU
counts via spec.resources.get(task_spec.resource) and ignores an inline GPU
declaration on the TaskSpec itself; update _task_gpu_count to first check
task_spec.resources.gpu (or use it as a fallback) and then fall back to
resolving spec.resources.get(task_spec.resource) so tasks that set resources: {
gpu: ... } are honored; reference the function _task_gpu_count, the TaskSpec
field task_spec.resources.gpu, the WorkflowSpec mapping spec.resources, and the
existing local variable resource_spec when implementing the check.
---
Nitpick comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 1047-1147: Add a new unit within test_local_executor.py that
mirrors the GPU/shm tests but uses inline task-level resources (resources: {gpu:
1}) instead of a named resource block to ensure the task-level GPU path is
covered; instantiate LocalExecutor and run the task via executor._run_task on
the task node from executor._task_nodes (same pattern as
test_gpu_task_gets_default_shm_size/test_gpu_task_gets_custom_shm_size), then
assert the generated docker command contains the expected '--gpus' flag and the
correct '--shm-size' value (default or custom) to verify both GPU and shm
handling for inline resources.
- Around line 147-190: Add a regression test that ensures
LocalExecutor.load_spec raises ValueError (not TypeError) when yaml.safe_load
returns a non-mapping (empty/null/scalar) value: add a new test method (e.g.,
test_empty_or_scalar_yaml_rejected) in src/utils/tests/test_local_executor.py
that constructs an empty string and a scalar YAML (or "null") as spec_text,
instantiates LocalExecutor, and asserts that executor.load_spec(spec_text)
raises ValueError; reference LocalExecutor.load_spec and the yaml.safe_load →
non-mapping path to catch regressions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ef2a6275-33d4-4bda-a54b-73f9f876b130
📒 Files selected for processing (2)
src/utils/local_executor.pysrc/utils/tests/test_local_executor.py
There was a problem hiding this comment.
♻️ Duplicate comments (5)
src/utils/local_executor.py (4)
1-17:⚠️ Potential issue | 🟡 MinorMove the pylint pragma out of the module docstring.
The
# pylint: disable=line-too-longtext on Line 2 is part of the SPDX string, not an actual pragma, so lint still sees the overlong header.Suggested fix
+# pylint: disable=line-too-long """ -SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 1 - 17, The module docstring currently contains the pylint pragma on the same line as the SPDX copyright (inside the triple-quoted string), so the linter still flags the long line; remove the `# pylint: disable=line-too-long` text from inside the docstring and add it as a standalone comment directly above or below the opening triple quotes (e.g., place `# pylint: disable=line-too-long` immediately before the module docstring) so the pragma is recognized while keeping the SPDX text intact.
130-133:⚠️ Potential issue | 🟠 MajorClear outputs for tasks that will be re-executed on resume.
_setup_directories()only creates directories. After_restore_completed_tasks(), any task not restored keeps whatever was already under<work_dir>/<task>/output, so--resume/--from-stepcan rerun against stale artifacts.Suggested fix
self._setup_directories() if resume or from_step: self._restore_completed_tasks(from_step) + for task_name in self._task_nodes: + if task_name in self._results: + continue + task_output_dir = os.path.join(self._work_dir, task_name, 'output') + shutil.rmtree(task_output_dir, ignore_errors=True) + os.makedirs(task_output_dir, exist_ok=True)Also applies to: 368-372
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 130 - 133, After calling _restore_completed_tasks(from_step) ensure any tasks that were not restored (i.e., will be re-executed under resume or from_step) have their output directories cleared to avoid stale artifacts; update the logic around _setup_directories() and _restore_completed_tasks() so you enumerate tasks in work_dir/<task>/output, detect which tasks were not marked as completed by _restore_completed_tasks, and delete or truncate those output contents (and repeat the same clear behavior for the other code path referenced around lines 368-372) before continuing.
118-121:⚠️ Potential issue | 🟠 MajorValidate
safe_load()output before unpacking it.An empty file, scalar, or list makes
yaml.safe_load()return a non-mapping, so Line 121 raisesTypeErrorinstead of the normal spec-validation error path.Suggested fix
def load_spec(self, spec_text: str) -> workflow_module.WorkflowSpec: """Parse raw YAML text into a validated WorkflowSpec via the versioned spec model.""" raw = yaml.safe_load(spec_text) + if not isinstance(raw, dict): + raise ValueError('Workflow spec must be a YAML mapping with a top-level "workflow" key') versioned = workflow_module.VersionedWorkflowSpec(**raw) return versioned.workflow🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 118 - 121, In load_spec, validate the result of yaml.safe_load before unpacking into VersionedWorkflowSpec: check that raw is a mapping/dict (e.g., isinstance(raw, dict)) and if not raise a clear ValueError (or a WorkflowSpec-specific validation error) explaining the spec is empty or not a mapping so the caller sees a proper validation failure instead of a TypeError; then proceed to construct workflow_module.VersionedWorkflowSpec(**raw) as before.
567-580:⚠️ Potential issue | 🟡 MinorClean auto-created work dirs on exceptions too.
If
load_spec()orexecute()raises after Line 569, the function exits before the success cleanup block and leaks the temporaryosmo-local-*directory even whenkeep_work_diris false.Suggested fix
executor = LocalExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir, docker_cmd=docker_cmd, shm_size=shm_size) - spec = executor.load_spec(spec_text) - success = executor.execute(spec, resume=resume or from_step is not None, - from_step=from_step) + try: + spec = executor.load_spec(spec_text) + success = executor.execute(spec, resume=resume or from_step is not None, + from_step=from_step) + except Exception: + if created_work_dir and not keep_work_dir: + shutil.rmtree(work_dir, ignore_errors=True) + raise🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 567 - 580, Temporary work directories created when work_dir is None can leak if LocalExecutor.load_spec or executor.execute raises; wrap the spec loading and execution in a try/finally (or try/except+re-raise) so that if created_work_dir is True and keep_work_dir is False the finally block always calls shutil.rmtree(work_dir, ignore_errors=True) after execution or on exception; ensure you still return/propagate exceptions and only delete when created_work_dir and not keep_work_dir, referencing work_dir, created_work_dir, keep_work_dir, LocalExecutor.load_spec and LocalExecutor.execute.src/utils/tests/test_local_executor.py (1)
1-17:⚠️ Potential issue | 🟡 MinorMove the pylint pragma out of the test module docstring.
The directive on Line 2 is inside the string literal, so it does not suppress the overlong SPDX header.
Suggested fix
+# pylint: disable=line-too-long """ -SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");As per coding guidelines, "If copyright lines exceed 100 characters, add
# pylint: disable=line-too-longcomment instead of breaking into multiple lines".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 1 - 17, The `# pylint: disable=line-too-long` pragma is currently inside the module docstring (SPDX header) so it has no effect; remove that pragma from the string and add it as a standalone comment outside the docstring (for example immediately above or immediately below the module docstring) so pylint will honor it—update the top of the test module in test_local_executor.py accordingly, keeping the SPDX header text unchanged but moving the pragma out of the string.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 1-17: The module docstring currently contains the pylint pragma on
the same line as the SPDX copyright (inside the triple-quoted string), so the
linter still flags the long line; remove the `# pylint: disable=line-too-long`
text from inside the docstring and add it as a standalone comment directly above
or below the opening triple quotes (e.g., place `# pylint:
disable=line-too-long` immediately before the module docstring) so the pragma is
recognized while keeping the SPDX text intact.
- Around line 130-133: After calling _restore_completed_tasks(from_step) ensure
any tasks that were not restored (i.e., will be re-executed under resume or
from_step) have their output directories cleared to avoid stale artifacts;
update the logic around _setup_directories() and _restore_completed_tasks() so
you enumerate tasks in work_dir/<task>/output, detect which tasks were not
marked as completed by _restore_completed_tasks, and delete or truncate those
output contents (and repeat the same clear behavior for the other code path
referenced around lines 368-372) before continuing.
- Around line 118-121: In load_spec, validate the result of yaml.safe_load
before unpacking into VersionedWorkflowSpec: check that raw is a mapping/dict
(e.g., isinstance(raw, dict)) and if not raise a clear ValueError (or a
WorkflowSpec-specific validation error) explaining the spec is empty or not a
mapping so the caller sees a proper validation failure instead of a TypeError;
then proceed to construct workflow_module.VersionedWorkflowSpec(**raw) as
before.
- Around line 567-580: Temporary work directories created when work_dir is None
can leak if LocalExecutor.load_spec or executor.execute raises; wrap the spec
loading and execution in a try/finally (or try/except+re-raise) so that if
created_work_dir is True and keep_work_dir is False the finally block always
calls shutil.rmtree(work_dir, ignore_errors=True) after execution or on
exception; ensure you still return/propagate exceptions and only delete when
created_work_dir and not keep_work_dir, referencing work_dir, created_work_dir,
keep_work_dir, LocalExecutor.load_spec and LocalExecutor.execute.
In `@src/utils/tests/test_local_executor.py`:
- Around line 1-17: The `# pylint: disable=line-too-long` pragma is currently
inside the module docstring (SPDX header) so it has no effect; remove that
pragma from the string and add it as a standalone comment outside the docstring
(for example immediately above or immediately below the module docstring) so
pylint will honor it—update the top of the test module in test_local_executor.py
accordingly, keeping the SPDX header text unchanged but moving the pragma out of
the string.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 687e28c5-6fa4-428d-be92-599e75c40a97
📒 Files selected for processing (2)
src/utils/local_executor.pysrc/utils/tests/test_local_executor.py
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cli/BUILD (1)
30-50:⚠️ Potential issue | 🟠 MajorWire the new Docker Compose CLI into
cli_lib.This BUILD target includes
standalone.py, but the newsrc/cli/docker_compose.pymodule from this PR is still missing fromsrcs, and//src/utils:compose_executoris missing fromdeps. Ifmain_parser.pyimports that subcommand, Bazel builds/package assembly will fail or ship a CLI without the new command wired in.Proposed fix
srcs = [ "access_token.py", "app.py", "bucket.py", "config.py", "credential.py", "data.py", "dataset.py", + "docker_compose.py", "editor.py", "formatters.py", "standalone.py", "login.py", "main_parser.py", @@ "//src/lib/utils:validation", "//src/lib/utils:version", "//src/lib/utils:workflow", + "//src/utils:compose_executor", "//src/utils:standalone_executor", ], )Also applies to: 51-78
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cli/BUILD` around lines 30 - 50, The BUILD srcs list is missing the new CLI module and its runtime dependency; add "docker_compose.py" alongside "standalone.py" in the srcs array and add the dependency "//src/utils:compose_executor" to the deps for this target so the new Docker Compose subcommand imported by main_parser.py is compiled and packaged correctly; ensure any other identical srcs/deps blocks (lines 51-78 equivalent) are updated the same way to wire the new docker_compose module into the cli_lib target.
♻️ Duplicate comments (2)
src/utils/standalone_executor.py (2)
118-122:⚠️ Potential issue | 🟠 MajorReject non-mapping YAML before unpacking it.
yaml.safe_load()returnsNonefor an empty file and can also return scalars/lists. In those cases,VersionedWorkflowSpec(**raw)throwsTypeError, which skips the normalValueError/stderr path in the CLI and surfaces an internal traceback instead of a clean validation error.Proposed fix
def load_spec(self, spec_text: str) -> workflow_module.WorkflowSpec: """Parse raw YAML text into a validated WorkflowSpec via the versioned spec model.""" raw = yaml.safe_load(spec_text) + if not isinstance(raw, dict): + raise ValueError('Workflow spec must be a YAML mapping with a top-level "workflow" key') versioned = workflow_module.VersionedWorkflowSpec(**raw) return versioned.workflow🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 118 - 122, The load_spec function should validate the YAML result before unpacking into VersionedWorkflowSpec: check the local variable raw returned by yaml.safe_load(spec_text) and if it is None or not an instance of dict/mapping, raise a ValueError with a clear message about expecting a mapping workflow spec; then only call workflow_module.VersionedWorkflowSpec(**raw) and return versioned.workflow. This ensures load_spec rejects empty/scalar/list YAML inputs instead of letting VersionedWorkflowSpec raise a TypeError.
567-583:⚠️ Potential issue | 🟡 MinorClean up auto-created work dirs when setup or execution raises.
Once
mkdtemp()succeeds, any exception fromStandaloneExecutor(...),load_spec(...), orexecute(...)leaksosmo-standalone-*whenkeep_work_diris false, because cleanup only runs on the normal success/failure path.Proposed fix
created_work_dir = work_dir is None if work_dir is None: work_dir = tempfile.mkdtemp(prefix='osmo-standalone-') logger.info('Using temporary work directory: %s', work_dir) - executor = StandaloneExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir, - docker_cmd=docker_cmd, shm_size=shm_size) - spec = executor.load_spec(spec_text) - success = executor.execute(spec, resume=resume or from_step is not None, - from_step=from_step) + try: + executor = StandaloneExecutor(work_dir=work_dir, keep_work_dir=keep_work_dir, + docker_cmd=docker_cmd, shm_size=shm_size) + spec = executor.load_spec(spec_text) + success = executor.execute(spec, resume=resume or from_step is not None, + from_step=from_step) + except Exception: + if created_work_dir and not keep_work_dir: + shutil.rmtree(work_dir, ignore_errors=True) + raise🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 567 - 583, The code currently creates a temp dir (mkdtemp) and then calls StandaloneExecutor(...), executor.load_spec(...), and executor.execute(...), but only cleans the dir on the normal success/failure path; wrap the executor creation, load_spec, and execute calls in a try/finally so that any exception triggers cleanup: after creating work_dir (when created_work_dir is True) ensure in the finally block that if not keep_work_dir the directory is removed with shutil.rmtree(work_dir, ignore_errors=True); keep the existing logging (info about using temp dir and preserved dir) but move the final cleanup logic into the finally to guarantee no leak even when StandaloneExecutor, load_spec, or execute raise.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 511-526: After creating a temporary work dir (created_work_dir)
but before constructing ComposeExecutor and calling ComposeExecutor.load_spec /
ComposeExecutor.execute, wrap the executor creation, spec loading and execution
in a try/finally so that if any exception occurs you still remove the
auto-created work dir when keep_work_dir is false; in the finally block check
created_work_dir and not keep_work_dir and then call shutil.rmtree(work_dir,
ignore_errors=True). Ensure variables referenced (created_work_dir, work_dir,
keep_work_dir, ComposeExecutor, load_spec, execute) are in scope and that
logging for preserved/cleaned up dirs is preserved or moved appropriately.
- Around line 86-95: The ComposeExecutor.execute override currently ignores the
resume and from_step flags from StandaloneExecutor.execute causing silent
behavior changes; update ComposeExecutor.execute(spec, resume=False,
from_step=None) to either implement resume semantics or explicitly reject
unsupported flags — e.g., at the top of the method check if resume or from_step
is set and raise a clear exception (ValueError or NotImplementedError)
indicating compose mode does not support resume/from_step yet, or implement the
resume logic by wiring resume/from_step into the DAG traversal in
_build_dag/_validate_for_compose/_generate_compose_file so execution begins at
the requested step; reference the execute method in ComposeExecutor and the
_build_dag/_generate_compose_file helpers to locate where to apply the change.
---
Outside diff comments:
In `@src/cli/BUILD`:
- Around line 30-50: The BUILD srcs list is missing the new CLI module and its
runtime dependency; add "docker_compose.py" alongside "standalone.py" in the
srcs array and add the dependency "//src/utils:compose_executor" to the deps for
this target so the new Docker Compose subcommand imported by main_parser.py is
compiled and packaged correctly; ensure any other identical srcs/deps blocks
(lines 51-78 equivalent) are updated the same way to wire the new docker_compose
module into the cli_lib target.
---
Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 118-122: The load_spec function should validate the YAML result
before unpacking into VersionedWorkflowSpec: check the local variable raw
returned by yaml.safe_load(spec_text) and if it is None or not an instance of
dict/mapping, raise a ValueError with a clear message about expecting a mapping
workflow spec; then only call workflow_module.VersionedWorkflowSpec(**raw) and
return versioned.workflow. This ensures load_spec rejects empty/scalar/list YAML
inputs instead of letting VersionedWorkflowSpec raise a TypeError.
- Around line 567-583: The code currently creates a temp dir (mkdtemp) and then
calls StandaloneExecutor(...), executor.load_spec(...), and
executor.execute(...), but only cleans the dir on the normal success/failure
path; wrap the executor creation, load_spec, and execute calls in a try/finally
so that any exception triggers cleanup: after creating work_dir (when
created_work_dir is True) ensure in the finally block that if not keep_work_dir
the directory is removed with shutil.rmtree(work_dir, ignore_errors=True); keep
the existing logging (info about using temp dir and preserved dir) but move the
final cleanup logic into the finally to guarantee no leak even when
StandaloneExecutor, load_spec, or execute raise.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 96a9a10d-7d76-4af0-8773-526ae76dc345
📒 Files selected for processing (11)
AGENTS.mdsrc/cli/BUILDsrc/cli/docker_compose.pysrc/cli/main_parser.pysrc/cli/standalone.pysrc/utils/BUILDsrc/utils/compose_executor.pysrc/utils/standalone_executor.pysrc/utils/tests/BUILDsrc/utils/tests/test_compose_executor.pysrc/utils/tests/test_standalone_executor.py
✅ Files skipped from review due to trivial changes (3)
- src/cli/main_parser.py
- src/utils/tests/BUILD
- AGENTS.md
There was a problem hiding this comment.
♻️ Duplicate comments (3)
src/utils/standalone_executor.py (3)
462-468:⚠️ Potential issue | 🟠 MajorQuote multi-GPU selectors before passing
--gpusto Docker.Official NVIDIA/Docker examples use a quoted selector for specific devices, and the unquoted
device=0,1form is documented as failing withcannot set both Count and DeviceIDs. Here the argv element is emitted as plaindevice=0,1, so multi-GPU tasks can fail to start. (docs.nvidia.com)Suggested fix
- docker_args += ['--gpus', f'device={",".join(str(i) for i in range(available))}'] + docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(available))}"'] else: - docker_args += ['--gpus', f'device={",".join(str(i) for i in range(gpu_count))}'] + docker_args += ['--gpus', f'"device={",".join(str(i) for i in range(gpu_count))}"']🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 462 - 468, The multi-GPU selector passed to Docker is unquoted and can trigger "cannot set both Count and DeviceIDs" errors; update both places where docker_args is appended (the branch handling gpu_count > available and the else branch) so the selector value is wrapped in quotes (i.e., pass a quoted "device=0,1" string instead of device=0,1) when adding ['--gpus', ...], ensuring node.name/gpu_count/available logic remains the same.
417-423:⚠️ Potential issue | 🟠 MajorHonor inline
resources.gpuin_task_gpu_count().This helper only consults
spec.resources[task_spec.resource], so tasks that declare GPUs inline underresources:are treated as CPU-only.ComposeExecutor._build_compose_service()inherits the same bug through this helper.Suggested fix
def _task_gpu_count(self, task_spec: task_module.TaskSpec, spec: workflow_module.WorkflowSpec) -> int: """Return the number of GPUs requested by a task's resource spec, defaulting to 0.""" - resource_spec = spec.resources.get(task_spec.resource) + if task_spec.resources and task_spec.resources.gpu: + return task_spec.resources.gpu + resource_spec = spec.resources.get(task_spec.resource) if resource_spec and resource_spec.gpu: return resource_spec.gpu return 0🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 417 - 423, _task_gpu_count currently only looks up GPUs via spec.resources.get(task_spec.resource) and ignores inline declarations on the TaskSpec; update _task_gpu_count to first inspect task_spec.resources (e.g., task_spec.resources.gpu or task_spec.resources.get("gpu") depending on structure) and return that value if present, then fall back to resolving spec.resources.get(task_spec.resource) as before; also ensure ComposeExecutor._build_compose_service (which uses _task_gpu_count) will then honor inline task-level GPU requests without further changes.
131-137:⚠️ Potential issue | 🟠 MajorClean output directories for tasks that will be re-executed.
_setup_directories()only creates missing folders. On--resume/--from-step, any task not restored intoself._resultskeeps its previous<work_dir>/<task>/outputcontents, so a rerun can read stale artifacts from{{output}}before it writes new ones.Suggested fix
self._validate_for_standalone(spec) self._setup_directories() if resume or from_step: self._restore_completed_tasks(from_step) + for task_name in self._task_nodes: + if task_name in self._results: + continue + task_output_dir = os.path.join(self._work_dir, task_name, 'output') + shutil.rmtree(task_output_dir, ignore_errors=True) + os.makedirs(task_output_dir, exist_ok=True)Also applies to: 372-376
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 131 - 137, When resuming/from-step, output directories for tasks that will be re-executed are not cleaned because _setup_directories only creates missing folders; after calling _restore_completed_tasks(...) you must identify tasks that remain to run (e.g., all task ids in the DAG minus keys in self._results) and remove or empty their <work_dir>/<task>/output directories so no stale artifacts are read; implement this cleanup right after _restore_completed_tasks(...) in the block that calls _build_dag, _validate_for_standalone, _setup_directories (and replicate the same cleanup in the analogous block around lines 372-376) to ensure tasks selected for re-execution start with a clean output directory.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/utils/standalone_executor.py`:
- Around line 462-468: The multi-GPU selector passed to Docker is unquoted and
can trigger "cannot set both Count and DeviceIDs" errors; update both places
where docker_args is appended (the branch handling gpu_count > available and the
else branch) so the selector value is wrapped in quotes (i.e., pass a quoted
"device=0,1" string instead of device=0,1) when adding ['--gpus', ...], ensuring
node.name/gpu_count/available logic remains the same.
- Around line 417-423: _task_gpu_count currently only looks up GPUs via
spec.resources.get(task_spec.resource) and ignores inline declarations on the
TaskSpec; update _task_gpu_count to first inspect task_spec.resources (e.g.,
task_spec.resources.gpu or task_spec.resources.get("gpu") depending on
structure) and return that value if present, then fall back to resolving
spec.resources.get(task_spec.resource) as before; also ensure
ComposeExecutor._build_compose_service (which uses _task_gpu_count) will then
honor inline task-level GPU requests without further changes.
- Around line 131-137: When resuming/from-step, output directories for tasks
that will be re-executed are not cleaned because _setup_directories only creates
missing folders; after calling _restore_completed_tasks(...) you must identify
tasks that remain to run (e.g., all task ids in the DAG minus keys in
self._results) and remove or empty their <work_dir>/<task>/output directories so
no stale artifacts are read; implement this cleanup right after
_restore_completed_tasks(...) in the block that calls _build_dag,
_validate_for_standalone, _setup_directories (and replicate the same cleanup in
the analogous block around lines 372-376) to ensure tasks selected for
re-execution start with a clean output directory.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: af05a982-9fd9-494e-8e5f-7bf2cc94808d
📒 Files selected for processing (3)
src/cli/BUILDsrc/utils/compose_executor.pysrc/utils/standalone_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli/BUILD
|
@coderabbitai resume |
✅ Actions performedReviews resumed. |
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/compose_executor.py`:
- Around line 193-197: Validation currently accepts any declared credential name
by checking task_spec.credentials against self._credentials, but
_build_compose_service() only mounts credentials whose value is a string, so
dict-style credential mappings silently get dropped; update the validator in
compose mode (the loop referencing task_spec.credentials and self._credentials)
to also check that self._credentials[cred_name] is a str and reject/raise an
error if it's not (e.g., is a dict), and make the same change in the other
validator block referenced around lines 368-371 so both validation sites enforce
string-only credential values.
- Around line 542-546: The finally block currently removes auto-created work
dirs even on failure; update the cleanup logic in compose_executor.py so that
shutil.rmtree(work_dir, ...) only runs when created_work_dir is true,
keep_work_dir is false, AND success is true (i.e., delete only on successful
runs), and ensure when not success you log/preserve the work_dir via
logger.info('Work directory preserved for debugging: %s', work_dir); adjust the
conditionals around created_work_dir, keep_work_dir, success, and the logger
call accordingly to preserve generated docker-compose.yml on failure.
In `@src/utils/standalone_executor.py`:
- Around line 78-81: The docstring listing unsupported features is out of date:
it still claims "Credentials" are unsupported even though StandaloneExecutor
accepts --credential mounts and binds them in the _run_task() method; update the
text in the unsupported-features block to remove "Credentials" (or explicitly
mark them as supported via --credential) and, if helpful, add a short note
referencing the --credential flag and that binding happens in
StandaloneExecutor._run_task() so the comment matches the implementation.
- Around line 357-361: The loop that validates TaskSpec.credentials currently
treats each credential as a string and silently ignores dict-style credentials;
update the validation in the credentials-checking loop (iterating over
task_spec.credentials) to detect dict items (isinstance(item, dict)) and add a
clear rejection message to unsupported_features (e.g., 'Task "{task_spec.name}":
credential "{key}" uses dict-style mapping which the standalone executor does
not support; provide credentials as NAME=/path or flatten the mapping') instead
of treating them as missing, and apply the same explicit rejection check where
credentials are mounted (the block around the mount code that references
self._credentials) so dict credential entries are rejected consistently rather
than skipped.
- Around line 673-677: The finally block currently deletes auto-created work
dirs whenever created_work_dir is True and keep_work_dir is False, even on
failure; change the logic so auto-created dirs are removed only when the run
succeeded and keep_work_dir is False. Specifically, update the condition around
shutil.rmtree in the finally block (referencing created_work_dir, keep_work_dir,
success, work_dir) to run only when success is True (e.g., if created_work_dir
and not keep_work_dir and success) and ensure the branch that logs preservation
(logger.info('Work directory preserved for debugging: %s', work_dir)) runs when
the run failed (not success) so that auto-created dirs are always kept on
failure.
- Around line 580-636: _spec_has_templates currently only looks for '{%', '{#',
and 'default-values' so specs that only use '{{ ... }}' never trigger local
expansion; additionally _OSMO_TOKEN_PATTERN only matches unspaced tokens like
'{{output}}' so spaced tokens like '{{ output }}' are not protected and Jinja
raises StrictUndefined. Update _OSMO_TOKEN_PATTERN to allow optional whitespace
inside the braces and to match the full token (e.g.
r'\{\{\s*(uuid|workflow_id|output|input:[^}]+|host:[^}]+)\s*\}\}'), and in
_expand_jinja_locally use the match.group(0) (the full token including spaces)
as original_token when building placeholder_map so replacements preserve
spacing; also update _spec_has_templates to return True if '{{' appears (in
addition to '{%' and '{#' and 'default-values') so plain '{{ var }}' triggers
local Jinja expansion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 0c881831-6032-4c86-b602-7a0c4af46766
📒 Files selected for processing (8)
.gitignoreAGENTS.mdsrc/cli/BUILDsrc/cli/docker_compose.pysrc/cli/standalone.pysrc/utils/BUILDsrc/utils/compose_executor.pysrc/utils/standalone_executor.py
✅ Files skipped from review due to trivial changes (1)
- .gitignore
🚧 Files skipped from review as they are similar to previous changes (3)
- src/cli/BUILD
- src/utils/BUILD
- AGENTS.md
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/utils/compose_executor.py (1)
77-78: Project name may be invalid if workflow name starts with non-alphanumeric.If the workflow name starts with a special character (e.g.,
_myworkflow), the resulting project nameosmo--myworkflowhas a double hyphen but is valid. However, if a workflow name is all special characters (unlikely but possible), the result could be problematic.This is a minor edge case, but consider validating the workflow name earlier or stripping leading/trailing hyphens:
def _compose_project_name(self, spec: workflow_module.WorkflowSpec) -> str: - return f'osmo-{re.sub(r"[^a-z0-9-]", "-", spec.name.lower())}' + sanitized = re.sub(r"[^a-z0-9-]", "-", spec.name.lower()).strip('-') + return f'osmo-{sanitized or "workflow"}'🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/compose_executor.py` around lines 77 - 78, The _compose_project_name function can produce leading/trailing or repeated hyphens (or an empty name) when spec.name contains only non-alphanumeric chars; update the logic in _compose_project_name to first replace non [a-z0-9] with hyphens, collapse consecutive hyphens into a single hyphen, strip leading/trailing hyphens, and if the result is empty fallback to a safe default like "osmo-default" (all while keeping the "osmo-" prefix and using spec.name.lower() as the input); locate and update the _compose_project_name method in compose_executor.py to implement these transformations and the fallback.src/utils/standalone_executor.py (2)
508-509: Consider addingstrict=Truetozip()for defensive coding.While
task_spec.environment.keys()andresolved_env_valuesare guaranteed to have the same length (built from the same dict), addingstrict=Truemakes this invariant explicit and would catch any future refactoring bugs:- for env_key, resolved_value in zip(task_spec.environment.keys(), resolved_env_values): + for env_key, resolved_value in zip(task_spec.environment.keys(), resolved_env_values, strict=True):🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 508 - 509, The loop that builds docker_args uses zip(task_spec.environment.keys(), resolved_env_values) — change it to zip(task_spec.environment.keys(), resolved_env_values, strict=True) to enforce the invariant that both iterables have the same length; update the for-loop over (env_key, resolved_value) accordingly (still appending to docker_args), and run tests/CI to ensure Python runtime supports zip(..., strict=True).
203-214: State file does not include workflow version/hash.The state file only persists task results without any workflow fingerprint. If the spec changes between runs (different images, args, etc.) but task names stay the same,
--resumewill skip incompatible tasks and reuse stale outputs.The PR discussion indicates this was a deliberate design decision, but consider at minimum logging a warning when resuming that the user is responsible for ensuring spec consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 203 - 214, The state file currently saved by _save_state lacks any workflow fingerprint, so add a deterministic workflow/version hash to the saved JSON (e.g., compute from the workflow spec, image names, args -- implement a helper like _compute_workflow_fingerprint and include its result under a top-level key such as "workflow_fingerprint" alongside "tasks"), update _save_state to write that fingerprint to self._state_file_path, and then update the resume/load path (the method that reads the state, e.g., _load_state or the resume logic that calls it) to compare the saved fingerprint with the current computed fingerprint and emit a clear processLogger.warning (or similar logger used in this class) when fingerprints are missing or do not match so users are warned before reusing stale outputs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/standalone_executor.py`:
- Around line 499-501: The docker_args construction in standalone_executor.py is
adding literal double-quote characters into the GPU spec (in the f-strings used
to build the '--gpus' argument), which breaks Docker when invoking
subprocess.run() with list mode; update both branches that set docker_args (the
branch using available and the branch using gpu_count) to remove the surrounding
quotes so the value is e.g. device=0,1 instead of "device=0,1". Locate the code
that appends to docker_args (the two f'"device=..."' occurrences) and change
them to unquoted f'device=...'; keep using subprocess.run() list invocation
unchanged.
---
Nitpick comments:
In `@src/utils/compose_executor.py`:
- Around line 77-78: The _compose_project_name function can produce
leading/trailing or repeated hyphens (or an empty name) when spec.name contains
only non-alphanumeric chars; update the logic in _compose_project_name to first
replace non [a-z0-9] with hyphens, collapse consecutive hyphens into a single
hyphen, strip leading/trailing hyphens, and if the result is empty fallback to a
safe default like "osmo-default" (all while keeping the "osmo-" prefix and using
spec.name.lower() as the input); locate and update the _compose_project_name
method in compose_executor.py to implement these transformations and the
fallback.
In `@src/utils/standalone_executor.py`:
- Around line 508-509: The loop that builds docker_args uses
zip(task_spec.environment.keys(), resolved_env_values) — change it to
zip(task_spec.environment.keys(), resolved_env_values, strict=True) to enforce
the invariant that both iterables have the same length; update the for-loop over
(env_key, resolved_value) accordingly (still appending to docker_args), and run
tests/CI to ensure Python runtime supports zip(..., strict=True).
- Around line 203-214: The state file currently saved by _save_state lacks any
workflow fingerprint, so add a deterministic workflow/version hash to the saved
JSON (e.g., compute from the workflow spec, image names, args -- implement a
helper like _compute_workflow_fingerprint and include its result under a
top-level key such as "workflow_fingerprint" alongside "tasks"), update
_save_state to write that fingerprint to self._state_file_path, and then update
the resume/load path (the method that reads the state, e.g., _load_state or the
resume logic that calls it) to compare the saved fingerprint with the current
computed fingerprint and emit a clear processLogger.warning (or similar logger
used in this class) when fingerprints are missing or do not match so users are
warned before reusing stale outputs.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 8e89b1d9-9eb4-425d-bcbf-3ba7b3572583
📒 Files selected for processing (2)
src/utils/compose_executor.pysrc/utils/standalone_executor.py
There was a problem hiding this comment.
🧹 Nitpick comments (3)
src/utils/standalone_executor.py (2)
280-290: Minor: Consider usingcollections.dequefor BFS queue.
list.pop(0)is O(n) per operation. For large workflows,collections.deque.popleft()provides O(1) performance. This also applies to_cancel_downstreamat lines 469-477.Suggested improvement
+from collections import deque + def _get_downstream_tasks(self, task_name: str) -> Set[str]: """Return all transitive downstream dependents of the given task via BFS.""" visited: Set[str] = set() - queue = [task_name] + queue = deque([task_name]) while queue: - current = queue.pop(0) + current = queue.popleft() for downstream in self._task_nodes[current].downstream: if downstream not in visited: visited.add(downstream) queue.append(downstream) return visited🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 280 - 290, Replace the BFS queue implementation that uses a list and pop(0) with a collections.deque to get O(1) pops: import deque from collections, initialize queue = deque([task_name]) in _get_downstream_tasks and use queue.popleft() instead of pop(0); make the same change in the _cancel_downstream function (initialize its queue as a deque and call popleft()) to avoid O(n) behavior on large workflows while preserving the existing visited/append logic.
126-141: Consider includingfilescontents in the workflow fingerprint.The fingerprint captures task name, image, command, args, environment, inputs, and resource but omits
filescontents. If a user changes inline file contents without changing other fields, the fingerprint won't detect it and--resumemay reuse stale outputs.Suggested improvement
fingerprint_data.append({ 'name': name, 'image': spec.image, 'command': spec.command, 'args': spec.args, 'environment': dict(sorted(spec.environment.items())), 'inputs': [str(i) for i in spec.inputs], 'resource': spec.resource, + 'files': [(f.path, f.contents) for f in spec.files], })🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/standalone_executor.py` around lines 126 - 141, The _compute_workflow_fingerprint function currently omits task file contents; update it to include each task spec's files in the fingerprint by adding a deterministic representation of spec.files (e.g., a sorted list of {path, content} or a map path->content) into fingerprint_data for each task in _task_nodes. Ensure ordering is stable (sort file paths), handle missing/None safely, and convert file contents to strings (or a content hash) so the JSON dump stays deterministic before hashing; keep using json.dumps(sort_keys=True, separators=(',', ':')) and hashlib.sha256 to produce the final hex digest.src/utils/compose_executor.py (1)
493-506: Consider logging a hint when JSON parsing fails to find the service.The JSON parsing handles both single-object and array formats from
docker compose ps, which is good. However, if the output format changes in future Docker Compose versions, parsing could silently fail and return exit code 1, making debugging difficult.Suggested improvement
for line in result.stdout.strip().splitlines(): line = line.strip() if not line: continue try: container_info = json.loads(line) except json.JSONDecodeError: + logger.debug('Could not parse compose ps output line: %s', line[:100]) continue🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/compose_executor.py` around lines 493 - 506, The loop that parses result.stdout with json.loads can silently skip malformed lines and return 1, so update the parsing in compose_executor.py (the block iterating over result.stdout.splitlines and using json.loads) to log a hint when JSONDecodeError occurs and when no matching service is found: catch json.JSONDecodeError and call the module logger (e.g., logger.error or process_logger.error) including the exception and the raw line from result.stdout, and after the loop, if no entry matched service_name, log a warning that the service was not found in parsed container_info along with the full result.stdout to aid debugging.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/utils/compose_executor.py`:
- Around line 493-506: The loop that parses result.stdout with json.loads can
silently skip malformed lines and return 1, so update the parsing in
compose_executor.py (the block iterating over result.stdout.splitlines and using
json.loads) to log a hint when JSONDecodeError occurs and when no matching
service is found: catch json.JSONDecodeError and call the module logger (e.g.,
logger.error or process_logger.error) including the exception and the raw line
from result.stdout, and after the loop, if no entry matched service_name, log a
warning that the service was not found in parsed container_info along with the
full result.stdout to aid debugging.
In `@src/utils/standalone_executor.py`:
- Around line 280-290: Replace the BFS queue implementation that uses a list and
pop(0) with a collections.deque to get O(1) pops: import deque from collections,
initialize queue = deque([task_name]) in _get_downstream_tasks and use
queue.popleft() instead of pop(0); make the same change in the
_cancel_downstream function (initialize its queue as a deque and call popleft())
to avoid O(n) behavior on large workflows while preserving the existing
visited/append logic.
- Around line 126-141: The _compute_workflow_fingerprint function currently
omits task file contents; update it to include each task spec's files in the
fingerprint by adding a deterministic representation of spec.files (e.g., a
sorted list of {path, content} or a map path->content) into fingerprint_data for
each task in _task_nodes. Ensure ordering is stable (sort file paths), handle
missing/None safely, and convert file contents to strings (or a content hash) so
the JSON dump stays deterministic before hashing; keep using
json.dumps(sort_keys=True, separators=(',', ':')) and hashlib.sha256 to produce
the final hex digest.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d75af904-9945-450c-8df8-dabef5dbcd16
📒 Files selected for processing (2)
src/utils/compose_executor.pysrc/utils/standalone_executor.py
- Updated file operations in `local_executor.py` to explicitly use UTF-8 encoding when reading and writing files, ensuring better compatibility with various text formats. - Adjusted exception handling in `test_local_executor.py` to raise `ValueError` instead of a generic `Exception` for clearer error reporting. - Modified test documentation to reflect the correct Jinja block syntax in error messages.
…dules - Expanded exception handling in `local.py` to include `FileNotFoundError` and `PermissionError` for improved robustness. - Updated comments in `local_executor.py` to clarify unsupported features in local mode, specifically regarding privileged containers and host networking. - Modified test case in `test_local_executor.py` to ensure caller-supplied work directories are preserved on success, enhancing test accuracy.
- Introduced `--shm-size` argument in the CLI for specifying shared memory size for GPU containers, defaulting to 16g. - Updated `LocalExecutor` to accept and utilize the shared memory size during Docker command construction. - Added unit tests to verify correct handling of shared memory size for both default and custom values in GPU tasks, ensuring no shared memory argument is included for non-GPU tasks.
- Created a new `tutorial_specs` filegroup in the `cookbook/tutorials/BUILD` to include YAML specifications. - Updated the `BUILD` file in `src/utils/tests` to include the new `tutorial_specs` as data for local tests. - Added a new test class in `test_local_executor.py` to validate unsupported features in cookbook specifications, ensuring proper error handling for unsupported fields. - Implemented additional tests to verify that specific unsupported features are correctly rejected during local execution.
…traversal - Enhanced the `LocalExecutor` class to validate file paths, ensuring they do not escape the task directory. This prevents potential security risks associated with directory traversal attacks. - Added unit tests in `test_local_executor.py` to verify that invalid file paths raise appropriate exceptions, while valid paths are accepted without errors. - Updated documentation in `AGENTS.md` to include the new local executor functionality for Docker-based workflow execution.
- Updated the `LocalExecutor` class to remove unnecessary quotes around GPU device specifications in Docker command arguments, ensuring correct formatting. - Cleared previous results at the start of the `execute` method to prevent data carryover between executions.
- Updated the `LocalExecutor` class to ensure the `--shm-size` argument is included for CPU-only tasks when explicitly specified by the user. - Adjusted the logic for setting the shared memory size to improve clarity and maintainability. - Enhanced unit tests in `test_local_executor.py` to verify correct behavior for tasks with and without GPU resources regarding shared memory size.
- Updated the `LocalExecutor` class to replace host paths with container-side paths in the token mapping for output and input sources. - Adjusted the `_build_token_map` method to eliminate the output directory parameter, aligning with the new container path structure. - Modified Docker argument construction to reflect the new paths, ensuring correct volume mounts for inputs and outputs. - Enhanced unit tests in `test_local_executor.py` to validate the updated token mapping behavior and ensure proper handling of container paths.
- Implemented a `_check_for_cycles` method in the `LocalExecutor` class to identify and raise a `ValueError` for circular dependencies in task DAGs. - Added a new test class `TestCycleDetection` in `test_local_executor.py` to validate the detection of direct and indirect cycles, ensuring proper error reporting for various cyclic configurations. - Enhanced existing tests to confirm that non-cyclic task structures are accepted without errors.
…pport
- Added support for a lead-task failure policy in the `LocalExecutor`, allowing non-lead task failures to be ignored when `ignoreNonleadStatus` is set to true.
- Implemented checks to determine if a task's failure can be ignored based on its lead status and group configuration.
- Introduced validation for tasks using `{{host:taskname}}` tokens, ensuring proper error handling for unsupported configurations.
- Expanded unit tests in `test_local_executor.py` to cover various scenarios for lead-task behavior and host token usage, ensuring robust functionality.
- Added functionality to detect unresolved {{variable}} tokens in command, args, environment values, and file contents before executing tasks in the `LocalExecutor`.
- Implemented a `_check_unresolved_tokens` method to raise a ValueError if unresolved tokens are found, providing a helpful error message suggesting the use of a dry-run for template expansion.
- Introduced a new test class `TestUnresolvedTokenDetection` in `test_local_executor.py` to validate the detection of unresolved tokens across various scenarios, ensuring robust error handling.
- Replaced the `LocalExecutor` with `StandaloneExecutor` to facilitate Docker-based workflow execution without Kubernetes. - Updated relevant documentation in `AGENTS.md` to reflect the new executor's capabilities and entry points. - Modified CLI commands and parser to integrate the `standalone` subcommand for executing workflows in standalone mode. - Adjusted build configurations to include the new `standalone_executor` and updated test references accordingly. - Introduced comprehensive tests for the `StandaloneExecutor` to ensure functionality and validate workflow execution scenarios.
- Introduced `ComposeExecutor` to enable Docker Compose-based parallel execution of workflows, extending the functionality of `StandaloneExecutor`. - Updated `AGENTS.md` to document the new `ComposeExecutor` and its capabilities, including wave-parallel scheduling and GPU support. - Added a new `docker_compose.py` CLI subcommand for executing workflows using Docker Compose. - Implemented tests for `ComposeExecutor` to validate functionality and ensure correct generation of Docker Compose files. - Adjusted build configurations to include the new `compose_executor` library and corresponding tests.
…nd cleanup - Added error handling in `ComposeExecutor` to raise `NotImplementedError` for unsupported resume functionality. - Refactored `run_workflow_compose` and `run_workflow_standalone` to ensure proper cleanup of temporary work directories using a `try-finally` block. - Introduced validation in `StandaloneExecutor` to check for correct YAML mapping in workflow specifications, raising a `ValueError` for invalid formats. - Updated build configurations to include the new `docker_compose.py` file and the `compose_executor` library.
- Introduced a new method `_clean_rerun_output_dirs` to remove output directories for tasks that will be re-executed, ensuring no stale artifacts remain. - Updated the `run_workflow_standalone` method to call the cleanup function when resuming or starting from a specific step. - Adjusted GPU device specification formatting in Docker arguments to ensure correct syntax with quotes around device lists.
- Added support for `--credential`, `--set`, and `--set-string` arguments in both `docker_compose.py` and `standalone.py` to allow users to map credential names to local directories and override default values in workflow specifications. - Implemented `_parse_credentials` function to validate and parse credential inputs. - Updated `ComposeExecutor` and `StandaloneExecutor` to handle new credential and variable parameters, ensuring proper binding of directories and values during execution. - Enhanced error handling for unsupported credential configurations in workflow specifications.
- Updated credential processing to support dict-style mappings, raising clear errors for unsupported configurations. - Improved error messages for missing credentials, guiding users on the correct format for credential specification. - Adjusted cleanup logic in `run_workflow_compose` and `run_workflow_standalone` to ensure work directories are only removed on successful execution.
- Introduced a new method `_compute_workflow_fingerprint` to generate a SHA-256 hash of task specifications, allowing for detection of workflow changes across runs. - Updated the state management to include the workflow fingerprint, enabling verification of spec consistency between runs and warning users of potential stale outputs. - Improved GPU device specification formatting in Docker arguments to ensure correct syntax without unnecessary quotes.
- Added error logging for JSON parsing failures in `ComposeExecutor` to aid in debugging. - Enhanced warning messages in `ComposeExecutor` to include the full Docker compose output when no container info is found. - Updated `StandaloneExecutor` to use `deque` for improved performance in task processing queues.
…ng and state management - Replaced string splitting with `shlex.split` in `ComposeExecutor` to ensure correct parsing of command arguments. - Enhanced `_compute_workflow_fingerprint` in `StandaloneExecutor` to accept `WorkflowSpec` directly, improving clarity and functionality. - Updated state file handling in `StandaloneExecutor` to use a temporary file for safer writes and added error handling for corrupt state files.
…ction logic - Updated the cycle detection logic to use list unpacking for clarity in `StandaloneExecutor`. - Changed hash generation from MD5 to SHA-256 for improved security in `_expand_jinja_locally`, ensuring better uniqueness and collision resistance.
…ncoding - Introduced base64 decoding for file writing in `StandaloneExecutor`, allowing for proper handling of encoded content. - Ensured cleanup of rerun output directories is executed when resuming tasks, improving state management.
- Added support for credential data mapping in `_compute_workflow_fingerprint`, allowing for better management of task-specific credentials. - Updated `_validate_for_standalone` to raise a ValueError for unsupported timeout features in standalone mode, improving error handling and user guidance. - Ensured directory setup is executed correctly during workflow execution, enhancing state management.
- Updated GPU allocation logic to use 'all' when requested GPUs exceed available resources, improving resource management. - Adjusted logging messages for clarity when running tasks without GPU support or with limited GPU availability. - Ensured correct formatting for GPU device specifications in Docker arguments.
ab35a7f to
e782e4d
Compare
…r and ComposeExecutor - Updated file writing logic in `StandaloneExecutor` to use more descriptive variable names for file handlers, improving code readability. - Enhanced test cases in `TestComposeValidation` and `TestJinjaTemplateDetection` to better validate credential handling and Jinja template expansion, ensuring robustness in workflow specifications. - Adjusted error messages for clarity in credential validation tests, providing more informative feedback for users.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/utils/tests/test_compose_executor.py`:
- Around line 476-477: Replace the hard-coded '/tmp/*' paths in the test that
instantiates ComposeExecutor with runtime-created temporary directories: create
a temporary directory (via tempfile.TemporaryDirectory() or the pytest
tmp_path/tmp_path_factory fixture), use that directory path for both the
work_dir and the credentials value (e.g., credentials={'my-secret': str(tmp_dir
/ "secret-dir")}), ensure the secret subdirectory exists before creating the
ComposeExecutor, and let the context manager/fixture handle cleanup so there are
no fixed '/tmp' strings in the test referencing ComposeExecutor.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 21a20a61-2b02-4ef6-ab9e-509046913f22
📒 Files selected for processing (3)
src/utils/standalone_executor.pysrc/utils/tests/test_compose_executor.pysrc/utils/tests/test_standalone_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/utils/standalone_executor.py
…ries for credential handling - Updated the test case to create a temporary directory for storing credentials, improving isolation and preventing side effects during tests. - Enhanced readability by removing hardcoded paths and utilizing context management for temporary directory creation.
Description
A new osmo local run CLI subcommand that executes OSMO workflow specification YAMLs locally using Docker containers, bypassing the Kubernetes control plane entirely. This eliminates the need for a running OSMO service, a Kubernetes cluster, Redis, PostgreSQL, or any other platform infrastructure.
Issue #800
Checklist
Summary by CodeRabbit
New Features
Tests
Documentation
Chores