From 08c893b574ce414c86b1e6ea86430a9d1300b446 Mon Sep 17 00:00:00 2001 From: Jiaen Ren Date: Fri, 8 May 2026 00:30:25 -0700 Subject: [PATCH] scheduler: add 'none' type that bypasses PodGroup creation (#936) Adds BackendSchedulerType.NONE so backends can run without kai-scheduler. The new NoneK8sObjectFactory skips PodGroup CR creation, leaves schedulerName unset (use the cluster's default scheduler), and emits no kai/runai labels or scheduler-side resources (Queue/Topology). Sacrifices gang scheduling and priority/topology constraints to remove the kai-scheduler dependency. Tests: - 14 new unit tests in src/utils/job/tests/test_kb_objects.py covering enum parsing, factory dispatch, pod spec absence-of-fields, no PodGroup in output, no kai/runai labels, empty scheduler resources, and priority/topology unsupported. - New run/tests/test_scheduler_none_kind.py e2e harness that, on a kind cluster with no kai-scheduler installed, asserts (a) the kai factory's PodGroup is rejected by the API server (proving the negative) and (b) the none factory's pod applies cleanly and reaches Running on default-scheduler. --- run/tests/BUILD | 14 +- run/tests/test_scheduler_none_kind.py | 236 +++++++++++++++++++++++++ src/utils/connectors/postgres.py | 7 +- src/utils/job/kb_objects.py | 22 +++ src/utils/job/tests/BUILD | 1 - src/utils/job/tests/test_kb_objects.py | 189 ++++++++++++++++++-- 6 files changed, 456 insertions(+), 13 deletions(-) create mode 100644 run/tests/test_scheduler_none_kind.py diff --git a/run/tests/BUILD b/run/tests/BUILD index c56b7536c..5341a5cb3 100644 --- a/run/tests/BUILD +++ b/run/tests/BUILD @@ -16,7 +16,7 @@ limitations under the License. SPDX-License-Identifier: Apache-2.0 """ -load("//bzl:py.bzl", "osmo_py_test") +load("//bzl:py.bzl", "osmo_py_binary", "osmo_py_test") load("@osmo_python_deps//:requirements.bzl", "requirement") osmo_py_test( @@ -29,3 +29,15 @@ osmo_py_test( size = "medium", visibility = ["//visibility:public"], ) + +osmo_py_binary( + name = "test_scheduler_none_kind", + main = "test_scheduler_none_kind.py", + srcs = ["test_scheduler_none_kind.py"], + deps = [ + "//src/lib/utils:priority", + "//src/utils/connectors", + "//src/utils/job", + ], + visibility = ["//visibility:public"], +) diff --git a/run/tests/test_scheduler_none_kind.py b/run/tests/test_scheduler_none_kind.py new file mode 100644 index 000000000..1d285aae9 --- /dev/null +++ b/run/tests/test_scheduler_none_kind.py @@ -0,0 +1,236 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 + +End-to-end verification of issue #936 'none' scheduler type on a real kind cluster. + +Generates pod specs using NoneK8sObjectFactory and KaiK8sObjectFactory directly, +applies them to a kind cluster that has NO kai-scheduler installed, and verifies: + +* Kai-style resources fail to apply (PodGroup CRD missing) — proves the cluster + really lacks kai-scheduler, so the test can fail. +* None-style resources apply cleanly and the pod reaches Running, scheduled by + the cluster's default kube-scheduler — proves Option C removes the kai + dependency. + +Usage: + KIND_CONTEXT=kind-issue-936-none python test_scheduler_none_kind.py +""" + +import datetime +import json +import os +import subprocess +import sys +import time + +# Make sibling source tree importable when invoked directly. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) + +from src.lib.utils import priority as wf_priority # noqa: E402 +from src.utils import connectors # noqa: E402 +from src.utils.job import kb_objects # noqa: E402 + +KIND_CONTEXT = os.environ.get('KIND_CONTEXT', 'kind-issue-936-none') +NAMESPACE = os.environ.get('TEST_NAMESPACE', 'issue-936-test') +POOL_NAME = 'pool-a' +GROUP_UUID = 'issue-936-grp-1234567890ab' + + +def kubectl(args: list, *, input_data: str | None = None, check: bool = True) \ + -> subprocess.CompletedProcess: + cmd = ['kubectl', '--context', KIND_CONTEXT, '-n', NAMESPACE] + args + return subprocess.run( + cmd, input=input_data, text=True, capture_output=True, check=check) + + +def make_backend(scheduler_type: connectors.BackendSchedulerType) -> connectors.Backend: + return connectors.Backend( + name='backend-test', + description='kind test backend', + version='1.0.0', + k8s_uid='kind-uid', + k8s_namespace=NAMESPACE, + dashboard_url='http://test', + grafana_url='http://test', + tests=[], + scheduler_settings=connectors.BackendSchedulerSettings( + scheduler_type=scheduler_type, + scheduler_name=('kai-scheduler' + if scheduler_type == connectors.BackendSchedulerType.KAI + else ''), + ), + node_conditions=connectors.BackendNodeConditions(), + last_heartbeat=datetime.datetime.now(), + created_date=datetime.datetime.now(), + router_address='router', + online=True, + ) + + +def make_pod(name: str) -> dict: + return { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': name, + 'labels': { + 'osmo.task_name': name, + 'osmo.group_uid': GROUP_UUID, + }, + 'annotations': {}, + }, + 'spec': { + 'restartPolicy': 'Never', + 'containers': [{ + 'name': 'sleeper', + 'image': 'busybox:1.36', + 'command': ['sh', '-c', 'echo ok && sleep 5'], + }], + }, + } + + +def assert_kind_context_has_no_kai() -> None: + print(f'[setup] verifying {KIND_CONTEXT} has no kai-scheduler installed...') + result = subprocess.run( + ['kubectl', '--context', KIND_CONTEXT, 'get', 'crd', '-o', 'name'], + capture_output=True, text=True, check=True) + bad = [line for line in result.stdout.splitlines() + if 'scheduling.run.ai' in line or 'kai.scheduler' in line] + if bad: + print(f' FAIL: cluster has kai/run.ai CRDs: {bad}', file=sys.stderr) + sys.exit(2) + print(' OK: no kai-scheduler CRDs') + + +def reset_namespace() -> None: + subprocess.run( + ['kubectl', '--context', KIND_CONTEXT, 'delete', 'ns', NAMESPACE, + '--ignore-not-found', '--wait=true', '--timeout=60s'], + check=True) + subprocess.run( + ['kubectl', '--context', KIND_CONTEXT, 'create', 'ns', NAMESPACE], check=True) + + +def red_phase_kai_fails() -> None: + print('\n[RED] Kai-style spec must FAIL to apply (proves cluster has no kai)') + backend = make_backend(connectors.BackendSchedulerType.KAI) + factory = kb_objects.get_k8s_object_factory(backend) + assert isinstance(factory, kb_objects.KaiK8sObjectFactory) + pods = [make_pod('kai-task-1')] + resources = factory.create_group_k8s_resources( + GROUP_UUID, pods, {'osmo.label': 'v'}, POOL_NAME, + wf_priority.WorkflowPriority.NORMAL, [], []) + kinds = sorted({r['kind'] for r in resources}) + print(f' factory produced kinds={kinds}') + if 'PodGroup' not in kinds: + print(' FAIL: KaiK8sObjectFactory should have produced a PodGroup', file=sys.stderr) + sys.exit(2) + + yaml_doc = '\n---\n'.join(json.dumps(r) for r in resources) + result = kubectl(['apply', '-f', '-'], input_data=yaml_doc, check=False) + if result.returncode == 0: + print(' FAIL: kai PodGroup unexpectedly applied — cluster has kai installed?', + file=sys.stderr) + sys.exit(2) + if 'PodGroup' not in result.stderr and 'no matches' not in result.stderr: + print(f' FAIL: unexpected error: {result.stderr}', file=sys.stderr) + sys.exit(2) + print(f' OK: apply rejected as expected: {result.stderr.strip().splitlines()[-1]}') + + +def green_phase_none_succeeds() -> None: + print('\n[GREEN] None-style spec must apply and pod must reach Running') + backend = make_backend(connectors.BackendSchedulerType.NONE) + factory = kb_objects.get_k8s_object_factory(backend) + assert isinstance(factory, kb_objects.NoneK8sObjectFactory) + + pods = [make_pod('none-task-1')] + resources = factory.create_group_k8s_resources( + GROUP_UUID, pods, {'osmo.label': 'v'}, POOL_NAME, + wf_priority.WorkflowPriority.NORMAL, [], []) + + kinds = sorted({r['kind'] for r in resources}) + print(f' factory produced kinds={kinds}') + if kinds != ['Pod']: + print(f' FAIL: expected [Pod] only, got {kinds}', file=sys.stderr) + sys.exit(2) + + pod = resources[0] + if 'schedulerName' in pod['spec']: + print(f' FAIL: pod has schedulerName={pod["spec"]["schedulerName"]}', + file=sys.stderr) + sys.exit(2) + bad_label_keys = [k for k in pod['metadata']['labels'] + if k.startswith('kai.scheduler/') or k.startswith('runai/')] + if bad_label_keys: + print(f' FAIL: pod has kai labels: {bad_label_keys}', file=sys.stderr) + sys.exit(2) + print(' pod has no schedulerName, no kai labels ✓') + + yaml_doc = '\n---\n'.join(json.dumps(r) for r in resources) + result = kubectl(['apply', '-f', '-'], input_data=yaml_doc) + print(f' apply: {result.stdout.strip()}') + + print(' waiting for pod to reach Running (or Succeeded)...') + deadline = time.time() + 120 + pod_name = pods[0]['metadata']['name'] + last_phase = '' + while time.time() < deadline: + result = kubectl( + ['get', 'pod', pod_name, '-o', 'jsonpath={.status.phase}'], check=False) + last_phase = result.stdout.strip() + if last_phase in ('Running', 'Succeeded'): + break + time.sleep(2) + if last_phase not in ('Running', 'Succeeded'): + kubectl(['describe', 'pod', pod_name], check=False) + print(f' FAIL: pod stuck in phase={last_phase}', file=sys.stderr) + sys.exit(2) + print(f' OK: pod phase={last_phase}') + + result = kubectl( + ['get', 'pod', pod_name, '-o', + 'jsonpath={.spec.schedulerName}|{.spec.nodeName}']) + scheduler_name, _, node_name = result.stdout.partition('|') + print(f' scheduled by={scheduler_name!r} on node={node_name!r}') + # K8s defaults to "default-scheduler" when schedulerName is unset. + if scheduler_name not in ('default-scheduler', ''): + print(f' FAIL: unexpected scheduler {scheduler_name}', file=sys.stderr) + sys.exit(2) + if not node_name: + print(' FAIL: pod was not scheduled to a node', file=sys.stderr) + sys.exit(2) + + +def main() -> None: + assert_kind_context_has_no_kai() + reset_namespace() + try: + red_phase_kai_fails() + reset_namespace() + green_phase_none_succeeds() + finally: + subprocess.run( + ['kubectl', '--context', KIND_CONTEXT, 'delete', 'ns', NAMESPACE, + '--ignore-not-found', '--wait=false'], + check=False) + print('\n✅ issue #936 e2e verification PASSED') + + +if __name__ == '__main__': + main() diff --git a/src/utils/connectors/postgres.py b/src/utils/connectors/postgres.py index 9700b6009..e83da34d4 100644 --- a/src/utils/connectors/postgres.py +++ b/src/utils/connectors/postgres.py @@ -2478,10 +2478,15 @@ def list_from_db(cls, backends: List[str] | None = None, class BackendSchedulerType(enum.Enum): """ Defines the type of scheduler used by the backend """ KAI = 'kai' + NONE = 'none' class BackendSchedulerSettings(pydantic.BaseModel): - """Settings that control the how pods are scheduled in a backend""" + """Settings that control the how pods are scheduled in a backend. + + When scheduler_type is NONE, scheduler_name is unused: pods are scheduled + by the cluster's default scheduler and no PodGroup CR is created. + """ scheduler_type: BackendSchedulerType = BackendSchedulerType.KAI scheduler_name: str = 'kai-scheduler' scheduler_timeout: int = 30 diff --git a/src/utils/job/kb_objects.py b/src/utils/job/kb_objects.py index c5bb5d41f..326ecf45d 100644 --- a/src/utils/job/kb_objects.py +++ b/src/utils/job/kb_objects.py @@ -603,11 +603,33 @@ def topology_supported(self) -> bool: return True +class NoneK8sObjectFactory(K8sObjectFactory): + """k8s object factory for clusters with no batch scheduler. + + Pods are submitted directly to the cluster's default scheduler. No PodGroup + CR is created, no scheduler-specific labels or annotations are added, and + schedulerName is left unset so Kubernetes assigns the default. This mode + sacrifices gang scheduling and priority/topology constraints but removes + the kai-scheduler dependency. + """ + + def __init__(self, backend: connectors.Backend): + # pylint: disable=unused-argument + super().__init__(scheduler_name='') + + def update_pod_k8s_resource(self, pod: Dict, group_uuid: str, pool_name: str, + priority: wf_priority.WorkflowPriority): + # No-op: leave schedulerName unset so the default scheduler picks it up. + pass + + def get_k8s_object_factory(backend: connectors.Backend) -> K8sObjectFactory: scheduler_settings = backend.scheduler_settings scheduler_type = scheduler_settings.scheduler_type if scheduler_type == connectors.BackendSchedulerType.KAI: return KaiK8sObjectFactory(backend) + elif scheduler_type == connectors.BackendSchedulerType.NONE: + return NoneK8sObjectFactory(backend) else: raise osmo_errors.OSMOServerError(f'Unsupported scheduler type: {scheduler_type}') diff --git a/src/utils/job/tests/BUILD b/src/utils/job/tests/BUILD index 35970c76f..259b4721d 100644 --- a/src/utils/job/tests/BUILD +++ b/src/utils/job/tests/BUILD @@ -78,7 +78,6 @@ py_test( srcs = [ "test_kb_objects.py" ], - tags = ["manual"], deps = [ "//src/utils/job", ] diff --git a/src/utils/job/tests/test_kb_objects.py b/src/utils/job/tests/test_kb_objects.py index 4646367eb..dc1197c77 100644 --- a/src/utils/job/tests/test_kb_objects.py +++ b/src/utils/job/tests/test_kb_objects.py @@ -1,5 +1,5 @@ """ -SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,9 +15,12 @@ SPDX-License-Identifier: Apache-2.0 """ +import datetime import unittest -from src.utils.job import kb_objects +from src.lib.utils import priority as wf_priority +from src.utils import connectors +from src.utils.job import kb_objects, topology class KbObjectTest(unittest.TestCase): @@ -27,11 +30,11 @@ def test_simple_host_mounts(self): """ test_path = '/opt/data' host_mount = kb_objects.HostMount(name='my-mount', path=test_path) - self.assertEquals(host_mount.src_path, test_path) - self.assertEquals(host_mount.dest_path, test_path) + self.assertEqual(host_mount.src_path, test_path) + self.assertEqual(host_mount.dest_path, test_path) - self.assertEquals(host_mount.volume()['hostPath']['path'], test_path) - self.assertEquals(host_mount.volume_mount()['mountPath'], test_path) + self.assertEqual(host_mount.volume()['hostPath']['path'], test_path) + self.assertEqual(host_mount.volume_mount()['mountPath'], test_path) def test_src_dest_host_mounts(self): """ @@ -40,10 +43,176 @@ def test_src_dest_host_mounts(self): src_test_path = '/opt/data' dest_test_path = '/home/data' host_mount = kb_objects.HostMount(name='my-mount', path=f'{src_test_path}:{dest_test_path}') - self.assertEquals(host_mount.src_path, src_test_path) - self.assertEquals(host_mount.dest_path, dest_test_path) - self.assertEquals(host_mount.volume()['hostPath']['path'], src_test_path) - self.assertEquals(host_mount.volume_mount()['mountPath'], dest_test_path) + self.assertEqual(host_mount.src_path, src_test_path) + self.assertEqual(host_mount.dest_path, dest_test_path) + self.assertEqual(host_mount.volume()['hostPath']['path'], src_test_path) + self.assertEqual(host_mount.volume_mount()['mountPath'], dest_test_path) + + +def _make_backend(scheduler_type: connectors.BackendSchedulerType, + scheduler_name: str = '') -> connectors.Backend: + return connectors.Backend( + name='test-backend', + description='Test backend', + version='1.0.0', + k8s_uid='test-uid', + k8s_namespace='test-namespace', + dashboard_url='http://test', + grafana_url='http://test', + tests=[], + scheduler_settings=connectors.BackendSchedulerSettings( + scheduler_type=scheduler_type, + scheduler_name=scheduler_name, + ), + node_conditions=connectors.BackendNodeConditions(), + last_heartbeat=datetime.datetime.now(), + created_date=datetime.datetime.now(), + router_address='test-router', + online=True, + ) + + +def _make_pod(task_name: str) -> dict: + return { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': f'pod-{task_name}', + 'labels': {'osmo.task_name': task_name}, + 'annotations': {}, + }, + 'spec': { + 'containers': [{'name': 'test', 'image': 'test:latest'}], + }, + } + + +class NoneSchedulerSettingsTest(unittest.TestCase): + """Validate that BackendSchedulerType.NONE is accepted by the Pydantic model.""" + + def test_enum_value_is_none(self): + self.assertEqual(connectors.BackendSchedulerType.NONE.value, 'none') + + def test_settings_parses_none_from_string(self): + settings = connectors.BackendSchedulerSettings(scheduler_type='none') + self.assertEqual(settings.scheduler_type, connectors.BackendSchedulerType.NONE) + + def test_settings_parses_none_from_dict(self): + settings = connectors.BackendSchedulerSettings.model_validate({ + 'scheduler_type': 'none', + }) + self.assertEqual(settings.scheduler_type, connectors.BackendSchedulerType.NONE) + + +class NoneK8sObjectFactoryTest(unittest.TestCase): + """Verify the 'none' scheduler factory bypasses PodGroup creation entirely.""" + + def setUp(self): + self.backend = _make_backend(connectors.BackendSchedulerType.NONE) + self.factory = kb_objects.get_k8s_object_factory(self.backend) + + def test_factory_dispatch(self): + self.assertIsInstance(self.factory, kb_objects.NoneK8sObjectFactory) + + def test_create_group_returns_pods_only(self): + pods = [_make_pod(f'task{i}') for i in range(3)] + resources = self.factory.create_group_k8s_resources( + 'group-uuid', pods, {'osmo.label': 'value'}, 'pool-a', + wf_priority.WorkflowPriority.NORMAL, [], [], + ) + self.assertEqual(len(resources), 3) + for resource in resources: + self.assertEqual(resource['kind'], 'Pod') + + def test_no_podgroup_in_output(self): + pods = [_make_pod('task1')] + resources = self.factory.create_group_k8s_resources( + 'group-uuid', pods, {}, 'pool-a', + wf_priority.WorkflowPriority.NORMAL, [], [], + ) + kinds = {r['kind'] for r in resources} + self.assertNotIn('PodGroup', kinds) + + def test_pod_has_no_custom_scheduler_name(self): + pods = [_make_pod('task1')] + resources = self.factory.create_group_k8s_resources( + 'group-uuid', pods, {}, 'pool-a', + wf_priority.WorkflowPriority.NORMAL, [], [], + ) + pod = resources[0] + self.assertNotIn('schedulerName', pod['spec']) + + def test_pod_has_no_kai_or_runai_labels(self): + pods = [_make_pod('task1')] + resources = self.factory.create_group_k8s_resources( + 'group-uuid', pods, {'osmo.label': 'value'}, 'pool-a', + wf_priority.WorkflowPriority.NORMAL, [], [], + ) + pod = resources[0] + self.assertNotIn('kai.scheduler/queue', pod['metadata']['labels']) + self.assertNotIn('runai/queue', pod['metadata']['labels']) + self.assertNotIn('kai.scheduler/subgroup-name', pod['metadata']['labels']) + + def test_pod_has_no_pod_group_annotation(self): + pods = [_make_pod('task1')] + resources = self.factory.create_group_k8s_resources( + 'group-uuid', pods, {}, 'pool-a', + wf_priority.WorkflowPriority.NORMAL, [], [], + ) + annotations = resources[0]['metadata'].get('annotations', {}) + self.assertNotIn('pod-group-name', annotations) + + def test_update_pod_does_not_set_scheduler_name(self): + pod = _make_pod('task1') + self.factory.update_pod_k8s_resource( + pod, 'group-uuid', 'pool-a', wf_priority.WorkflowPriority.NORMAL, + ) + self.assertNotIn('schedulerName', pod['spec']) + self.assertNotIn('kai.scheduler/queue', pod['metadata']['labels']) + + def test_cleanup_specs_does_not_include_podgroup(self): + specs = self.factory.get_group_cleanup_specs({'osmo.group_uid': 'g'}) + for spec in specs: + api = getattr(spec, 'generic_api', None) + kind = api.kind if api else spec.resource_type + self.assertNotEqual(kind, 'PodGroup') + + def test_scheduler_resources_spec_is_empty(self): + self.assertEqual(self.factory.get_scheduler_resources_spec(self.backend, []), []) + self.assertEqual(self.factory.list_scheduler_resources_spec(self.backend), []) + self.assertEqual(self.factory.list_immutable_scheduler_resources(), []) + + def test_priority_and_topology_unsupported(self): + self.assertFalse(self.factory.priority_supported()) + self.assertFalse(self.factory.topology_supported()) + + +class NoneFactoryWithTopologyKeysTest(unittest.TestCase): + """Topology keys must not produce PodGroup constraints in 'none' mode.""" + + def test_topology_keys_ignored(self): + backend = _make_backend(connectors.BackendSchedulerType.NONE) + factory = kb_objects.get_k8s_object_factory(backend) + topology_keys = [ + topology.TopologyKey(key='gpu-clique', label='nvidia.com/gpu-clique'), + ] + task_infos = [ + topology.TaskTopology( + name='task1', + topology_requirements=[ + topology.TopologyRequirement( + key='gpu-clique', group='default', required=True), + ], + ), + ] + pods = [_make_pod('task1')] + resources = factory.create_group_k8s_resources( + 'group-uuid', pods, {}, 'pool-a', + wf_priority.WorkflowPriority.NORMAL, topology_keys, task_infos, + ) + kinds = {r['kind'] for r in resources} + self.assertEqual(kinds, {'Pod'}) + if __name__ == "__main__": unittest.main()