From 9da50afc455a08b36e83e7f336952393c09b3dc6 Mon Sep 17 00:00:00 2001 From: Keita Watanabe Date: Tue, 31 Mar 2026 16:19:07 +0000 Subject: [PATCH 1/3] fix: convert osmo-ctrl to native K8s sidecar to prevent upload data loss MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit osmo-ctrl is killed before large output uploads complete because: (1) terminationGracePeriodSeconds defaults to 30s (never set by OSMO), and (2) the SIGTERM handler calls os.Exit(1) immediately without waiting for in-progress uploads to finish. This converts osmo-ctrl from a regular container to a Kubernetes native sidecar (KEP-753, init container with restartPolicy: Always), which ensures it receives SIGTERM only after the user container exits. Combined with a SIGTERM drain handler that waits for upload completion, this eliminates the race condition. Changes: - task.py: Move control_container_spec from containers[] to initContainers[] with restartPolicy: Always; set terminationGracePeriodSeconds: 600 - ctrl.go: Add upload drain logic to SIGTERM handler — waits up to 9 minutes for in-progress uploads before exiting - objects.py: Simplify default_ctrl template to only set terminationGracePeriodSeconds (container placement is handled by task.py; duplicating in the template causes K8s 422 errors via merge_lists_on_name) - backend_listener.py: Check init_container_statuses in addition to container_statuses so osmo-ctrl termination is detected as a native sidecar - test_listener.py: Add test for native sidecar osmo-ctrl error detection Requires Kubernetes >= 1.29 (SidecarContainers feature gate beta, enabled by default). Fixes #764 --- src/operator/backend_listener.py | 4 ++- src/operator/tests/test_listener.py | 49 +++++++++++++++++++++++++++++ src/runtime/cmd/ctrl/ctrl.go | 16 ++++++++++ src/service/core/config/objects.py | 18 +---------- src/utils/job/task.py | 4 ++- 5 files changed, 72 insertions(+), 19 deletions(-) diff --git a/src/operator/backend_listener.py b/src/operator/backend_listener.py index aa777e779..795dee22d 100644 --- a/src/operator/backend_listener.py +++ b/src/operator/backend_listener.py @@ -367,7 +367,9 @@ def check_running_pod_containers(pod: kubernetes.client.models.v1_pod.V1Pod) -> # Add more reasons here for cases when one container terminated and we want the service # to clean up the pod reasons = ['StartError'] - container_statuses = pod.status.container_statuses if pod.status.container_statuses else [] + container_statuses = list(itertools.chain( + pod.status.init_container_statuses or [], + pod.status.container_statuses or [])) for container_status in container_statuses: state = container_status.state if state.terminated: diff --git a/src/operator/tests/test_listener.py b/src/operator/tests/test_listener.py index 3fc036a32..ecea2dfd0 100644 --- a/src/operator/tests/test_listener.py +++ b/src/operator/tests/test_listener.py @@ -188,6 +188,55 @@ def test_user_error_but_ctrl_running_results_in_running(self): status, _, _ = backend_listener.calculate_pod_status(pod_event) self.assertEqual(status, task.TaskGroupStatus.RUNNING) + def create_native_sidecar_ctrl_terminated_pod(self): + """ + Create a pod where osmo-ctrl is a native sidecar (in init_container_statuses) + that has terminated with an error, while user container is still running. + """ + ctrl_init_status = V1ContainerStatus( + name='osmo-ctrl', + image='osmo-ctrl-image', + image_id='osmo-ctrl-imageid', + state=V1ContainerState( + terminated=V1ContainerStateTerminated(reason='Error', exit_code=2)), + ready=False, + restart_count=0 + ) + + user_container_status = V1ContainerStatus( + name='user', + image='nginx', + image_id='nginx_id', + state=V1ContainerState( + running=V1ContainerStateRunning(started_at='2024-02-27T12:34:56Z')), + ready=True, + restart_count=0 + ) + + status = V1PodStatus( + phase='Running', + init_container_statuses=[ctrl_init_status], + container_statuses=[user_container_status] + ) + + pod = V1Pod( + api_version='v1', + kind='Pod', + metadata=V1ObjectMeta(name='my-mock-pod'), + spec=self.create_spec(), + status=status + ) + return pod + + def test_native_sidecar_ctrl_error_detected(self): + """ + Test that osmo-ctrl termination is detected when it runs as a native sidecar + (appears in init_container_statuses instead of container_statuses). + """ + pod_event = self.create_native_sidecar_ctrl_terminated_pod() + error_info = backend_listener.check_running_pod_containers(pod_event) + self.assertNotEqual(error_info.error_message, '') + class TestNodeAvailability(unittest.TestCase): def setUp(self): diff --git a/src/runtime/cmd/ctrl/ctrl.go b/src/runtime/cmd/ctrl/ctrl.go index b0bc529ce..e9c26d074 100644 --- a/src/runtime/cmd/ctrl/ctrl.go +++ b/src/runtime/cmd/ctrl/ctrl.go @@ -71,6 +71,10 @@ var barrierReq string var rsyncStatus rsync.RsyncStatus +// Upload drain: allows SIGTERM handler to wait for in-progress uploads to complete +var uploading atomic.Bool +var uploadDone = make(chan struct{}) + type PortForwardType string const ( @@ -1397,6 +1401,15 @@ func main() { signal.Notify(sigintCatch, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigintCatch + if uploading.Load() { + log.Println("SIGTERM received during upload, waiting for completion...") + select { + case <-uploadDone: + log.Println("Upload completed after SIGTERM, exiting gracefully") + case <-time.After(9 * time.Minute): + log.Println("Upload drain timeout exceeded, forcing exit") + } + } cleanupMounts(cmdArgs.DownloadType) os.Exit(1) }() @@ -1482,9 +1495,12 @@ execLogs: // Send files to be uploaded outputStartTime := time.Now().Format("2006-01-02 15:04:05.000") + uploading.Store(true) uploadOutputs(unixConn, cmdArgs.Outputs, cmdArgs.OutputPath, cmdArgs.MetadataFile, uploadChan, metricChan, cmdArgs.RetryId, cmdArgs.GroupName, cmdArgs.LogSource, cmdArgs.UserConfig, cmdArgs.ServiceConfig, cmdArgs.ConfigLoc) + uploading.Store(false) + close(uploadDone) outputEndTime := time.Now().Format("2006-01-02 15:04:05.000") uploadTimes := metrics.GroupMetrics{ RetryId: cmdArgs.RetryId, diff --git a/src/service/core/config/objects.py b/src/service/core/config/objects.py index 989036c7c..decb4ee40 100644 --- a/src/service/core/config/objects.py +++ b/src/service/core/config/objects.py @@ -29,23 +29,7 @@ DEFAULT_POD_TEMPLATES : dict[str, dict] = { 'default_ctrl': { 'spec': { - 'containers': [ - { - 'name': 'osmo-ctrl', - 'resources': { - 'limits': { - 'cpu': '{{USER_CPU}}', - 'memory': '{{USER_MEMORY}}', - 'ephemeral-storage': '{{USER_STORAGE}}' - }, - 'requests': { - 'cpu': '1', - 'memory': '1Gi', - 'ephemeral-storage': '1Gi' - } - } - } - ] + 'terminationGracePeriodSeconds': 600, } }, 'default_user': { diff --git a/src/utils/job/task.py b/src/utils/job/task.py index 43b94c9d4..00d997b07 100644 --- a/src/utils/job/task.py +++ b/src/utils/job/task.py @@ -2805,14 +2805,16 @@ def _build_file_mount(file: File) -> kb_objects.FileMount: 'restartPolicy': 'Never', 'imagePullSecrets': image_pull_secrets, 'hostNetwork': task_spec.hostNetwork, - 'containers': [user_container_spec, control_container_spec], + 'containers': [user_container_spec], 'initContainers': [ k8s_factory.create_init_container( login_file_mount.volume_mount(), user_config_file_mount.volume_mount(), init_extra_args, ), + {**control_container_spec, 'restartPolicy': 'Always'}, ], + 'terminationGracePeriodSeconds': 600, 'volumes': [ {'name': 'osmo'}, {'name': 'osmo-data'}, From eba8a0ed7ae1cecda00d499d251566c389094ffa Mon Sep 17 00:00:00 2001 From: Keita Watanabe Date: Tue, 31 Mar 2026 17:35:47 +0000 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20exit?= =?UTF-8?q?=20code=200=20after=20drain,=20check=20initContainers=20in=20po?= =?UTF-8?q?stgres.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ctrl.go: os.Exit(0) after successful upload drain so K8s reports reason=Completed instead of reason=Error (which caused the backend listener to mark the workflow FAILED despite successful upload) - postgres.py: check_osmo_data_resource() now scans both containers and initContainers for osmo-ctrl resource limits, fixing a regression where capacity calculations silently returned empty after moving osmo-ctrl to initContainers --- src/runtime/cmd/ctrl/ctrl.go | 2 ++ src/utils/connectors/postgres.py | 15 ++++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/runtime/cmd/ctrl/ctrl.go b/src/runtime/cmd/ctrl/ctrl.go index e9c26d074..f6bc54777 100644 --- a/src/runtime/cmd/ctrl/ctrl.go +++ b/src/runtime/cmd/ctrl/ctrl.go @@ -1406,6 +1406,8 @@ func main() { select { case <-uploadDone: log.Println("Upload completed after SIGTERM, exiting gracefully") + cleanupMounts(cmdArgs.DownloadType) + os.Exit(0) case <-time.After(9 * time.Minute): log.Println("Upload drain timeout exceeded, forcing exit") } diff --git a/src/utils/connectors/postgres.py b/src/utils/connectors/postgres.py index 0f7f7243c..52dc097dd 100644 --- a/src/utils/connectors/postgres.py +++ b/src/utils/connectors/postgres.py @@ -2110,13 +2110,14 @@ def construct_updated_allocatables( def check_osmo_data_resource(pod_template: Dict) -> ResourceLimitations: resource_limits = ResourceLimitations() - containers = pod_template.get('spec', {}).get('containers', []) - if containers: - for container in containers: - if container.get('name', '') == 'osmo-ctrl': - if 'resources' in container: - resource_limits = ResourceLimitations(**container['resources']) - break + spec = pod_template.get('spec', {}) + containers = list(spec.get('containers', []) or []) + containers.extend(spec.get('initContainers', []) or []) + for container in containers: + if container.get('name', '') == 'osmo-ctrl': + if 'resources' in container: + resource_limits = ResourceLimitations(**container['resources']) + break return resource_limits ctrl_usage = {} From 7671bd00f4bce376c56de4572b7efd812be14f6b Mon Sep 17 00:00:00 2001 From: Keita Watanabe Date: Thu, 2 Apr 2026 17:29:41 +0000 Subject: [PATCH 3/3] fix: address deployment issues found during native sidecar testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five fixes discovered while testing the native sidecar conversion on a production EKS cluster: 1. Honor OSMO_SKIP_DATA_AUTH in ctrl.go — the Python service sets this env var when credential_config.disable_data_validation includes "*" or "s3", but the Go runtime never checked it. Dataset output write validation failed (iam:SimulatePrincipalPolicy), causing silent task failure with exit 0. 2. Log CtrlFailed in user.go — osmo_exec silently returned exit 0 when receiving CtrlFailed, making validation failures invisible to users. 3. Flush log channels before os.Exit in SIGTERM handler — os.Exit() bypasses deferred functions, so logs were lost on task completion. 4. Drain sendLogs queue on stop — sendLogs returned immediately on stopChan without flushing remaining messages. 5. Flush logQueue directly before LogDone — the logger closes the websocket after receiving LogDone, so any user logs still in the queue at that point were lost. Now flushes from the main goroutine while the websocket is still open. 6. Propagate disable_data_validation to ctrl container env — adds OSMO_SKIP_DATA_AUTH=1 when the service config disables validation. --- src/runtime/cmd/ctrl/ctrl.go | 60 ++++++++++++++++++++++++++++++------ src/runtime/cmd/user/user.go | 1 + src/utils/job/task.py | 9 ++++++ 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/src/runtime/cmd/ctrl/ctrl.go b/src/runtime/cmd/ctrl/ctrl.go index f6bc54777..4f0c68144 100644 --- a/src/runtime/cmd/ctrl/ctrl.go +++ b/src/runtime/cmd/ctrl/ctrl.go @@ -871,8 +871,25 @@ func sendLogs(logSource string, logQueue *common.CircularBuffer, logsPeriodMs in for { select { case <-stopChan: + // Drain remaining logs before exiting + for { + bufferMutex.Lock() + logJson, err := logQueue.Peek() + if err != nil { + bufferMutex.Unlock() + break + } + err = messages.Put(webConn, logJson) + if err != nil { + log.Println("Failed to send log during drain:", err) + bufferMutex.Unlock() + break + } + logQueue.Pop() + bufferMutex.Unlock() + } defer waitGoRoutines.Done() - log.Println("Goroutine sendLogs is done") + log.Println("Goroutine sendLogs is done, queue drained") return case <-ticker.C: if data.WebsocketConnection.IsBroken { @@ -1401,23 +1418,30 @@ func main() { signal.Notify(sigintCatch, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigintCatch + log.Println("SIGTERM received, starting graceful shutdown...") if uploading.Load() { - log.Println("SIGTERM received during upload, waiting for completion...") + log.Println("Upload in progress, waiting for completion...") select { case <-uploadDone: - log.Println("Upload completed after SIGTERM, exiting gracefully") - cleanupMounts(cmdArgs.DownloadType) - os.Exit(0) + log.Println("Upload completed after SIGTERM") case <-time.After(9 * time.Minute): - log.Println("Upload drain timeout exceeded, forcing exit") + log.Println("Upload drain timeout exceeded") } } + // Flush log channels before exiting — os.Exit bypasses defers + stopPutLogs <- true + stopSendLogs <- true + waitGoRoutines.Wait() cleanupMounts(cmdArgs.DownloadType) - os.Exit(1) + os.Exit(0) }() - // Validate data auth access before starting downloads/uploads - if err := data.ValidateInputsOutputsAccess( + // Validate data auth access before starting downloads/uploads. + // Honor OSMO_SKIP_DATA_AUTH set by the service when + // credential_config.disable_data_validation includes "*" or "s3". + if os.Getenv("OSMO_SKIP_DATA_AUTH") == "1" { + osmoChan <- "Data validation skipped (OSMO_SKIP_DATA_AUTH=1)" + } else if err := data.ValidateInputsOutputsAccess( cmdArgs.Inputs, cmdArgs.Outputs, cmdArgs.UserConfig, @@ -1511,6 +1535,24 @@ execLogs: MetricType: "output_upload"} metricChan <- uploadTimes + // Flush all remaining user logs directly before sending LogDone. + // sendLogs runs on a ticker and may not have drained the queue yet. + // We must flush here while the websocket is still open — after LogDone, + // the logger closes the connection and any remaining logs are lost. + bufferMutex.Lock() + for !logQueue.IsEmpty() { + logJson, err := logQueue.Peek() + if err != nil { + break + } + if err := messages.Put(webConn, logJson); err != nil { + log.Printf("Failed to flush log before LogDone: %v", err) + break + } + logQueue.Pop() + } + bufferMutex.Unlock() + logMsg := messages.CreateLog(cmdArgs.LogSource, "", messages.LogDone) for !logsFinished { threadsafeEnqueue(logQueue, logMsg) diff --git a/src/runtime/cmd/user/user.go b/src/runtime/cmd/user/user.go index 8290ad973..901b5c511 100644 --- a/src/runtime/cmd/user/user.go +++ b/src/runtime/cmd/user/user.go @@ -417,6 +417,7 @@ func main() { } if response.Type == messages.CtrlFailed { + log.Println("Received CtrlFailed from osmo-ctrl — task cannot proceed") return } else if response.Type == messages.ExecStart { break diff --git a/src/utils/job/task.py b/src/utils/job/task.py index 00d997b07..d5c5c1337 100644 --- a/src/utils/job/task.py +++ b/src/utils/job/task.py @@ -2756,6 +2756,15 @@ def _build_file_mount(file: File) -> kb_objects.FileMount: ctrl_extra_args, workflow_config.backend_images.client, self.group_uuid, file_mounts, task_spec.downloadType.value, task_spec.resources, user_cache_size) + # Propagate disable_data_validation to the ctrl sidecar so it skips + # osmo dataset check when the server-side config says validation is disabled. + disabled_data = workflow_config.credential_config.disable_data_validation + if disabled_data and ('*' in disabled_data or 's3' in disabled_data): + control_container_spec['env'].append({ + 'name': 'OSMO_SKIP_DATA_AUTH', + 'value': '1', + }) + using_gpu = bool(task_spec.resources.gpu and task_spec.resources.gpu > 0) user_args += [ '-socketPath', f'{kb_objects.DATA_LOCATION}/socket/data.sock',