Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion run/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
SPDX-License-Identifier: Apache-2.0
"""

load("//bzl:py.bzl", "osmo_py_test")
load("//bzl:py.bzl", "osmo_py_binary", "osmo_py_test")
load("@osmo_python_deps//:requirements.bzl", "requirement")

osmo_py_test(
Expand All @@ -29,3 +29,15 @@ osmo_py_test(
size = "medium",
visibility = ["//visibility:public"],
)

osmo_py_binary(
name = "test_scheduler_none_kind",
main = "test_scheduler_none_kind.py",
srcs = ["test_scheduler_none_kind.py"],
deps = [
"//src/lib/utils:priority",
"//src/utils/connectors",
"//src/utils/job",
],
visibility = ["//visibility:public"],
)
236 changes: 236 additions & 0 deletions run/tests/test_scheduler_none_kind.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
"""
SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Comment on lines +1 to +2

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add the long-line pylint waiver to this SPDX header too.

The new file has the same over-100-character copyright line, so it needs the inline line-too-long disable to stay consistent with repo policy.

As per coding guidelines, "If copyright lines exceed 100 characters, add # pylint: disable=line-too-long comment instead of breaking into multiple lines".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@run/tests/test_scheduler_none_kind.py` around lines 1 - 2, The SPDX header in
this new test file exceeds 100 characters and needs the same pylint waiver used
elsewhere; edit the file-level header (the top triple-quoted string containing
the SPDX line) to append the inline pylint disable for line-too-long (i.e., add
the comment "# pylint: disable=line-too-long" alongside the SPDX copyright
statement) so the long copyright line is exempted from the linter.


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

SPDX-License-Identifier: Apache-2.0

End-to-end verification of issue #936 'none' scheduler type on a real kind cluster.

Generates pod specs using NoneK8sObjectFactory and KaiK8sObjectFactory directly,
applies them to a kind cluster that has NO kai-scheduler installed, and verifies:

* Kai-style resources fail to apply (PodGroup CRD missing) — proves the cluster
really lacks kai-scheduler, so the test can fail.
* None-style resources apply cleanly and the pod reaches Running, scheduled by
the cluster's default kube-scheduler — proves Option C removes the kai
dependency.

Usage:
KIND_CONTEXT=kind-issue-936-none python test_scheduler_none_kind.py
"""

import datetime
import json
import os
import subprocess
import sys
import time

# Make sibling source tree importable when invoked directly.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

from src.lib.utils import priority as wf_priority # noqa: E402
from src.utils import connectors # noqa: E402
from src.utils.job import kb_objects # noqa: E402

# kubectl context of the cluster under test; override with the KIND_CONTEXT env var.
KIND_CONTEXT = os.environ.get('KIND_CONTEXT', 'kind-issue-936-none')
# Namespace the script deletes and recreates; override with the TEST_NAMESPACE env var.
NAMESPACE = os.environ.get('TEST_NAMESPACE', 'issue-936-test')
# Pool name handed to the object factories when generating group resources.
POOL_NAME = 'pool-a'
# Group UUID handed to the factories and set as each pod's osmo.group_uid label.
GROUP_UUID = 'issue-936-grp-1234567890ab'


def kubectl(
        args: list,
        *,
        input_data: str | None = None,
        check: bool = True,
) -> subprocess.CompletedProcess:
    """Run kubectl against the test context and namespace.

    Args:
        args: kubectl arguments appended after the fixed
            ``--context``/``-n`` flags.
        input_data: Text fed to kubectl's stdin; when None no input is passed.
        check: When True, a non-zero exit status raises
            ``subprocess.CalledProcessError``.

    Returns:
        The completed process with stdout and stderr captured as text.
    """
    base_command = ['kubectl', '--context', KIND_CONTEXT, '-n', NAMESPACE]
    full_command = base_command + args
    return subprocess.run(
        full_command,
        check=check,
        capture_output=True,
        text=True,
        input=input_data,
    )


def make_backend(scheduler_type: connectors.BackendSchedulerType) -> connectors.Backend:
    """Build a placeholder Backend record configured for ``scheduler_type``.

    The scheduler name is 'kai-scheduler' for the KAI type and an empty string
    for every other type; all remaining fields are fixed test values.
    """
    if scheduler_type == connectors.BackendSchedulerType.KAI:
        scheduler_name = 'kai-scheduler'
    else:
        scheduler_name = ''
    settings = connectors.BackendSchedulerSettings(
        scheduler_type=scheduler_type,
        scheduler_name=scheduler_name,
    )
    conditions = connectors.BackendNodeConditions()
    # The two timestamps are sampled independently, one call per field.
    heartbeat = datetime.datetime.now()
    created = datetime.datetime.now()
    return connectors.Backend(
        name='backend-test',
        description='kind test backend',
        version='1.0.0',
        k8s_uid='kind-uid',
        k8s_namespace=NAMESPACE,
        dashboard_url='http://test',
        grafana_url='http://test',
        tests=[],
        scheduler_settings=settings,
        node_conditions=conditions,
        last_heartbeat=heartbeat,
        created_date=created,
        router_address='router',
        online=True,
    )


def make_pod(name: str) -> dict:
    """Return a minimal v1 Pod manifest named ``name``.

    The pod runs a single busybox container that echoes and sleeps briefly,
    never restarts, and carries the osmo task-name and group-uid labels.
    """
    sleeper_container = {
        'name': 'sleeper',
        'image': 'busybox:1.36',
        'command': ['sh', '-c', 'echo ok && sleep 5'],
    }
    pod_labels = {
        'osmo.task_name': name,
        'osmo.group_uid': GROUP_UUID,
    }
    pod_metadata = {
        'name': name,
        'labels': pod_labels,
        'annotations': {},
    }
    pod_spec = {
        'restartPolicy': 'Never',
        'containers': [sleeper_container],
    }
    return {
        'apiVersion': 'v1',
        'kind': 'Pod',
        'metadata': pod_metadata,
        'spec': pod_spec,
    }


def assert_kind_context_has_no_kai() -> None:
    """Exit with status 2 if the cluster lists any kai/run.ai CRDs.

    Lists CRD names through kubectl and looks for the substrings
    'scheduling.run.ai' or 'kai.scheduler'. A failing kubectl call raises
    ``subprocess.CalledProcessError``.
    """
    print(f'[setup] verifying {KIND_CONTEXT} has no kai-scheduler installed...')
    crd_listing = subprocess.run(
        ['kubectl', '--context', KIND_CONTEXT, 'get', 'crd', '-o', 'name'],
        capture_output=True, text=True, check=True)
    kai_markers = ('scheduling.run.ai', 'kai.scheduler')
    offending = [
        crd_name for crd_name in crd_listing.stdout.splitlines()
        if any(marker in crd_name for marker in kai_markers)
    ]
    if not offending:
        print(' OK: no kai-scheduler CRDs')
        return
    print(f' FAIL: cluster has kai/run.ai CRDs: {offending}', file=sys.stderr)
    sys.exit(2)


def reset_namespace() -> None:
    """Delete the test namespace (waiting up to 60s) and create it again.

    Either kubectl call raises ``subprocess.CalledProcessError`` on failure.
    """
    kubectl_prefix = ['kubectl', '--context', KIND_CONTEXT]
    delete_command = kubectl_prefix + [
        'delete', 'ns', NAMESPACE,
        '--ignore-not-found', '--wait=true', '--timeout=60s']
    create_command = kubectl_prefix + ['create', 'ns', NAMESPACE]
    for command in (delete_command, create_command):
        subprocess.run(command, check=True)


def red_phase_kai_fails() -> None:
    """Check that Kai-style resources are rejected by the cluster.

    Generates resources with the KAI factory, confirms a PodGroup is among
    them, applies them, and expects kubectl to fail with an error mentioning
    'PodGroup' or 'no matches'. Any other outcome exits with status 2.
    """
    print('\n[RED] Kai-style spec must FAIL to apply (proves cluster has no kai)')
    kai_backend = make_backend(connectors.BackendSchedulerType.KAI)
    kai_factory = kb_objects.get_k8s_object_factory(kai_backend)
    assert isinstance(kai_factory, kb_objects.KaiK8sObjectFactory)
    task_pods = [make_pod('kai-task-1')]
    generated = kai_factory.create_group_k8s_resources(
        GROUP_UUID, task_pods, {'osmo.label': 'v'}, POOL_NAME,
        wf_priority.WorkflowPriority.NORMAL, [], [])
    kinds = sorted({resource['kind'] for resource in generated})
    print(f' factory produced kinds={kinds}')
    if 'PodGroup' not in kinds:
        print(' FAIL: KaiK8sObjectFactory should have produced a PodGroup', file=sys.stderr)
        sys.exit(2)

    # One JSON document per resource, joined with YAML document separators.
    manifest = '\n---\n'.join(map(json.dumps, generated))
    outcome = kubectl(['apply', '-f', '-'], input_data=manifest, check=False)
    if outcome.returncode == 0:
        print(' FAIL: kai PodGroup unexpectedly applied — cluster has kai installed?',
              file=sys.stderr)
        sys.exit(2)
    error_text = outcome.stderr
    recognised = 'PodGroup' in error_text or 'no matches' in error_text
    if not recognised:
        print(f' FAIL: unexpected error: {error_text}', file=sys.stderr)
        sys.exit(2)
    final_line = error_text.strip().splitlines()[-1]
    print(f' OK: apply rejected as expected: {final_line}')


def green_phase_none_succeeds() -> None:
    """Check that None-style resources apply and the pod gets scheduled.

    Steps:
      1. Generate resources with the NONE factory and require that only a Pod
         is produced, with no schedulerName and no kai/runai label keys.
      2. Apply the resources and poll the pod phase for up to 120 seconds
         until it is Running or Succeeded.
      3. Read back the pod and require that schedulerName is empty or
         'default-scheduler' and that a node was assigned.

    Any failed expectation prints a message to stderr and exits with status 2.
    A failing ``kubectl apply`` or final ``kubectl get`` raises
    ``subprocess.CalledProcessError``.
    """
    print('\n[GREEN] None-style spec must apply and pod must reach Running')
    backend = make_backend(connectors.BackendSchedulerType.NONE)
    factory = kb_objects.get_k8s_object_factory(backend)
    assert isinstance(factory, kb_objects.NoneK8sObjectFactory)

    pods = [make_pod('none-task-1')]
    resources = factory.create_group_k8s_resources(
        GROUP_UUID, pods, {'osmo.label': 'v'}, POOL_NAME,
        wf_priority.WorkflowPriority.NORMAL, [], [])

    kinds = sorted({r['kind'] for r in resources})
    print(f' factory produced kinds={kinds}')
    if kinds != ['Pod']:
        print(f' FAIL: expected [Pod] only, got {kinds}', file=sys.stderr)
        sys.exit(2)

    pod = resources[0]
    if 'schedulerName' in pod['spec']:
        print(f' FAIL: pod has schedulerName={pod["spec"]["schedulerName"]}',
              file=sys.stderr)
        sys.exit(2)
    bad_label_keys = [k for k in pod['metadata']['labels']
                      if k.startswith(('kai.scheduler/', 'runai/'))]
    if bad_label_keys:
        print(f' FAIL: pod has kai labels: {bad_label_keys}', file=sys.stderr)
        sys.exit(2)
    print(' pod has no schedulerName, no kai labels ✓')

    yaml_doc = '\n---\n'.join(json.dumps(r) for r in resources)
    result = kubectl(['apply', '-f', '-'], input_data=yaml_doc)
    print(f' apply: {result.stdout.strip()}')

    print(' waiting for pod to reach Running (or Succeeded)...')
    ok_phases = ('Running', 'Succeeded')
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + 120
    pod_name = pods[0]['metadata']['name']
    last_phase = ''
    while time.monotonic() < deadline:
        # check=False: the pod may not be visible yet right after apply.
        result = kubectl(
            ['get', 'pod', pod_name, '-o', 'jsonpath={.status.phase}'], check=False)
        last_phase = result.stdout.strip()
        # 'Failed' is a terminal pod phase, so waiting out the rest of the
        # timeout cannot help; stop polling and report immediately.
        if last_phase in ok_phases or last_phase == 'Failed':
            break
        time.sleep(2)
    if last_phase not in ok_phases:
        # kubectl() captures output, so the describe text has to be printed
        # explicitly or the diagnostic is lost.
        describe = kubectl(['describe', 'pod', pod_name], check=False)
        print(describe.stdout or describe.stderr, file=sys.stderr)
        print(f' FAIL: pod stuck in phase={last_phase}', file=sys.stderr)
        sys.exit(2)
    print(f' OK: pod phase={last_phase}')

    result = kubectl(
        ['get', 'pod', pod_name, '-o',
         'jsonpath={.spec.schedulerName}|{.spec.nodeName}'])
    scheduler_name, _, node_name = result.stdout.partition('|')
    print(f' scheduled by={scheduler_name!r} on node={node_name!r}')
    # K8s defaults to "default-scheduler" when schedulerName is unset.
    if scheduler_name not in ('default-scheduler', ''):
        print(f' FAIL: unexpected scheduler {scheduler_name}', file=sys.stderr)
        sys.exit(2)
    if not node_name:
        print(' FAIL: pod was not scheduled to a node', file=sys.stderr)
        sys.exit(2)


def main() -> None:
    """Run the setup check, the RED phase, then the GREEN phase.

    The test namespace is recreated before each phase. Its deletion is always
    requested afterwards (without waiting, errors ignored), even when a phase
    exits early. The success banner is printed only if both phases complete.
    """
    teardown_command = [
        'kubectl', '--context', KIND_CONTEXT, 'delete', 'ns', NAMESPACE,
        '--ignore-not-found', '--wait=false']

    assert_kind_context_has_no_kai()
    reset_namespace()
    try:
        red_phase_kai_fails()
        reset_namespace()
        green_phase_none_succeeds()
    finally:
        subprocess.run(teardown_command, check=False)
    print('\n✅ issue #936 e2e verification PASSED')


# Run the end-to-end check only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
7 changes: 6 additions & 1 deletion src/utils/connectors/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -2478,10 +2478,15 @@ def list_from_db(cls, backends: List[str] | None = None,
class BackendSchedulerType(enum.Enum):
    """ Defines the type of scheduler used by the backend """
    # Default scheduler type of BackendSchedulerSettings.
    KAI = 'kai'
    # Pods are scheduled by the cluster's default scheduler; no PodGroup CR is created.
    NONE = 'none'


class BackendSchedulerSettings(pydantic.BaseModel):
"""Settings that control the how pods are scheduled in a backend"""
"""Settings that control how pods are scheduled in a backend.

When scheduler_type is NONE, scheduler_name is unused: pods are scheduled
by the cluster's default scheduler and no PodGroup CR is created.
"""
scheduler_type: BackendSchedulerType = BackendSchedulerType.KAI
scheduler_name: str = 'kai-scheduler'
scheduler_timeout: int = 30
Expand Down
22 changes: 22 additions & 0 deletions src/utils/job/kb_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,11 +603,33 @@ def topology_supported(self) -> bool:
return True


class NoneK8sObjectFactory(K8sObjectFactory):
    """k8s object factory for clusters with no batch scheduler.

    Pods are submitted directly to the cluster's default scheduler. No PodGroup
    CR is created, no scheduler-specific labels or annotations are added, and
    schedulerName is left unset so Kubernetes assigns the default. This mode
    sacrifices gang scheduling and priority/topology constraints but removes
    the kai-scheduler dependency.
    """

    def __init__(self, backend: connectors.Backend):
        # pylint: disable=unused-argument
        # The backend is accepted for signature parity with the other
        # factories; no scheduler name is configured in this mode.
        super().__init__(scheduler_name='')

    def update_pod_k8s_resource(self, pod: Dict, group_uuid: str, pool_name: str,
                                priority: wf_priority.WorkflowPriority):
        # pylint: disable=unused-argument
        # No-op: every argument is intentionally ignored and the pod is left
        # untouched, so schedulerName stays unset and the default scheduler
        # picks it up.
        pass
Comment on lines +620 to +623

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Suppress the intentional unused parameters in the no-op override.

This override ignores every argument, but unlike the similar base-class methods it does not disable unused-argument, so pylint will flag it.

Proposed fix
 def update_pod_k8s_resource(self, pod: Dict, group_uuid: str, pool_name: str,
                             priority: wf_priority.WorkflowPriority):
+    # pylint: disable=unused-argument
     # No-op: leave schedulerName unset so the default scheduler picks it up.
     pass
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/job/kb_objects.py` around lines 620 - 623, The override
update_pod_k8s_resource currently intentionally ignores all parameters but
doesn't suppress pylint unused-argument; modify the method to silence the linter
by either renaming unused parameters with a leading underscore (e.g., _pod,
_group_uuid, _pool_name, _priority) or add a pylint disable comment (e.g., #
pylint: disable=unused-argument) on the method signature, keeping the no-op pass
body and the method name unchanged so it still overrides the base
implementation.



def get_k8s_object_factory(backend: connectors.Backend) -> K8sObjectFactory:
    """Return the k8s object factory matching the backend's scheduler type.

    Raises:
        osmo_errors.OSMOServerError: If the scheduler type is neither KAI nor NONE.
    """
    scheduler_type = backend.scheduler_settings.scheduler_type
    # Guard clauses: each supported type returns immediately.
    if scheduler_type == connectors.BackendSchedulerType.KAI:
        return KaiK8sObjectFactory(backend)
    if scheduler_type == connectors.BackendSchedulerType.NONE:
        return NoneK8sObjectFactory(backend)
    raise osmo_errors.OSMOServerError(f'Unsupported scheduler type: {scheduler_type}')

Expand Down
1 change: 0 additions & 1 deletion src/utils/job/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ py_test(
srcs = [
"test_kb_objects.py"
],
tags = ["manual"],
deps = [
"//src/utils/job",
]
Expand Down
Loading
Loading