Skip to content

fix transient job type param#836

Merged
Andyz26 merged 1 commit intomasterfrom
andyz/fixTransientJobType
Mar 24, 2026
Merged

fix transient job type param#836
Andyz26 merged 1 commit intomasterfrom
andyz/fixTransientJobType

Conversation

@Andyz26
Copy link
Copy Markdown
Collaborator

@Andyz26 Andyz26 commented Mar 24, 2026

Context

A Mantis worker with Subscription-Timeout: 30s did not shut down after losing all SSE sink connections. Investigation revealed two chained bugs:

  1. The job's durationType: Transient (from job SLA) is lost during scheduling and hardcoded to Perpetual
  2. Perpetual jobs create an eager subscription that prevents the subscription timeout from ever triggering

Summary

Workers configured with Subscription-Timeout don't shut down after losing SSE sink connections because durationType is hardcoded to Perpetual in the scheduling path, regardless of job SLA configuration.

One Chained Bugs

Bug 1 (scheduling): TaskExecutorAllocationRequest.of() hardcodes MantisJobDurationType.Perpetual (line 42), ignoring the job's actual durationType. When jobs are scheduled through the ExecutorStateManagerActorAssignmentHandlerActor path, the ExecuteStageRequestFactory.of(TaskExecutorRegistration, TaskExecutorAllocationRequest) overload reads this hardcoded value. The TODO on line 34 acknowledges this: //TODO: update job actor to fill these fields.

Scheduling path:

JobActor (durationType=Transient from SLA)
  → ScheduleRequest (durationType=Transient ✓)
    → ResourceClusterAwareSchedulerActor
      → TaskExecutorAllocationRequest.of(...) ← hardcodes Perpetual ✗
        → ExecutorStateManagerActor
          → AssignmentHandlerActor
            → executeStageRequestFactory.of(registration, allocationRequest)
              → ExecuteStageRequest (durationType=Perpetual ✗)
                → Worker runs as Perpetual

Not a Bug: (worker-side consequence):
[This behavior is by design]
For Perpetual jobs, SinkPublisher creates an eager subscription on a .share()'d Observable:

Observable o = Observable.create(...)
    .doOnUnsubscribe(() -> onSinkUnsubscribed())  // never fires
    .share();  // = publish().refCount()

eagerSubscription = o.subscribe();  // permanent subscriber, refCount never reaches 0

When SSE clients disconnect, ObservableTrigger.ssetrigger.doOnStop unsubscribes from o, but the eager subscription keeps refCount >= 1. The doOnUnsubscribe callback (which would notify SubscriptionStateHandlerImpl) never fires. The handler stays in subscribed=true state and never triggers the kill.

Log Evidence

The log shows durationType: Transient in the API but Perpetual at runtime:

  • API: "sla": { "durationType": "Transient", ... }
  • Worker log: "eagerSubscription subscribed for Perpetual job."

Key Files

File Role
TaskExecutorAllocationRequest.java Hardcodes durationType=Perpetual in of() factory
AssignmentHandlerActor.java:109 Uses executeStageRequestFactory.of(registration, allocationRequest) with hardcoded Perpetual
ExecuteStageRequestFactory.java Has two of() overloads; the TaskExecutorAllocationRequest one reads the wrong durationType
SinkPublisher.java:89-113 Eager subscription on .share() prevents doOnUnsubscribe from firing
SubscriptionStateHandlerImpl.java 1-second periodic check for subscription timeout; never transitions to unsubscribed
ObservableTrigger.java:148-153 ssetrigger.doOnStop correctly unsubscribes, but eager sub keeps refCount > 0

Changes

1. TaskExecutorAllocationRequest.java
Path: mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/master/resourcecluster/TaskExecutorAllocationRequest.java

Add durationType parameter to the of() factory method:

2. ResourceClusterAwareSchedulerActor.java
Path: mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java

3. Update tests
Path: mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/AssignmentHandlerActorTest.java

  • Update any TaskExecutorAllocationRequest.of() calls to pass durationType
  • Add assertion verifying durationType propagates to ExecuteStageRequest

Path: mantis-control-plane/mantis-control-plane-server/src/test/java/io/mantisrx/master/resourcecluster/ResourceClusterActorClusterUsageAkkaTest.java

  • Update TaskExecutorAllocationRequest.of() calls

@github-actions
Copy link
Copy Markdown

Test Results

162 files  ±0  162 suites  ±0   10m 18s ⏱️ -31s
777 tests +1  766 ✅ +2  11 💤 ±0  0 ❌  - 1 
777 runs  ±0  766 ✅ +1  11 💤 ±0  0 ❌  - 1 

Results for commit 3e61512. ± Comparison against base commit 5bc54bf.

@Andyz26 Andyz26 merged commit 2a4fb13 into master Mar 24, 2026
3 checks passed
@Andyz26 Andyz26 deleted the andyz/fixTransientJobType branch March 24, 2026 22:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants