Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:

- name: Start containerized server and dependencies
env:
TEMPORAL_CLI_VERSION: 1.4.1-cloud-v1-29-0-139-2.0
TEMPORAL_CLI_VERSION: 1.6.1-server-1.31.0-150.0
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand All @@ -94,21 +94,13 @@ jobs:
--search-attribute CustomBoolField=Bool \
--dynamic-config-value system.forceSearchAttributesCacheRefreshOnRead=true \
--dynamic-config-value system.enableActivityEagerExecution=true \
--dynamic-config-value system.enableEagerWorkflowStart=true \
--dynamic-config-value system.enableExecuteMultiOperation=true \
--dynamic-config-value frontend.enableUpdateWorkflowExecutionAsyncAccepted=true \
--dynamic-config-value history.MaxBufferedQueryCount=100000 \
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
--dynamic-config-value worker.buildIdScavengerEnabled=true \
--dynamic-config-value frontend.workerVersioningRuleAPIs=true \
--dynamic-config-value worker.removableBuildIdDurationSinceDefault=true \
--dynamic-config-value matching.useNewMatcher=true \
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
--dynamic-config-value component.nexusoperations.recordCancelRequestCompletionEvents=true \
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
--dynamic-config-value frontend.activityAPIsEnabled=true \
--dynamic-config-value system.enableDeploymentVersions=true \
--dynamic-config-value history.enableRequestIdRefLinks=true &
sleep 10s

Expand Down Expand Up @@ -199,4 +191,4 @@ jobs:
name: Build native test server
uses: ./.github/workflows/build-native-image.yml
with:
ref: ${{ github.event.pull_request.head.sha }}
ref: ${{ github.event.pull_request.head.sha }}
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
input.getHeaders().forEach((k, v) -> attributes.putNexusHeader(k.toLowerCase(), v));
attributes.setScheduleToCloseTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));
attributes.setScheduleToStartTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToStartTimeout()));
attributes.setStartToCloseTimeout(
ProtobufTimeUtils.toProtoDuration(input.getOptions().getStartToCloseTimeout()));

