Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,22 @@ public class TaskExecutorAllocationRequest {
JobMetadata jobMetadata;
int stageNum;
long readyAt;
MantisJobDurationType durationType; //TODO: update job actor to fill these fields.
MantisJobDurationType durationType;

// Static factory method with defaults for backward compatibility
public static TaskExecutorAllocationRequest of(
WorkerId workerId,
SchedulingConstraints constraints,
JobMetadata jobMetadata,
int stageNum,
MantisJobDurationType durationType) {
return new TaskExecutorAllocationRequest(workerId, constraints, jobMetadata, stageNum, -1L, durationType);
}

/**
* @deprecated Use {@link #of(WorkerId, SchedulingConstraints, JobMetadata, int, MantisJobDurationType)} instead.
* This overload hardcodes durationType to Perpetual, which causes subscription timeout to not fire for Transient jobs.
*/
@Deprecated
public static TaskExecutorAllocationRequest of(
WorkerId workerId,
SchedulingConstraints constraints,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private void onScheduleRequestEvent(ScheduleRequestEvent event) {
resourceCluster
.getTaskExecutorsFor(
Collections.singleton(TaskExecutorAllocationRequest.of(
event.getRequest().getWorkerId(), event.getRequest().getSchedulingConstraints(), event.getRequest().getJobMetadata(), event.getRequest().getStageNum())))
event.getRequest().getWorkerId(), event.getRequest().getSchedulingConstraints(), event.getRequest().getJobMetadata(), event.getRequest().getStageNum(), event.getRequest().getDurationType())))
.<Object>thenApply(allocation -> event.onAssignment(allocation.values().stream().findFirst().get()))
.exceptionally(event::onFailure);

Expand Down Expand Up @@ -404,7 +404,7 @@ static class BatchScheduleRequestEvent {
this.allocationRequestScheduleRequestMap = request
.getScheduleRequests()
.stream()
.map(req -> Pair.of(req, TaskExecutorAllocationRequest.of(req.getWorkerId(), req.getSchedulingConstraints(), req.getJobMetadata(), req.getStageNum())))
.map(req -> Pair.of(req, TaskExecutorAllocationRequest.of(req.getWorkerId(), req.getSchedulingConstraints(), req.getJobMetadata(), req.getStageNum(), req.getDurationType())))
.collect(Collectors.toMap(Pair::getRight, Pair::getLeft));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.mantisrx.master.resourcecluster.AssignmentHandlerActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.domain.WorkerId;
import io.mantisrx.server.core.scheduler.SchedulingConstraints;
Expand All @@ -52,6 +53,7 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class AssignmentHandlerActorTest {
private static ActorSystem actorSystem;
Expand Down Expand Up @@ -101,7 +103,8 @@ private TaskExecutorAllocationRequest createAllocationRequest() {
workerId,
SchedulingConstraints.of(new MachineDefinition(1.0, 1024, 1024, 1024, 1)),
null,
0
0,
MantisJobDurationType.Transient
);
}

Expand Down Expand Up @@ -134,6 +137,41 @@ public void testAssignmentSuccess() {
verify(taskExecutorGateway, times(1)).submitTask(any());
}

@Test
public void testDurationTypePropagatedToExecuteStageRequest() {
TestKit probe = new TestKit(actorSystem);
Props props = AssignmentHandlerActor.props(
clusterID,
jobMessageRouter,
Duration.ofSeconds(1),
executeStageRequestFactory
);
ActorRef parent = actorSystem.actorOf(Props.create(ForwarderParent.class, props, probe.getRef(), taskExecutorGateway));
ActorRef actor = probe.expectMsgClass(ActorRef.class);

when(taskExecutorGateway.submitTask(any())).thenReturn(CompletableFuture.completedFuture(Ack.getInstance()));

TaskExecutorAllocationRequest allocationRequest = createAllocationRequest();
assertEquals(MantisJobDurationType.Transient, allocationRequest.getDurationType());

TaskExecutorAssignmentRequest request = TaskExecutorAssignmentRequest.of(
allocationRequest,
taskExecutorID,
createRegistration(),
CompletableFuture.completedFuture(taskExecutorGateway)
);

actor.tell(request, probe.getRef());
probe.expectNoMessage(Duration.ofMillis(200));

// Verify that executeStageRequestFactory.of() was called with the allocation request
// that has the correct durationType (Transient, not Perpetual)
ArgumentCaptor<TaskExecutorAllocationRequest> allocationCaptor =
ArgumentCaptor.forClass(TaskExecutorAllocationRequest.class);
verify(executeStageRequestFactory).of(any(TaskExecutorRegistration.class), allocationCaptor.capture());
assertEquals(MantisJobDurationType.Transient, allocationCaptor.getValue().getDurationType());
}

@Test
public void testAssignmentRetryAndSuccess() {
TestKit probe = new TestKit(actorSystem);
Expand Down
Loading