Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/operator/backend_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ def check_running_pod_containers(pod: kubernetes.client.models.v1_pod.V1Pod) ->
# Add more reasons here for cases when one container terminated and we want the service
# to clean up the pod
reasons = ['StartError']
container_statuses = pod.status.container_statuses if pod.status.container_statuses else []
container_statuses = list(itertools.chain(
pod.status.init_container_statuses or [],
pod.status.container_statuses or []))
for container_status in container_statuses:
state = container_status.state
if state.terminated:
Expand Down
49 changes: 49 additions & 0 deletions src/operator/tests/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,55 @@ def test_user_error_but_ctrl_running_results_in_running(self):
status, _, _ = backend_listener.calculate_pod_status(pod_event)
self.assertEqual(status, task.TaskGroupStatus.RUNNING)

def create_native_sidecar_ctrl_terminated_pod(self):
    """
    Build a mock pod in which osmo-ctrl runs as a native sidecar.

    The osmo-ctrl status is listed under init_container_statuses and is
    terminated (reason 'Error', exit code 2), while the user container is
    listed under container_statuses and is still running.
    """
    # osmo-ctrl: reported with the init containers, already terminated.
    ctrl_state = V1ContainerState(
        terminated=V1ContainerStateTerminated(reason='Error', exit_code=2))
    sidecar_status = V1ContainerStatus(
        name='osmo-ctrl',
        image='osmo-ctrl-image',
        image_id='osmo-ctrl-imageid',
        state=ctrl_state,
        ready=False,
        restart_count=0,
    )

    # User container: reported with the regular containers, still running.
    user_state = V1ContainerState(
        running=V1ContainerStateRunning(started_at='2024-02-27T12:34:56Z'))
    workload_status = V1ContainerStatus(
        name='user',
        image='nginx',
        image_id='nginx_id',
        state=user_state,
        ready=True,
        restart_count=0,
    )

    pod_status = V1PodStatus(
        phase='Running',
        init_container_statuses=[sidecar_status],
        container_statuses=[workload_status],
    )
    return V1Pod(
        api_version='v1',
        kind='Pod',
        metadata=V1ObjectMeta(name='my-mock-pod'),
        spec=self.create_spec(),
        status=pod_status,
    )

def test_native_sidecar_ctrl_error_detected(self):
    """
    A terminated osmo-ctrl that is listed under init_container_statuses
    (native sidecar) rather than container_statuses must produce a
    non-empty error message from check_running_pod_containers.
    """
    sidecar_pod = self.create_native_sidecar_ctrl_terminated_pod()
    result = backend_listener.check_running_pod_containers(sidecar_pod)
    self.assertNotEqual(result.error_message, '')


class TestNodeAvailability(unittest.TestCase):
def setUp(self):
Expand Down
68 changes: 64 additions & 4 deletions src/runtime/cmd/ctrl/ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ var barrierReq string

var rsyncStatus rsync.RsyncStatus

// Upload drain: allows SIGTERM handler to wait for in-progress uploads to complete
var uploading atomic.Bool
var uploadDone = make(chan struct{})

type PortForwardType string

