Run osmo-ctrl as a native sidecar (restartable init container).

Summary of what the hunks below do:
- backend_listener: scan init_container_statuses as well as container_statuses.
- ctrl.go: drain queued logs on stop, wait for an in-progress upload on
  SIGTERM (every wait is bounded), and honor OSMO_SKIP_DATA_AUTH.
- task.py / objects.py / postgres.py: move osmo-ctrl into initContainers and
  set terminationGracePeriodSeconds to 600.

diff --git a/src/operator/backend_listener.py b/src/operator/backend_listener.py
index aa777e779..795dee22d 100644
--- a/src/operator/backend_listener.py
+++ b/src/operator/backend_listener.py
@@ -367,7 +367,9 @@ def check_running_pod_containers(pod: kubernetes.client.models.v1_pod.V1Pod) ->
     # Add more reasons here for cases when one container terminated and we want the service
     # to clean up the pod
     reasons = ['StartError']
-    container_statuses = pod.status.container_statuses if pod.status.container_statuses else []
+    container_statuses = [
+        *(pod.status.init_container_statuses or []),
+        *(pod.status.container_statuses or [])]
     for container_status in container_statuses:
         state = container_status.state
         if state.terminated:
diff --git a/src/operator/tests/test_listener.py b/src/operator/tests/test_listener.py
index 3fc036a32..ecea2dfd0 100644
--- a/src/operator/tests/test_listener.py
+++ b/src/operator/tests/test_listener.py
@@ -188,6 +188,55 @@ def test_user_error_but_ctrl_running_results_in_running(self):
         status, _, _ = backend_listener.calculate_pod_status(pod_event)
         self.assertEqual(status, task.TaskGroupStatus.RUNNING)
 
+    def create_native_sidecar_ctrl_terminated_pod(self):
+        """
+        Create a pod where osmo-ctrl is a native sidecar (in init_container_statuses)
+        that has terminated with an error, while user container is still running.
+        """
+        ctrl_init_status = V1ContainerStatus(
+            name='osmo-ctrl',
+            image='osmo-ctrl-image',
+            image_id='osmo-ctrl-imageid',
+            state=V1ContainerState(
+                terminated=V1ContainerStateTerminated(reason='Error', exit_code=2)),
+            ready=False,
+            restart_count=0
+        )
+
+        user_container_status = V1ContainerStatus(
+            name='user',
+            image='nginx',
+            image_id='nginx_id',
+            state=V1ContainerState(
+                running=V1ContainerStateRunning(started_at='2024-02-27T12:34:56Z')),
+            ready=True,
+            restart_count=0
+        )
+
+        status = V1PodStatus(
+            phase='Running',
+            init_container_statuses=[ctrl_init_status],
+            container_statuses=[user_container_status]
+        )
+
+        pod = V1Pod(
+            api_version='v1',
+            kind='Pod',
+            metadata=V1ObjectMeta(name='my-mock-pod'),
+            spec=self.create_spec(),
+            status=status
+        )
+        return pod
+
+    def test_native_sidecar_ctrl_error_detected(self):
+        """
+        Test that osmo-ctrl termination is detected when it runs as a native sidecar
+        (appears in init_container_statuses instead of container_statuses).
+        """
+        pod_event = self.create_native_sidecar_ctrl_terminated_pod()
+        error_info = backend_listener.check_running_pod_containers(pod_event)
+        self.assertNotEqual(error_info.error_message, '')
+
 
 class TestNodeAvailability(unittest.TestCase):
     def setUp(self):
diff --git a/src/runtime/cmd/ctrl/ctrl.go b/src/runtime/cmd/ctrl/ctrl.go
index b0bc529ce..4f0c68144 100644
--- a/src/runtime/cmd/ctrl/ctrl.go
+++ b/src/runtime/cmd/ctrl/ctrl.go
@@ -71,6 +71,10 @@ var barrierReq string
 
 var rsyncStatus rsync.RsyncStatus
 
+// Upload drain: allows SIGTERM handler to wait for in-progress uploads to complete
+var uploading atomic.Bool
+var uploadDone = make(chan struct{})
+
 type PortForwardType string
 
 const (
@@ -867,8 +871,25 @@ func sendLogs(logSource string, logQueue *common.CircularBuffer, logsPeriodMs in
 	for {
 		select {
 		case <-stopChan:
+			// Drain remaining logs before exiting
+			for {
+				bufferMutex.Lock()
+				logJson, err := logQueue.Peek()
+				if err != nil {
+					bufferMutex.Unlock()
+					break
+				}
+				err = messages.Put(webConn, logJson)
+				if err != nil {
+					log.Println("Failed to send log during drain:", err)
+					bufferMutex.Unlock()
+					break
+				}
+				logQueue.Pop()
+				bufferMutex.Unlock()
+			}
 			defer waitGoRoutines.Done()
-			log.Println("Goroutine sendLogs is done")
+			log.Println("Goroutine sendLogs is done, queue drained")
 			return
 		case <-ticker.C:
 			if data.WebsocketConnection.IsBroken {
@@ -1397,12 +1418,50 @@ func main() {
 	signal.Notify(sigintCatch, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
 	go func() {
 		<-sigintCatch
+		log.Println("SIGTERM received, starting graceful shutdown...")
+		if uploading.Load() {
+			log.Println("Upload in progress, waiting for completion...")
+			select {
+			case <-uploadDone:
+				log.Println("Upload completed after SIGTERM")
+			case <-time.After(9 * time.Minute):
+				log.Println("Upload drain timeout exceeded")
+			}
+		}
+		// Flush log channels before exiting — os.Exit bypasses defers.
+		// All steps share one 30s deadline: a log goroutine that already returned
+		// would otherwise block these sends until SIGKILL. 9 min of upload drain
+		// plus 30s stays inside the 600s terminationGracePeriodSeconds.
+		flushExpired := make(chan struct{})
+		time.AfterFunc(30*time.Second, func() { close(flushExpired) })
+		select {
+		case stopPutLogs <- true:
+		case <-flushExpired:
+		}
+		select {
+		case stopSendLogs <- true:
+		case <-flushExpired:
+		}
+		logsFlushed := make(chan struct{})
+		go func() {
+			waitGoRoutines.Wait()
+			close(logsFlushed)
+		}()
+		select {
+		case <-logsFlushed:
+		case <-flushExpired:
+			log.Println("Log flush timeout exceeded")
+		}
 		cleanupMounts(cmdArgs.DownloadType)
-		os.Exit(1)
+		os.Exit(0)
 	}()
 
-	// Validate data auth access before starting downloads/uploads
-	if err := data.ValidateInputsOutputsAccess(
+	// Validate data auth access before starting downloads/uploads.
+	// Honor OSMO_SKIP_DATA_AUTH set by the service when
+	// credential_config.disable_data_validation includes "*" or "s3".
+	if os.Getenv("OSMO_SKIP_DATA_AUTH") == "1" {
+		osmoChan <- "Data validation skipped (OSMO_SKIP_DATA_AUTH=1)"
+	} else if err := data.ValidateInputsOutputsAccess(
 		cmdArgs.Inputs,
 		cmdArgs.Outputs,
 		cmdArgs.UserConfig,
@@ -1482,9 +1541,12 @@ execLogs:
 
 	// Send files to be uploaded
 	outputStartTime := time.Now().Format("2006-01-02 15:04:05.000")
+	uploading.Store(true)
 	uploadOutputs(unixConn, cmdArgs.Outputs, cmdArgs.OutputPath, cmdArgs.MetadataFile,
 		uploadChan, metricChan, cmdArgs.RetryId, cmdArgs.GroupName, cmdArgs.LogSource,
 		cmdArgs.UserConfig, cmdArgs.ServiceConfig, cmdArgs.ConfigLoc)
+	uploading.Store(false)
+	close(uploadDone)
 	outputEndTime := time.Now().Format("2006-01-02 15:04:05.000")
 	uploadTimes := metrics.GroupMetrics{
 		RetryId: cmdArgs.RetryId,
@@ -1493,6 +1555,24 @@ execLogs:
 		MetricType: "output_upload"}
 	metricChan <- uploadTimes
 
+	// Flush all remaining user logs directly before sending LogDone.
+	// sendLogs runs on a ticker and may not have drained the queue yet.
+	// We must flush here while the websocket is still open — after LogDone,
+	// the logger closes the connection and any remaining logs are lost.
+	bufferMutex.Lock()
+	for !logQueue.IsEmpty() {
+		logJson, err := logQueue.Peek()
+		if err != nil {
+			break
+		}
+		if err := messages.Put(webConn, logJson); err != nil {
+			log.Printf("Failed to flush log before LogDone: %v", err)
+			break
+		}
+		logQueue.Pop()
+	}
+	bufferMutex.Unlock()
+
 	logMsg := messages.CreateLog(cmdArgs.LogSource, "", messages.LogDone)
 	for !logsFinished {
 		threadsafeEnqueue(logQueue, logMsg)
diff --git a/src/runtime/cmd/user/user.go b/src/runtime/cmd/user/user.go
index 8290ad973..901b5c511 100644
--- a/src/runtime/cmd/user/user.go
+++ b/src/runtime/cmd/user/user.go
@@ -417,6 +417,7 @@ func main() {
 		}
 
 		if response.Type == messages.CtrlFailed {
+			log.Println("Received CtrlFailed from osmo-ctrl — task cannot proceed")
 			return
 		} else if response.Type == messages.ExecStart {
 			break
diff --git a/src/service/core/config/objects.py b/src/service/core/config/objects.py
index 989036c7c..decb4ee40 100644
--- a/src/service/core/config/objects.py
+++ b/src/service/core/config/objects.py
@@ -29,23 +29,7 @@
 DEFAULT_POD_TEMPLATES : dict[str, dict] = {
     'default_ctrl': {
         'spec': {
-            'containers': [
-                {
-                    'name': 'osmo-ctrl',
-                    'resources': {
-                        'limits': {
-                            'cpu': '{{USER_CPU}}',
-                            'memory': '{{USER_MEMORY}}',
-                            'ephemeral-storage': '{{USER_STORAGE}}'
-                        },
-                        'requests': {
-                            'cpu': '1',
-                            'memory': '1Gi',
-                            'ephemeral-storage': '1Gi'
-                        }
-                    }
-                }
-            ]
+            'terminationGracePeriodSeconds': 600,
         }
     },
     'default_user': {
diff --git a/src/utils/connectors/postgres.py b/src/utils/connectors/postgres.py
index 0f7f7243c..52dc097dd 100644
--- a/src/utils/connectors/postgres.py
+++ b/src/utils/connectors/postgres.py
@@ -2110,13 +2110,14 @@ def construct_updated_allocatables(
 
     def check_osmo_data_resource(pod_template: Dict) -> ResourceLimitations:
         resource_limits = ResourceLimitations()
-        containers = pod_template.get('spec', {}).get('containers', [])
-        if containers:
-            for container in containers:
-                if container.get('name', '') == 'osmo-ctrl':
-                    if 'resources' in container:
-                        resource_limits = ResourceLimitations(**container['resources'])
-                    break
+        spec = pod_template.get('spec') or {}
+        containers = list(spec.get('containers', []) or [])
+        containers.extend(spec.get('initContainers', []) or [])
+        for container in containers:
+            if container.get('name', '') == 'osmo-ctrl':
+                if 'resources' in container:
+                    resource_limits = ResourceLimitations(**container['resources'])
+                break
         return resource_limits
 
     ctrl_usage = {}
diff --git a/src/utils/job/task.py b/src/utils/job/task.py
index 43b94c9d4..d5c5c1337 100644
--- a/src/utils/job/task.py
+++ b/src/utils/job/task.py
@@ -2756,6 +2756,15 @@ def _build_file_mount(file: File) -> kb_objects.FileMount:
             ctrl_extra_args, workflow_config.backend_images.client, self.group_uuid, file_mounts,
             task_spec.downloadType.value, task_spec.resources, user_cache_size)
 
+        # Propagate disable_data_validation to the ctrl sidecar so it skips
+        # osmo dataset check when the server-side config says validation is disabled.
+        disabled_data = workflow_config.credential_config.disable_data_validation
+        if disabled_data and ('*' in disabled_data or 's3' in disabled_data):
+            control_container_spec.setdefault('env', []).append({
+                'name': 'OSMO_SKIP_DATA_AUTH',
+                'value': '1',
+            })
+
         using_gpu = bool(task_spec.resources.gpu and task_spec.resources.gpu > 0)
         user_args += [
             '-socketPath', f'{kb_objects.DATA_LOCATION}/socket/data.sock',
@@ -2805,14 +2814,16 @@ def _build_file_mount(file: File) -> kb_objects.FileMount:
                 'restartPolicy': 'Never',
                 'imagePullSecrets': image_pull_secrets,
                 'hostNetwork': task_spec.hostNetwork,
-                'containers': [user_container_spec, control_container_spec],
+                'containers': [user_container_spec],
                 'initContainers': [
                     k8s_factory.create_init_container(
                         login_file_mount.volume_mount(),
                         user_config_file_mount.volume_mount(),
                         init_extra_args,
                     ),
+                    {**control_container_spec, 'restartPolicy': 'Always'},
                 ],
+                'terminationGracePeriodSeconds': 600,
                 'volumes': [
                     {'name': 'osmo'},
                     {'name': 'osmo-data'},