@Nullable
UserMetadata userMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public static NexusOperationOptions getDefaultInstance() {

public static final class Builder {
private Duration scheduleToCloseTimeout;
private Duration scheduleToStartTimeout;
private Duration startToCloseTimeout;
private NexusOperationCancellationType cancellationType;
private String summary;

Expand All @@ -46,6 +48,45 @@ public NexusOperationOptions.Builder setScheduleToCloseTimeout(
return this;
}

/**
* Sets the schedule to start timeout for the Nexus operation.
*
* <p>Maximum time to wait for the operation to be started (or completed if synchronous) by the
* handler. If the operation is not started within this timeout, it will fail with
* TIMEOUT_TYPE_SCHEDULE_TO_START.
*
* <p>Requires Temporal Server 1.31.0 or later.
*
* @param scheduleToStartTimeout the schedule to start timeout for the Nexus operation
* @return this
*/
@Experimental
public NexusOperationOptions.Builder setScheduleToStartTimeout(
Duration scheduleToStartTimeout) {
this.scheduleToStartTimeout = scheduleToStartTimeout;
return this;
}

/**
* Sets the start to close timeout for the Nexus operation.
*
* <p>Maximum time to wait for an asynchronous operation to complete after it has been started.
* If the operation does not complete within this timeout after starting, it will fail with
* TIMEOUT_TYPE_START_TO_CLOSE.
*
* <p>Only applies to asynchronous operations. Synchronous operations ignore this timeout.
*
* <p>Requires Temporal Server 1.31.0 or later.
*
* @param startToCloseTimeout the start to close timeout for the Nexus operation
* @return this
*/
@Experimental
public NexusOperationOptions.Builder setStartToCloseTimeout(Duration startToCloseTimeout) {
this.startToCloseTimeout = startToCloseTimeout;
return this;
}

/**
* Sets the cancellation type for the Nexus operation. Defaults to WAIT_COMPLETED.
*
Expand Down Expand Up @@ -78,12 +119,19 @@ private Builder(NexusOperationOptions options) {
return;
}
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
this.scheduleToStartTimeout = options.getScheduleToStartTimeout();
this.startToCloseTimeout = options.getStartToCloseTimeout();
this.cancellationType = options.getCancellationType();
this.summary = options.getSummary();
}

public NexusOperationOptions build() {
return new NexusOperationOptions(scheduleToCloseTimeout, cancellationType, summary);
return new NexusOperationOptions(
scheduleToCloseTimeout,
scheduleToStartTimeout,
startToCloseTimeout,
cancellationType,
summary);
}

public NexusOperationOptions.Builder mergeNexusOperationOptions(
Expand All @@ -95,6 +143,14 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(
(override.scheduleToCloseTimeout == null)
? this.scheduleToCloseTimeout
: override.scheduleToCloseTimeout;
this.scheduleToStartTimeout =
(override.scheduleToStartTimeout == null)
? this.scheduleToStartTimeout
: override.scheduleToStartTimeout;
this.startToCloseTimeout =
(override.startToCloseTimeout == null)
? this.startToCloseTimeout
: override.startToCloseTimeout;
this.cancellationType =
(override.cancellationType == null) ? this.cancellationType : override.cancellationType;
this.summary = (override.summary == null) ? this.summary : override.summary;
Expand All @@ -104,9 +160,13 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(

private NexusOperationOptions(
Duration scheduleToCloseTimeout,
Duration scheduleToStartTimeout,
Duration startToCloseTimeout,
NexusOperationCancellationType cancellationType,
String summary) {
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.scheduleToStartTimeout = scheduleToStartTimeout;
this.startToCloseTimeout = startToCloseTimeout;
this.cancellationType = cancellationType;
this.summary = summary;
}
Expand All @@ -116,13 +176,25 @@ public NexusOperationOptions.Builder toBuilder() {
}

private final Duration scheduleToCloseTimeout;
private final Duration scheduleToStartTimeout;
private final Duration startToCloseTimeout;
private final NexusOperationCancellationType cancellationType;
private final String summary;

public Duration getScheduleToCloseTimeout() {
return scheduleToCloseTimeout;
}

@Experimental
public Duration getScheduleToStartTimeout() {
return scheduleToStartTimeout;
}

@Experimental
public Duration getStartToCloseTimeout() {
return startToCloseTimeout;
}

public NexusOperationCancellationType getCancellationType() {
return cancellationType;
}
Expand All @@ -138,20 +210,31 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
NexusOperationOptions that = (NexusOperationOptions) o;
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
&& Objects.equals(scheduleToStartTimeout, that.scheduleToStartTimeout)
&& Objects.equals(startToCloseTimeout, that.startToCloseTimeout)
&& Objects.equals(cancellationType, that.cancellationType)
&& Objects.equals(summary, that.summary);
}

@Override
public int hashCode() {
return Objects.hash(scheduleToCloseTimeout, cancellationType, summary);
return Objects.hash(
scheduleToCloseTimeout,
scheduleToStartTimeout,
startToCloseTimeout,
cancellationType,
summary);
}

@Override
public String toString() {
return "NexusOperationOptions{"
+ "scheduleToCloseTimeout="
+ scheduleToCloseTimeout
+ ", scheduleToStartTimeout="
+ scheduleToStartTimeout
+ ", startToCloseTimeout="
+ startToCloseTimeout
+ ", cancellationType="
+ cancellationType
+ ", summary='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class HeaderTest {
public void testOperationHeaders() {
TestWorkflow workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow.class);
Map<String, String> headers = workflowStub.execute(testWorkflowRule.getTaskQueue());
// Operation-timeout is set because the schedule-to-close timeout is capped by workflow run
// timeout, which is set by
// default for tests.
Assert.assertTrue(headers.containsKey("operation-timeout"));
Assert.assertTrue(headers.containsKey("request-timeout"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ public void failWhenUpdateNamesDoNotMatch() {
}
}

@SuppressWarnings(
"deprecation") // Backwards compatibility for WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
@Test
public void failServerSideWhenStartIsInvalid() {
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,22 +652,60 @@ private static void scheduleNexusOperation(
NexusOperationData data,
ScheduleNexusOperationCommandAttributes attr,
long workflowTaskCompletedId) {
Duration expirationInterval = attr.getScheduleToCloseTimeout();
// Cap scheduleToCloseTimeout to workflow run timeout.
com.google.protobuf.Duration workflowRunTimeoutProto =
ctx.getWorkflowMutableState().getStartRequest().getWorkflowRunTimeout();
java.time.Duration workflowRunTimeout =
ProtobufTimeUtils.toJavaDuration(workflowRunTimeoutProto);
java.time.Duration scheduleToCloseTimeout =
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout());

com.google.protobuf.Duration cappedScheduleToCloseTimeout = attr.getScheduleToCloseTimeout();
if (!workflowRunTimeout.isZero()
&& (scheduleToCloseTimeout.isZero()
|| scheduleToCloseTimeout.compareTo(workflowRunTimeout) > 0)) {
cappedScheduleToCloseTimeout = workflowRunTimeoutProto;
scheduleToCloseTimeout = workflowRunTimeout;
}

Duration expirationInterval = cappedScheduleToCloseTimeout;
Timestamp expirationTime =
(attr.hasScheduleToCloseTimeout()
&& Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0)
!scheduleToCloseTimeout.isZero()
? Timestamps.add(ctx.currentTime(), expirationInterval)
: Timestamp.getDefaultInstance();
TestServiceRetryState retryState = new TestServiceRetryState(data.retryPolicy, expirationTime);

// Trim secondary timeouts to the primary timeout (scheduleToClose).
java.time.Duration scheduleToStartTimeout =
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToStartTimeout());
java.time.Duration startToCloseTimeout =
ProtobufTimeUtils.toJavaDuration(attr.getStartToCloseTimeout());

com.google.protobuf.Duration cappedScheduleToStartTimeout = attr.getScheduleToStartTimeout();
com.google.protobuf.Duration cappedStartToCloseTimeout = attr.getStartToCloseTimeout();

if (!scheduleToCloseTimeout.isZero()
&& !scheduleToStartTimeout.isZero()
&& scheduleToStartTimeout.compareTo(scheduleToCloseTimeout) > 0) {
cappedScheduleToStartTimeout = cappedScheduleToCloseTimeout;
}

if (!scheduleToCloseTimeout.isZero()
&& !startToCloseTimeout.isZero()
&& startToCloseTimeout.compareTo(scheduleToCloseTimeout) > 0) {
cappedStartToCloseTimeout = cappedScheduleToCloseTimeout;
}

NexusOperationScheduledEventAttributes.Builder a =
NexusOperationScheduledEventAttributes.newBuilder()
.setEndpoint(attr.getEndpoint())
.setEndpointId(data.endpoint.getId())
.setService(attr.getService())
.setOperation(attr.getOperation())
.setInput(attr.getInput())
.setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout())
.setScheduleToCloseTimeout(cappedScheduleToCloseTimeout)
.setScheduleToStartTimeout(cappedScheduleToStartTimeout)
.setStartToCloseTimeout(cappedStartToCloseTimeout)
.putAllNexusHeader(attr.getNexusHeaderMap())
.setRequestId(UUID.randomUUID().toString())
.setWorkflowTaskCompletedEventId(workflowTaskCompletedId);
Expand Down Expand Up @@ -704,9 +742,6 @@ private static void scheduleNexusOperation(
io.temporal.api.nexus.v1.Request.newBuilder()
.setScheduledTime(ctx.currentTime())
.putAllHeader(attr.getNexusHeaderMap())
.putHeader(
io.nexusrpc.Header.OPERATION_TIMEOUT.toLowerCase(),
attr.getScheduleToCloseTimeout().toString())
.setStartOperation(
StartOperationRequest.newBuilder()
.setService(attr.getService())
Expand Down Expand Up @@ -778,7 +813,9 @@ private static void completeNexusOperation(

private static void timeoutNexusOperation(
RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) {
if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) {
if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
&& timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START
&& timeoutType != TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE) {
throw new IllegalArgumentException(
"Timeout type not supported for Nexus operations: " + timeoutType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ void completeAsyncNexusOperation(

boolean validateOperationTaskToken(NexusTaskToken tt);

@Nullable
NexusOperationScheduledEventAttributes getNexusOperationScheduledEventAttributes(
long scheduledEventId);

boolean isNexusOperationStarted(long scheduledEventId);

QueryWorkflowResponse query(QueryWorkflowRequest queryRequest, long deadline);

TestWorkflowMutableStateImpl.UpdateHandle updateWorkflowExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,18 @@ private void processScheduleNexusOperation(
operation.getData().getAttempt()),
"NexusOperation ScheduleToCloseTimeout");
}
if (attr.hasScheduleToStartTimeout()
&& Durations.toMillis(attr.getScheduleToStartTimeout()) > 0) {
// ScheduleToStartTimeout is the time from schedule to start (or completion if synchronous)
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToStartTimeout()),
() ->
timeoutNexusOperation(
scheduleEventId,
TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START,
operation.getData().getAttempt()),
"NexusOperation ScheduleToStartTimeout");
}
ctx.lockTimer("processScheduleNexusOperation");
}

Expand Down Expand Up @@ -2309,6 +2321,23 @@ public void startNexusOperation(
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
operation.action(StateMachines.Action.START, ctx, resp, 0);
operation.getData().identity = clientIdentity;

// Add start-to-close timeout timer if configured
NexusOperationScheduledEventAttributes scheduledEvent =
operation.getData().scheduledEvent;
if (scheduledEvent.hasStartToCloseTimeout()
&& Durations.toMillis(scheduledEvent.getStartToCloseTimeout()) > 0) {
// StartToCloseTimeout measures from when the operation started to when it completes
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(scheduledEvent.getStartToCloseTimeout()),
() ->
timeoutNexusOperation(
scheduledEventId,
TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE,
operation.getData().getAttempt()),
"NexusOperation StartToCloseTimeout");
}

scheduleWorkflowTask(ctx);
});
}
Expand Down Expand Up @@ -3691,6 +3720,20 @@ public boolean validateOperationTaskToken(NexusTaskToken tt) {
return true;
}

@Override
public NexusOperationScheduledEventAttributes getNexusOperationScheduledEventAttributes(
long scheduledEventId) {
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
return operation.getData().scheduledEvent;
}

@Override
public boolean isNexusOperationStarted(long scheduledEventId) {
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
// Operation is considered started if it has an operation token
return !operation.getData().operationToken.isEmpty();
}

private boolean isTerminalState(State workflowState) {
return workflowState == State.COMPLETED
|| workflowState == State.TIMED_OUT
Expand Down
Loading
Loading