const (
Expand Down Expand Up @@ -867,8 +871,25 @@ func sendLogs(logSource string, logQueue *common.CircularBuffer, logsPeriodMs in
for {
select {
case <-stopChan:
// Drain remaining logs before exiting
for {
bufferMutex.Lock()
logJson, err := logQueue.Peek()
if err != nil {
bufferMutex.Unlock()
break
}
err = messages.Put(webConn, logJson)
if err != nil {
log.Println("Failed to send log during drain:", err)
bufferMutex.Unlock()
break
}
logQueue.Pop()
bufferMutex.Unlock()
}
defer waitGoRoutines.Done()
log.Println("Goroutine sendLogs is done")
log.Println("Goroutine sendLogs is done, queue drained")
return
case <-ticker.C:
if data.WebsocketConnection.IsBroken {
Expand Down Expand Up @@ -1397,12 +1418,30 @@ func main() {
signal.Notify(sigintCatch, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigintCatch
log.Println("SIGTERM received, starting graceful shutdown...")
if uploading.Load() {
log.Println("Upload in progress, waiting for completion...")
select {
case <-uploadDone:
log.Println("Upload completed after SIGTERM")
case <-time.After(9 * time.Minute):
log.Println("Upload drain timeout exceeded")
}
}
// Flush log channels before exiting — os.Exit bypasses defers
stopPutLogs <- true
stopSendLogs <- true
waitGoRoutines.Wait()
cleanupMounts(cmdArgs.DownloadType)
os.Exit(1)
os.Exit(0)
}()

// Validate data auth access before starting downloads/uploads
if err := data.ValidateInputsOutputsAccess(
// Validate data auth access before starting downloads/uploads.
// Honor OSMO_SKIP_DATA_AUTH set by the service when
// credential_config.disable_data_validation includes "*" or "s3".
if os.Getenv("OSMO_SKIP_DATA_AUTH") == "1" {
osmoChan <- "Data validation skipped (OSMO_SKIP_DATA_AUTH=1)"
} else if err := data.ValidateInputsOutputsAccess(
cmdArgs.Inputs,
cmdArgs.Outputs,
cmdArgs.UserConfig,
Expand Down Expand Up @@ -1482,9 +1521,12 @@ execLogs:

// Send files to be uploaded
outputStartTime := time.Now().Format("2006-01-02 15:04:05.000")
uploading.Store(true)
uploadOutputs(unixConn, cmdArgs.Outputs, cmdArgs.OutputPath, cmdArgs.MetadataFile,
uploadChan, metricChan, cmdArgs.RetryId, cmdArgs.GroupName, cmdArgs.LogSource,
cmdArgs.UserConfig, cmdArgs.ServiceConfig, cmdArgs.ConfigLoc)
uploading.Store(false)
close(uploadDone)
Comment on lines +1524 to +1529

@coderabbitai coderabbitai Bot Mar 31, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, check file size and structure
wc -l src/runtime/cmd/ctrl/ctrl.go

Repository: NVIDIA/OSMO

Length of output: 88


🏁 Script executed:

# Read the declaration area (lines 74-76)
sed -n '70,85p' src/runtime/cmd/ctrl/ctrl.go

Repository: NVIDIA/OSMO

Length of output: 402


🏁 Script executed:

# Read the signal handler area (lines 1404-1412)
sed -n '1400,1420p' src/runtime/cmd/ctrl/ctrl.go

Repository: NVIDIA/OSMO

Length of output: 716


🏁 Script executed:

# Read the upload call area (lines 1498-1503 and surrounding context)
sed -n '1490,1510p' src/runtime/cmd/ctrl/ctrl.go

Repository: NVIDIA/OSMO

Length of output: 846


🏁 Script executed:

# Find the execLogs loop to understand when it exits
rg -n "execLogs" src/runtime/cmd/ctrl/ctrl.go -A 5 -B 5

Repository: NVIDIA/OSMO

Length of output: 922


🏁 Script executed:

# Search for tests of SIGTERM/signal handling
find . -name "*test*.go" -o -name "*_test.go" | xargs grep -l "SIGTERM\|sigintCatch\|uploading" 2>/dev/null | head -20

Repository: NVIDIA/OSMO

Length of output: 37


🏁 Script executed:

# Check if there are any other references to uploading flag
rg -n "uploading\." src/runtime/cmd/ctrl/ctrl.go

Repository: NVIDIA/OSMO

Length of output: 140


🏁 Script executed:

# Check the uploadOutputs function signature and blocking behavior
rg -n "func uploadOutputs" src/runtime/cmd/ctrl/ctrl.go -A 10

Repository: NVIDIA/OSMO

Length of output: 492


🏁 Script executed:

# Check if there's any initialization or prior setting of uploading before the loop
sed -n '1400,1500p' src/runtime/cmd/ctrl/ctrl.go | head -100

Repository: NVIDIA/OSMO

Length of output: 3345


The `uploading` flag must be set before SIGTERM can arrive; otherwise the signal handler bypasses upload draining.

There is a genuine race condition at lines 1498–1502. The uploading flag is set only after the execLogs loop exits and before uploadOutputs() runs. In that window, SIGTERM can arrive, the signal handler checks uploading.Load() (which is still false), and exits immediately via cleanupMounts() and os.Exit(1) without waiting for uploads—causing the subsequent uploadOutputs() call never to execute.

In the native-sidecar lifecycle, the kubelet sends SIGTERM when the user container exits, which occurs precisely during this race window. The result is data loss: task outputs are skipped.

Set the uploading flag to true before entering the exec phase (or immediately before the signal handler setup if uploads can happen during exec), and clear it only after uploadOutputs() completes. Add a test that delivers SIGTERM right after an ExecFinished message to confirm the fix prevents the race.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/runtime/cmd/ctrl/ctrl.go` around lines 1498 - 1503, The uploading flag is
set too late causing a race where SIGTERM can bypass uploads; move the
uploading.Store(true) to before the exec phase (i.e., set it before entering the
execLogs/ExecFinished handling or immediately before the signal handler is
registered) and only call uploading.Store(false) after uploadOutputs(...)
completes; ensure the signal handler continues to check uploading.Load() so it
will wait for uploadOutputs to finish instead of calling
cleanupMounts()/os.Exit(1); add a unit/integration test that sends SIGTERM
immediately after an ExecFinished message to verify uploads complete and no data
is skipped.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We investigated this and believe the race is not practically exploitable in the native sidecar model.

With native sidecars (KEP-753), kubelet sends SIGTERM to the sidecar only after the user container has fully stopped. But ExecFinished arrives via the Unix socket before the user container process exits (it is the last message sent by the user process). So the execLogs loop exits and uploading.Store(true) is set well before kubelet initiates sidecar termination — osmo-ctrl gets a deterministic head start.

The window between break execLogs and uploading.Store(true) is a few Go instructions (nanoseconds), while kubelet's container stop → SIGTERM delivery path involves API calls and process teardown (milliseconds to seconds).

We opted not to move uploading.Store(true) earlier because setting it before the exec phase would cause the SIGTERM handler to wait for uploads even when the process is in the download or exec phase (where no upload is in progress and waiting would be incorrect).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KeitaW, thank you for the thorough explanation — the reasoning is sound.

The key insight is the lifecycle ordering guarantee from KEP-753: kubelet only delivers SIGTERM to the native sidecar after the user container's process has fully exited, and ExecFinished is sent over the Unix socket before that exit. The gap between break execLogs and uploading.Store(true) is therefore bounded by a few Go instructions, while the kubelet's container-stop-to-SIGTERM path (API call, cgroup accounting, etc.) takes orders of magnitude longer. Under that model the window is not realistically reachable.

The deliberate choice to keep uploading.Store(true) as late as possible also makes sense — hoisting it earlier would cause the drain logic to block on SIGTERM during the download or exec phase, where no upload is occurring and a wait of up to nine minutes (the drain timeout) would be incorrect.

I'll withdraw the concern for this design and note the intent for future reviews.


✏️ Learnings added
Learnt from: KeitaW
Repo: NVIDIA/OSMO PR: 765
File: src/runtime/cmd/ctrl/ctrl.go:1500-1505
Timestamp: 2026-03-31T17:46:05.985Z
Learning: In `src/runtime/cmd/ctrl/ctrl.go`, `uploading.Store(true)` is deliberately set immediately before `uploadOutputs()` (not before the exec phase). Under the Kubernetes native sidecar model (KEP-753, K8s >= 1.29), kubelet sends SIGTERM to osmo-ctrl only after the user container fully exits, which happens after `ExecFinished` is received and `uploading.Store(true)` is already set — so the apparent SIGTERM race window is practically unexploitable. Moving the flag earlier would incorrectly trigger upload-drain waiting during download or exec phases.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

outputEndTime := time.Now().Format("2006-01-02 15:04:05.000")
uploadTimes := metrics.GroupMetrics{
RetryId: cmdArgs.RetryId,
Expand All @@ -1493,6 +1535,24 @@ execLogs:
MetricType: "output_upload"}
metricChan <- uploadTimes

// Flush all remaining user logs directly before sending LogDone.
// sendLogs runs on a ticker and may not have drained the queue yet.
// We must flush here while the websocket is still open — after LogDone,
// the logger closes the connection and any remaining logs are lost.
bufferMutex.Lock()
for !logQueue.IsEmpty() {
logJson, err := logQueue.Peek()
if err != nil {
break
}
if err := messages.Put(webConn, logJson); err != nil {
log.Printf("Failed to flush log before LogDone: %v", err)
break
}
logQueue.Pop()
}
bufferMutex.Unlock()

logMsg := messages.CreateLog(cmdArgs.LogSource, "", messages.LogDone)
for !logsFinished {
threadsafeEnqueue(logQueue, logMsg)
Expand Down
1 change: 1 addition & 0 deletions src/runtime/cmd/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func main() {
}

if response.Type == messages.CtrlFailed {
log.Println("Received CtrlFailed from osmo-ctrl — task cannot proceed")
return
} else if response.Type == messages.ExecStart {
break
Expand Down
18 changes: 1 addition & 17 deletions src/service/core/config/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,7 @@
DEFAULT_POD_TEMPLATES : dict[str, dict] = {
'default_ctrl': {
'spec': {
'containers': [
{
'name': 'osmo-ctrl',
'resources': {
'limits': {
'cpu': '{{USER_CPU}}',
'memory': '{{USER_MEMORY}}',
'ephemeral-storage': '{{USER_STORAGE}}'
},
'requests': {
'cpu': '1',
'memory': '1Gi',
'ephemeral-storage': '1Gi'
}
}
}
]
'terminationGracePeriodSeconds': 600,
}
},
'default_user': {
Expand Down
15 changes: 8 additions & 7 deletions src/utils/connectors/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -2110,13 +2110,14 @@ def construct_updated_allocatables(

def check_osmo_data_resource(pod_template: Dict) -> ResourceLimitations:
    """
    Return the resource limitations declared for the osmo-ctrl container.

    osmo-ctrl is looked up by name in ``spec.containers`` first and then in
    ``spec.initContainers``, in a single pass over both lists.

    Args:
        pod_template: Pod template dict. ``spec`` and either container list
            may be missing or None.

    Returns:
        A ResourceLimitations built from the ``resources`` of the first
        container named 'osmo-ctrl', or a default ResourceLimitations() when
        no such container exists or it declares no resources.
    """
    # `or {}` / `or []` also cover keys that are present but explicitly None.
    spec = pod_template.get('spec') or {}
    containers = [
        *(spec.get('containers') or []),
        *(spec.get('initContainers') or []),
    ]
    for container in containers:
        if container.get('name', '') == 'osmo-ctrl':
            if 'resources' in container:
                return ResourceLimitations(**container['resources'])
            # Only the first osmo-ctrl entry is considered, even when it
            # declares no resources.
            break
    return ResourceLimitations()

ctrl_usage = {}
Expand Down
13 changes: 12 additions & 1 deletion src/utils/job/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,15 @@ def _build_file_mount(file: File) -> kb_objects.FileMount:
ctrl_extra_args, workflow_config.backend_images.client, self.group_uuid, file_mounts,
task_spec.downloadType.value, task_spec.resources, user_cache_size)

# Propagate disable_data_validation to the ctrl sidecar so that it skips its
# data auth access validation when the server-side config disables validation
# for '*' or 's3'.
disabled_data = workflow_config.credential_config.disable_data_validation
if disabled_data and ('*' in disabled_data or 's3' in disabled_data):
control_container_spec['env'].append({
'name': 'OSMO_SKIP_DATA_AUTH',
'value': '1',
})
Comment on lines +2759 to +2766

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't collapse disable_data_validation into a global skip flag.

Line 2762 turns a backend-scoped setting into an all-or-nothing env toggle. That gives you two bad cases: ['gs'] never propagates to osmo-ctrl, and ['s3'] disables validation for every input/output in ctrl.go, not just the S3 ones. Please pass the configured schemes through and let osmo-ctrl apply the skip per input/output instead of reducing the list to a single boolean.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/job/task.py` around lines 2759 - 2766, The current code collapses
workflow_config.credential_config.disable_data_validation into a global boolean
and sets OSMO_SKIP_DATA_AUTH, which loses per-scheme info; instead, serialize
the actual disable_data_validation list into an env var (e.g.,
OSMO_SKIP_DATA_SCHEMES) on control_container_spec['env'] when
disable_data_validation is present (preserving '*' if supplied) so osmo-ctrl can
decide per input/output; update osmo-ctrl/ctrl.go to read that env var
(comma-separated or wildcard) and apply skipping per-scheme rather than relying
on a single boolean flag.


using_gpu = bool(task_spec.resources.gpu and task_spec.resources.gpu > 0)
user_args += [
'-socketPath', f'{kb_objects.DATA_LOCATION}/socket/data.sock',
Expand Down Expand Up @@ -2805,14 +2814,16 @@ def _build_file_mount(file: File) -> kb_objects.FileMount:
'restartPolicy': 'Never',
'imagePullSecrets': image_pull_secrets,
'hostNetwork': task_spec.hostNetwork,
'containers': [user_container_spec, control_container_spec],
'containers': [user_container_spec],
'initContainers': [
k8s_factory.create_init_container(
login_file_mount.volume_mount(),
user_config_file_mount.volume_mount(),
init_extra_args,
),
{**control_container_spec, 'restartPolicy': 'Always'},
],
Comment thread
coderabbitai[bot] marked this conversation as resolved.
'terminationGracePeriodSeconds': 600,
'volumes': [
{'name': 'osmo'},
{'name': 'osmo-data'},
Expand Down