diff --git a/build.gradle b/build.gradle index 1ab9ed1ab9..fbbdeffe8c 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,7 @@ plugins { allprojects { repositories { + mavenLocal() mavenCentral() } } @@ -30,7 +31,7 @@ ext { // Platforms grpcVersion = '1.75.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.15.4' // [2.9.0,) - nexusVersion = '0.4.0-alpha' + nexusVersion = '0.5.0-SNAPSHOT' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) diff --git a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java index 55e190a491..2a16588698 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java @@ -15,6 +15,7 @@ import io.temporal.common.converter.FailureConverter; import io.temporal.internal.activity.ActivityTaskHandlerImpl; import io.temporal.internal.common.FailureUtils; +import io.temporal.internal.common.NexusUtil; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.sync.POJOWorkflowImplementationFactory; import io.temporal.serviceclient.CheckedExceptionWrapper; @@ -192,7 +193,18 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE; break; } - return new HandlerException(info.getType(), cause, retryBehavior); + if (failure + .getMessage() + .startsWith(String.format("handler error (%s)", info.getType()))) { + return new HandlerException(info.getType(), cause, retryBehavior); + } else { + return new HandlerException( + info.getType(), + failure.getMessage(), + cause, + retryBehavior, + NexusUtil.temporalFailureToNexusFailureInfo(failure)); + } 
} case FAILUREINFO_NOT_SET: default: @@ -324,6 +336,9 @@ private Failure exceptionToFailure(Throwable throwable) { failure.setNexusOperationExecutionFailureInfo(op); } else if (throwable instanceof HandlerException) { HandlerException he = (HandlerException) throwable; + if (he.getOriginalFailure() != null) { + return NexusUtil.nexusFailureToAPIFailure(he.getOriginalFailure(), true); + } NexusHandlerErrorRetryBehavior retryBehavior = NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED; switch (he.getRetryBehavior()) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java index 88d8e1d03b..bbc3cf3412 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java @@ -1,10 +1,18 @@ package io.temporal.internal.common; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; +import io.nexusrpc.FailureInfo; import io.nexusrpc.Link; +import io.nexusrpc.handler.HandlerException; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior; import io.temporal.api.nexus.v1.Failure; +import io.temporal.api.nexus.v1.HandlerError; import io.temporal.common.converter.DataConverter; import java.net.URI; import java.net.URISyntaxException; @@ -13,7 +21,8 @@ import java.util.Map; public class NexusUtil { - private static final JsonFormat.Printer JSON_PRINTER = + private static final ObjectWriter JSON_OBJECT_WRITER = new ObjectMapper().writer(); + private static final JsonFormat.Printer PROTO_JSON_PRINTER = JsonFormat.printer().omittingInsignificantWhitespace(); private 
static final String TEMPORAL_FAILURE_TYPE_STRING = io.temporal.api.failure.v1.Failure.getDescriptor().getFullName(); @@ -47,23 +56,134 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink) .build(); } - public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) { - io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception); + public static Failure temporalFailureToNexusFailure( + io.temporal.api.failure.v1.Failure temporalFailure) { String details; try { - details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build()); + details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build()); } catch (InvalidProtocolBufferException e) { return Failure.newBuilder() .setMessage("Failed to serialize failure details") .setDetails(ByteString.copyFromUtf8(e.getMessage())) .build(); } - return Failure.newBuilder() - .setMessage(failure.getMessage()) - .setDetails(ByteString.copyFromUtf8(details)) - .putAllMetadata(NEXUS_FAILURE_METADATA) + Failure.Builder failureBuilder = + Failure.newBuilder() + .setMessage(temporalFailure.getMessage()) + .setDetails(ByteString.copyFromUtf8(details)) + .putAllMetadata(NEXUS_FAILURE_METADATA); + if (!temporalFailure.getStackTrace().isEmpty()) { + failureBuilder.setStackTrace(temporalFailure.getStackTrace()); + } + return failureBuilder.build(); + } + + public static io.temporal.api.failure.v1.Failure nexusFailureToAPIFailure( + FailureInfo failureInfo, boolean retryable) { + io.temporal.api.failure.v1.Failure.Builder apiFailure = + io.temporal.api.failure.v1.Failure.newBuilder(); + + if (failureInfo.getMetadata().containsKey("type") + && failureInfo.getMetadata().get("type").equals(TEMPORAL_FAILURE_TYPE_STRING)) { + // Details contains a JSON-serialized Temporal failure + try { + JsonFormat.parser().ignoringUnknownFields().merge(failureInfo.getDetailsJson(), apiFailure); + } catch (InvalidProtocolBufferException e) { + 
throw new RuntimeException(e); + } + } else { + // Create an ApplicationFailure with the Nexus failure data + io.temporal.api.common.v1.Payloads payloads = nexusFailureMetadataToPayloads(failureInfo); + io.temporal.api.failure.v1.ApplicationFailureInfo.Builder appFailureInfo = + io.temporal.api.failure.v1.ApplicationFailureInfo.newBuilder() + .setType("NexusFailure") + .setNonRetryable(!retryable); + if (payloads != null) { + appFailureInfo.setDetails(payloads); + } + apiFailure.setApplicationFailureInfo(appFailureInfo.build()); + } + + // Ensure these always get written + apiFailure.setMessage(failureInfo.getMessage()); + + return apiFailure.build(); + } + + private static io.temporal.api.common.v1.Payloads nexusFailureMetadataToPayloads( + FailureInfo failureInfo) { + if (failureInfo.getMetadata().isEmpty() && failureInfo.getDetailsJson().isEmpty()) { + return null; + } + + // Create a copy without the message before serializing + FailureInfo failureCopy = FailureInfo.newBuilder(failureInfo).setMessage("").build(); + String json = null; + try { + json = JSON_OBJECT_WRITER.writeValueAsString(failureCopy); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + return io.temporal.api.common.v1.Payloads.newBuilder() + .addPayloads( + Payload.newBuilder() + .putMetadata("encoding", ByteString.copyFromUtf8("json/plain")) + .setData(ByteString.copyFromUtf8(json)) + .build()) .build(); } + public static FailureInfo temporalFailureToNexusFailureInfo( + io.temporal.api.failure.v1.Failure temporalFailure) { + String details; + try { + details = PROTO_JSON_PRINTER.print(temporalFailure.toBuilder().setMessage("").build()); + } catch (InvalidProtocolBufferException e) { + return FailureInfo.newBuilder() + .setMessage("Failed to serialize failure details") + .setDetailsJson(e.getMessage()) + .build(); + } + return FailureInfo.newBuilder() + .setMessage(temporalFailure.getMessage()) + .setDetailsJson(details) + .putMetadata("type", 
TEMPORAL_FAILURE_TYPE_STRING) + .build(); + } + + public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) { + io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception); + return temporalFailureToNexusFailure(failure); + } + + public static HandlerError handlerErrorToNexusError( + HandlerException e, DataConverter dataConverter) { + HandlerError.Builder handlerError = + HandlerError.newBuilder() + .setErrorType(e.getErrorType().toString()) + .setRetryBehavior(mapRetryBehavior(e.getRetryBehavior())); + // TODO: check if this works on old server + if (e.getCause() != null) { + handlerError.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter)); + } else if (e.getMessage() != null && !e.getMessage().isEmpty()) { + // Include message even when there's no cause + handlerError.setFailure(Failure.newBuilder().setMessage(e.getMessage()).build()); + } + return handlerError.build(); + } + + private static NexusHandlerErrorRetryBehavior mapRetryBehavior( + HandlerException.RetryBehavior retryBehavior) { + switch (retryBehavior) { + case RETRYABLE: + return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE; + case NON_RETRYABLE: + return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE; + default: + return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED; + } + } + private NexusUtil() {} } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 4b28f6bd78..5494b55553 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -1,22 +1,24 @@ package io.temporal.internal.nexus; -import static io.temporal.internal.common.NexusUtil.exceptionToNexusFailure; import static 
io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; import com.uber.m3.tally.Scope; import io.grpc.StatusRuntimeException; import io.nexusrpc.Header; import io.nexusrpc.OperationException; +import io.nexusrpc.OperationState; import io.nexusrpc.handler.*; import io.temporal.api.common.v1.Payload; -import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior; import io.temporal.api.nexus.v1.*; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowException; import io.temporal.client.WorkflowNotFoundException; import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.EncodedValues; import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.failure.TemporalFailure; import io.temporal.internal.common.InternalUtils; import io.temporal.internal.common.NexusUtil; import io.temporal.internal.worker.NexusTask; @@ -78,9 +80,6 @@ public boolean start() { public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException { Request request = task.getResponse().getRequest(); Map headers = request.getHeaderMap(); - if (headers == null) { - headers = Collections.emptyMap(); - } OperationContext.Builder ctx = OperationContext.newBuilder(); headers.forEach(ctx::putHeader); @@ -129,18 +128,9 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException new RuntimeException("Unknown request type: " + request.getVariantCase())); } } catch (HandlerException e) { - return new Result( - HandlerError.newBuilder() - .setErrorType(e.getErrorType().toString()) - .setFailure(exceptionToNexusFailure(e.getCause(), dataConverter)) - .setRetryBehavior(mapRetryBehavior(e.getRetryBehavior())) - .build()); + return new Result(e); } catch (Throwable e) { - return new Result( - HandlerError.newBuilder() - .setErrorType(HandlerException.ErrorType.INTERNAL.toString()) - 
.setFailure(exceptionToNexusFailure(e, dataConverter)) - .build()); + return new Result(new HandlerException(HandlerException.ErrorType.INTERNAL, e)); } finally { // If the task timed out, we should not send a response back to the server if (timedOut.get()) { @@ -154,18 +144,6 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } } - private NexusHandlerErrorRetryBehavior mapRetryBehavior( - HandlerException.RetryBehavior retryBehavior) { - switch (retryBehavior) { - case RETRYABLE: - return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE; - case NON_RETRYABLE: - return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE; - default: - return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED; - } - } - private void cancelOperation(OperationContext context, OperationCancelDetails details) { try { serviceHandler.cancelOperation(context, details); @@ -346,11 +324,21 @@ private StartOperationResponse handleStartOperation( convertKnownFailures(failure); } } catch (OperationException e) { - startResponseBuilder.setOperationError( - UnsuccessfulOperationError.newBuilder() - .setOperationState(e.getState().toString().toLowerCase()) - .setFailure(exceptionToNexusFailure(e.getCause(), dataConverter)) - .build()); + TemporalFailure temporalFailure; + if (e.getState() == OperationState.FAILED) { + temporalFailure = + ApplicationFailure.newFailureWithCause(e.getMessage(), "OperationError", e.getCause()); + temporalFailure.setStackTrace(e.getStackTrace()); + } else if (e.getState() == OperationState.CANCELED) { + temporalFailure = + new CanceledFailure(e.getMessage(), new EncodedValues(null), e.getCause()); + temporalFailure.setStackTrace(e.getStackTrace()); + } else { + throw new HandlerException( + HandlerException.ErrorType.INTERNAL, + new RuntimeException("Unknown operation state: " + e.getState())); + } + 
startResponseBuilder.setFailure(dataConverter.exceptionToFailure(temporalFailure)); } return startResponseBuilder.build(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusTaskHandler.java index 0b00919389..d2df027adb 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusTaskHandler.java @@ -1,9 +1,11 @@ package io.temporal.internal.worker; import com.uber.m3.tally.Scope; -import io.temporal.api.nexus.v1.HandlerError; +import io.nexusrpc.handler.HandlerException; import io.temporal.api.nexus.v1.Response; +import java.util.Objects; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; import javax.annotation.Nullable; public interface NexusTaskHandler { @@ -20,15 +22,17 @@ public interface NexusTaskHandler { class Result { @Nullable private final Response response; - @Nullable private final HandlerError handlerError; + @Nullable private final HandlerException handlerException; - public Result(Response response) { + public Result(@Nonnull Response response) { + Objects.requireNonNull(response); this.response = response; - handlerError = null; + handlerException = null; } - public Result(HandlerError handlerError) { - this.handlerError = handlerError; + public Result(@Nonnull HandlerException handlerException) { + Objects.requireNonNull(handlerException); + this.handlerException = handlerException; response = null; } @@ -38,8 +42,8 @@ public Response getResponse() { } @Nullable - public HandlerError getHandlerError() { - return handlerError; + public HandlerException getHandlerException() { + return handlerException; } } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index 7aa6a43573..7a064aa18e 100644 --- 
a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -8,10 +8,13 @@ import com.uber.m3.tally.Stopwatch; import com.uber.m3.util.Duration; import com.uber.m3.util.ImmutableMap; -import io.temporal.api.nexus.v1.HandlerError; -import io.temporal.api.nexus.v1.Request; -import io.temporal.api.nexus.v1.Response; +import io.nexusrpc.OperationState; +import io.nexusrpc.handler.HandlerException; +import io.temporal.api.failure.v1.Failure; +import io.temporal.api.nexus.v1.*; import io.temporal.api.workflowservice.v1.*; +import io.temporal.common.converter.DataConverter; +import io.temporal.internal.common.NexusUtil; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.logging.LoggerTag; import io.temporal.internal.retryer.GrpcRetryer; @@ -45,6 +48,7 @@ final class NexusWorker implements SuspendableWorker { private final SingleWorkerOptions options; private final PollerOptions pollerOptions; private final Scope workerMetricsScope; + private final DataConverter dataConverter; private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; @@ -55,11 +59,13 @@ public NexusWorker( @Nonnull String taskQueue, @Nonnull SingleWorkerOptions options, @Nonnull NexusTaskHandler handler, + @Nonnull DataConverter dataConverter, @Nonnull SlotSupplier slotSupplier) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); this.handler = Objects.requireNonNull(handler); + this.dataConverter = Objects.requireNonNull(dataConverter); this.options = Objects.requireNonNull(options); this.pollerOptions = getPollerOptions(options); this.workerMetricsScope = @@ -281,12 +287,12 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { Stopwatch sw = 
metricsScope.timer(MetricsType.NEXUS_EXEC_LATENCY).start(); try { result = handler.handle(task, metricsScope); - if (result.getHandlerError() != null) { + if (result.getHandlerException() != null) { metricsScope .tagged( Collections.singletonMap( TASK_FAILURE_TYPE, - "handler_error_" + result.getHandlerError().getErrorType())) + "handler_error_" + result.getHandlerException().getErrorType())) .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER) .inc(1); } else if (result.getResponse().hasStartOperation() @@ -297,6 +303,19 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "operation_" + operationState)) .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER) .inc(1); + } else if (result.getResponse().hasStartOperation() + && result.getResponse().getStartOperation().hasFailure()) { + Failure f = result.getResponse().getStartOperation().getFailure(); + String operationState; + if (f.hasApplicationFailureInfo()) { + operationState = "failed"; + } else { + operationState = "canceled"; + } + metricsScope + .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "operation_" + operationState)) + .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER) + .inc(1); } } catch (TimeoutException e) { log.warn("Nexus task timed out while processing", e); @@ -319,7 +338,17 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { } try { - sendReply(taskToken, result, metricsScope); + // Check if the server supports using the Failure directly in responses + boolean supportTemporalFailure = + task.getResponse().getRequest().getCapabilities().getTemporalFailureResponses(); + + // Allow tests to force old format for backward compatibility testing + String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat"); + if ("true".equalsIgnoreCase(forceOldFormat)) { + supportTemporalFailure = false; + } + + sendReply(taskToken, supportTemporalFailure, result, metricsScope); } catch (Exception e) { 
logExceptionDuringResultReporting(e, pollResponse, result); throw e; @@ -344,10 +373,55 @@ private void logExceptionDuringResultReporting( } } + @SuppressWarnings("deprecation") private void sendReply( - ByteString taskToken, NexusTaskHandler.Result response, Scope metricsScope) { + ByteString taskToken, + boolean supportTemporalFailure, + NexusTaskHandler.Result response, + Scope metricsScope) { Response taskResponse = response.getResponse(); if (taskResponse != null) { + // For old servers that do not support TemporalFailure in Failure proto, + // we need to convert the Failure to a UnsuccessfulOperationError + if (!supportTemporalFailure && taskResponse.getStartOperation().hasFailure()) { + Response.Builder b = taskResponse.toBuilder(); + Failure failure = taskResponse.getStartOperation().getFailure(); + String operationState; + if (failure.hasApplicationFailureInfo()) { + operationState = OperationState.FAILED.toString().toLowerCase(); + } else if (failure.hasCanceledFailureInfo()) { + operationState = OperationState.CANCELED.toString().toLowerCase(); + } else { + RespondNexusTaskFailedRequest.Builder request = + RespondNexusTaskFailedRequest.newBuilder() + .setTaskToken(taskToken) + .setIdentity(options.getIdentity()) + .setNamespace(namespace) + .setError( + NexusUtil.handlerErrorToNexusError( + new HandlerException( + HandlerException.ErrorType.INTERNAL, + "Failure does not have applicationFailureInfo or canceledFailureInfo"), + dataConverter)); + grpcRetryer.retry( + () -> + service + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) + .respondNexusTaskFailed(request.build()), + replyGrpcRetryerOptions); + return; + } + taskResponse = + b.setStartOperation( + taskResponse.getStartOperation().toBuilder() + .setOperationError( + UnsuccessfulOperationError.newBuilder() + .setOperationState(operationState) + .setFailure( + NexusUtil.temporalFailureToNexusFailure(failure.getCause())))) + .build(); + } RespondNexusTaskCompletedRequest 
request = RespondNexusTaskCompletedRequest.newBuilder() .setTaskToken(taskToken) @@ -364,22 +438,24 @@ private void sendReply( .respondNexusTaskCompleted(request), replyGrpcRetryerOptions); } else { - HandlerError taskFailed = response.getHandlerError(); - if (taskFailed != null) { - RespondNexusTaskFailedRequest request = + HandlerException handlerException = response.getHandlerException(); + if (handlerException != null) { + RespondNexusTaskFailedRequest.Builder request = RespondNexusTaskFailedRequest.newBuilder() .setTaskToken(taskToken) .setIdentity(options.getIdentity()) - .setNamespace(namespace) - .setError(taskFailed) - .build(); - + .setNamespace(namespace); + if (supportTemporalFailure) { + request.setFailure(dataConverter.exceptionToFailure(handlerException)); + } else { + request.setError(NexusUtil.handlerErrorToNexusError(handlerException, dataConverter)); + } grpcRetryer.retry( () -> service .blockingStub() .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) - .respondNexusTaskFailed(request), + .respondNexusTaskFailed(request.build()), replyGrpcRetryerOptions); } else { throw new IllegalArgumentException("[BUG] Either response or failure must be set"); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java index e4b4e86cb6..bc089644d1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java @@ -42,6 +42,7 @@ public SyncNexusWorker( taskQueue, options, taskHandler, + options.getDataConverter(), slotSupplier); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/common/NexusUtilTest.java b/temporal-sdk/src/test/java/io/temporal/internal/common/NexusUtilTest.java index 5aed837175..52af8ca589 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/common/NexusUtilTest.java +++ 
b/temporal-sdk/src/test/java/io/temporal/internal/common/NexusUtilTest.java @@ -1,9 +1,20 @@ package io.temporal.internal.common; +import io.nexusrpc.FailureInfo; +import io.nexusrpc.handler.HandlerException; +import io.temporal.api.failure.v1.ApplicationFailureInfo; +import io.temporal.api.failure.v1.CanceledFailureInfo; +import io.temporal.api.failure.v1.Failure; +import io.temporal.api.nexus.v1.HandlerError; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.failure.ApplicationFailure; import org.junit.Assert; import org.junit.Test; public class NexusUtilTest { + private static final DataConverter DATA_CONVERTER = DefaultDataConverter.STANDARD_INSTANCE; + @Test public void testParseRequestTimeout() { Assert.assertThrows( @@ -15,4 +26,236 @@ public void testParseRequestTimeout() { Assert.assertEquals(java.time.Duration.ofMinutes(999), NexusUtil.parseRequestTimeout("999m")); Assert.assertEquals(java.time.Duration.ofMillis(1300), NexusUtil.parseRequestTimeout("1.3s")); } + + @Test + public void testTemporalFailureToNexusFailureRoundTrip() { + // Create a Temporal failure with details + Failure temporalFailure = + Failure.newBuilder() + .setMessage("test failure") + .setStackTrace("at test.Class.method(Class.java:123)") + .setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder() + .setType("TestFailure") + .setNonRetryable(true) + .build()) + .build(); + + // Convert to Nexus failure + io.temporal.api.nexus.v1.Failure nexusFailure = + NexusUtil.temporalFailureToNexusFailure(temporalFailure); + + // Verify message is preserved + Assert.assertEquals("test failure", nexusFailure.getMessage()); + + // Verify metadata indicates this is a Temporal failure + Assert.assertTrue(nexusFailure.getMetadataMap().containsKey("type")); + Assert.assertEquals( + "temporal.api.failure.v1.Failure", nexusFailure.getMetadataMap().get("type")); + + // Convert back via FailureInfo + FailureInfo.Builder 
failureInfoBuilder = + FailureInfo.newBuilder() + .setMessage(nexusFailure.getMessage()) + .setDetailsJson(nexusFailure.getDetails().toStringUtf8()) + .setStackTrace(nexusFailure.getStackTrace()); + // Add metadata entries individually + for (String key : nexusFailure.getMetadataMap().keySet()) { + failureInfoBuilder.putMetadata(key, nexusFailure.getMetadataMap().get(key)); + } + FailureInfo failureInfo = failureInfoBuilder.build(); + + Failure reconstructed = NexusUtil.nexusFailureToAPIFailure(failureInfo, true); + + // Verify round-trip preserves all fields + Assert.assertEquals("test failure", reconstructed.getMessage()); + Assert.assertEquals("at test.Class.method(Class.java:123)", reconstructed.getStackTrace()); + Assert.assertTrue(reconstructed.hasApplicationFailureInfo()); + Assert.assertEquals("TestFailure", reconstructed.getApplicationFailureInfo().getType()); + Assert.assertTrue(reconstructed.getApplicationFailureInfo().getNonRetryable()); + } + + @Test + public void testTemporalFailureToNexusFailureInfoRoundTrip() { + // Create a Temporal failure with nested cause + Failure innerFailure = + Failure.newBuilder() + .setMessage("inner cause") + .setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder().setType("InnerFailure").build()) + .build(); + + Failure outerFailure = + Failure.newBuilder() + .setMessage("outer failure") + .setCause(innerFailure) + .setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder().setType("OuterFailure").build()) + .build(); + + // Convert to FailureInfo and back + FailureInfo failureInfo = NexusUtil.temporalFailureToNexusFailureInfo(outerFailure); + Failure reconstructed = NexusUtil.nexusFailureToAPIFailure(failureInfo, false); + + // Verify nested structure is preserved + Assert.assertEquals("outer failure", reconstructed.getMessage()); + Assert.assertEquals("OuterFailure", reconstructed.getApplicationFailureInfo().getType()); + Assert.assertTrue(reconstructed.hasCause()); + Assert.assertEquals("inner cause", 
reconstructed.getCause().getMessage()); + Assert.assertEquals( + "InnerFailure", reconstructed.getCause().getApplicationFailureInfo().getType()); + } + + @Test + public void testHandlerErrorToNexusErrorWithCause() { + ApplicationFailure cause = ApplicationFailure.newFailure("test error", "TestType", "detail"); + HandlerException exception = + new HandlerException(HandlerException.ErrorType.BAD_REQUEST, cause); + + HandlerError nexusError = NexusUtil.handlerErrorToNexusError(exception, DATA_CONVERTER); + + Assert.assertEquals("BAD_REQUEST", nexusError.getErrorType()); + Assert.assertTrue(nexusError.hasFailure()); + Assert.assertEquals("test error", nexusError.getFailure().getMessage()); + } + + @Test + public void testHandlerErrorToNexusErrorWithoutCause() { + HandlerException exception = + new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, "handler message", (Throwable) null); + + HandlerError nexusError = NexusUtil.handlerErrorToNexusError(exception, DATA_CONVERTER); + + Assert.assertEquals("BAD_REQUEST", nexusError.getErrorType()); + Assert.assertTrue(nexusError.hasFailure()); + Assert.assertEquals("handler message", nexusError.getFailure().getMessage()); + } + + @Test + public void testHandlerErrorToNexusErrorWithEmptyMessage() { + HandlerException exception = + new HandlerException(HandlerException.ErrorType.INTERNAL, "", (Throwable) null); + + HandlerError nexusError = NexusUtil.handlerErrorToNexusError(exception, DATA_CONVERTER); + + Assert.assertEquals("INTERNAL", nexusError.getErrorType()); + // Should not have failure when message is empty + Assert.assertFalse(nexusError.hasFailure()); + } + + @Test + public void testNexusFailureWithStackTracePreservation() { + String stackTrace = + "at io.temporal.test.Method1(Test.java:100)\n" + + "at io.temporal.test.Method2(Test.java:200)\n" + + "at io.temporal.test.Method3(Test.java:300)"; + + Failure failure = + Failure.newBuilder() + .setMessage("failure with stack") + .setStackTrace(stackTrace) + 
.setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder().setType("TestFailure").build()) + .build(); + + io.temporal.api.nexus.v1.Failure nexusFailure = + NexusUtil.temporalFailureToNexusFailure(failure); + Assert.assertEquals(stackTrace, nexusFailure.getStackTrace()); + + // Convert back + FailureInfo.Builder failureInfoBuilder = + FailureInfo.newBuilder() + .setMessage(nexusFailure.getMessage()) + .setDetailsJson(nexusFailure.getDetails().toStringUtf8()) + .setStackTrace(nexusFailure.getStackTrace()); + // Add metadata entries individually + for (String key : nexusFailure.getMetadataMap().keySet()) { + failureInfoBuilder.putMetadata(key, nexusFailure.getMetadataMap().get(key)); + } + FailureInfo failureInfo = failureInfoBuilder.build(); + + Failure reconstructed = NexusUtil.nexusFailureToAPIFailure(failureInfo, true); + Assert.assertEquals(stackTrace, reconstructed.getStackTrace()); + } + + @Test + public void testNexusFailureWithoutTemporalMetadata() { + // Test handling of non-Temporal Nexus failures + FailureInfo failureInfo = + FailureInfo.newBuilder() + .setMessage("generic nexus failure") + .putMetadata("custom-key", "custom-value") + .setDetailsJson("{\"detail\":\"some data\"}") + .build(); + + Failure apiFailure = NexusUtil.nexusFailureToAPIFailure(failureInfo, true); + + // Should be wrapped as NexusFailure type + Assert.assertEquals("generic nexus failure", apiFailure.getMessage()); + Assert.assertTrue(apiFailure.hasApplicationFailureInfo()); + Assert.assertEquals("NexusFailure", apiFailure.getApplicationFailureInfo().getType()); + Assert.assertFalse(apiFailure.getApplicationFailureInfo().getNonRetryable()); + } + + @Test + public void testDeeplyNestedFailureCauses() { + // Test 4 levels of nesting + Failure level4 = + Failure.newBuilder() + .setMessage("level 4") + .setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder().setType("Level4").build()) + .build(); + + Failure level3 = + Failure.newBuilder() + .setMessage("level 3") + 
.setCause(level4) + .setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder().setType("Level3").build()) + .build(); + + Failure level2 = + Failure.newBuilder() + .setMessage("level 2") + .setCause(level3) + .setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder().setType("Level2").build()) + .build(); + + Failure level1 = + Failure.newBuilder() + .setMessage("level 1") + .setCause(level2) + .setApplicationFailureInfo( + ApplicationFailureInfo.newBuilder().setType("Level1").build()) + .build(); + + // Convert through Nexus format and back + io.temporal.api.nexus.v1.Failure nexusFailure = NexusUtil.temporalFailureToNexusFailure(level1); + FailureInfo failureInfo = NexusUtil.temporalFailureToNexusFailureInfo(level1); + Failure reconstructed = NexusUtil.nexusFailureToAPIFailure(failureInfo, true); + + // Verify all levels are preserved + Assert.assertEquals("level 1", reconstructed.getMessage()); + Assert.assertEquals("level 2", reconstructed.getCause().getMessage()); + Assert.assertEquals("level 3", reconstructed.getCause().getCause().getMessage()); + Assert.assertEquals("level 4", reconstructed.getCause().getCause().getCause().getMessage()); + } + + @Test + public void testCanceledFailureConversion() { + Failure canceledFailure = + Failure.newBuilder() + .setMessage("operation canceled") + .setCanceledFailureInfo(CanceledFailureInfo.newBuilder().build()) + .build(); + + FailureInfo failureInfo = NexusUtil.temporalFailureToNexusFailureInfo(canceledFailure); + Failure reconstructed = NexusUtil.nexusFailureToAPIFailure(failureInfo, true); + + Assert.assertEquals("operation canceled", reconstructed.getMessage()); + Assert.assertTrue(reconstructed.hasCanceledFailureInfo()); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java index b986e3f422..c2507f7867 100644 --- 
a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java @@ -93,7 +93,7 @@ public void startSyncTask() throws TimeoutException { NexusTaskHandler.Result result = nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); - Assert.assertNull(result.getHandlerError()); + Assert.assertNull(result.getHandlerException()); Assert.assertNotNull(result.getResponse()); Assert.assertEquals( "Hello, world!", @@ -153,7 +153,7 @@ public void startAsyncSyncOperation() throws TimeoutException { NexusTaskHandler.Result result = nexusTaskHandlerImpl.handle(new NexusTask(task, null, null), metricsScope); - Assert.assertNull(result.getHandlerError()); + Assert.assertNull(result.getHandlerException()); Assert.assertNotNull(result.getResponse()); Assert.assertEquals( "test id", result.getResponse().getStartOperation().getAsyncSuccess().getOperationToken()); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusFailureOldFormatTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusFailureOldFormatTest.java new file mode 100644 index 0000000000..2ef9a11b7c --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/NexusFailureOldFormatTest.java @@ -0,0 +1,38 @@ +package io.temporal.workflow.nexus; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Runs the OperationFailMetric suite with the old failure format forced via system property. This + * verifies that the test server correctly handles the old format (UnsuccessfulOperationError and + * HandlerError) even though it advertises support for the new format. + * + *

<p>The system property "temporal.nexus.forceOldFailureFormat=true" makes the SDK send old format + * responses regardless of server capabilities. + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({OperationFailMetricTest.class}) +public class NexusFailureOldFormatTest { + private static String originalValue; + + @BeforeClass + public static void setUpClass() { + // Save original value if it exists + originalValue = System.getProperty("temporal.nexus.forceOldFailureFormat"); + // Force old format for all tests in this suite + System.setProperty("temporal.nexus.forceOldFailureFormat", "true"); + } + + @AfterClass + public static void tearDownClass() { + // Restore original value + if (originalValue != null) { + System.setProperty("temporal.nexus.forceOldFailureFormat", originalValue); + } else { + System.clearProperty("temporal.nexus.forceOldFailureFormat"); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java index 1ed8252be9..e99acc720e 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java @@ -14,6 +14,7 @@ import io.temporal.client.WorkflowFailedException; import io.temporal.common.reporter.TestStatsReporter; import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.CanceledFailure; import io.temporal.failure.NexusOperationFailure; import io.temporal.serviceclient.MetricsTag; import io.temporal.testUtils.Eventually; @@ -31,7 +32,7 @@ import org.junit.Test; public class OperationFailMetricTest { - private static Map invocationCount = new ConcurrentHashMap<>(); + private static final Map invocationCount = new ConcurrentHashMap<>(); private final TestStatsReporter reporter = new TestStatsReporter(); @@ -46,6 +47,11 @@ public class OperationFailMetricTest { 
.reportEvery(com.uber.m3.util.Duration.ofMillis(10))) .build(); + // Check if we're forcing old format via system property + private static boolean isUsingNewFormat() { + return !("true".equalsIgnoreCase(System.getProperty("temporal.nexus.forceOldFailureFormat"))); + } + private ImmutableMap.Builder getBaseTags() { return ImmutableMap.builder() .putAll(MetricsTag.defaultTags(NAMESPACE)) @@ -101,6 +107,42 @@ public void failOperationMetrics() { }); } + @Test + public void cancelOperationMetrics() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + + WorkflowFailedException workflowException = + Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("cancel")); + assertNoRetries("cancel"); + CanceledFailure canceledFailure = + assertNexusOperationFailure(CanceledFailure.class, workflowException); + Assert.assertEquals("intentional cancel", canceledFailure.getOriginalMessage()); + // TODO assert stack trace + if (isUsingNewFormat()) { + Assert.assertNotNull(canceledFailure.getCause()); + Assert.assertTrue(canceledFailure.getCause() instanceof ApplicationFailure); + ApplicationFailure applicationFailure = (ApplicationFailure) canceledFailure.getCause(); + Assert.assertEquals("intentional cancel", applicationFailure.getOriginalMessage()); + } + + Map execFailedTags = + getOperationTags() + .put(MetricsTag.TASK_FAILURE_TYPE, "operation_canceled") + .buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + @Test public void failOperationApplicationErrorMetrics() { 
TestWorkflow1 workflowStub = @@ -111,6 +153,94 @@ public void failOperationApplicationErrorMetrics() { assertNoRetries("fail-app"); ApplicationFailure applicationFailure = assertNexusOperationFailure(ApplicationFailure.class, workflowException); + if (isUsingNewFormat()) { + Assert.assertEquals( + "message='intentional failure', type='TestFailure', nonRetryable=false", + applicationFailure.getOriginalMessage()); + Assert.assertEquals("OperationError", applicationFailure.getType()); + Assert.assertNotNull(applicationFailure.getCause()); + applicationFailure = (ApplicationFailure) applicationFailure.getCause(); + } + Assert.assertEquals("intentional failure", applicationFailure.getOriginalMessage()); + Assert.assertEquals("TestFailure", applicationFailure.getType()); + Assert.assertEquals("foo", applicationFailure.getDetails().get(String.class)); + + Map execFailedTags = + getOperationTags().put(MetricsTag.TASK_FAILURE_TYPE, "operation_failed").buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + + @Test + public void cancelOperationApplicationErrorMetrics() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("cancel-app")); + assertNoRetries("cancel-app"); + CanceledFailure canceledFailure = + assertNexusOperationFailure(CanceledFailure.class, workflowException); + // + if (isUsingNewFormat()) { + Assert.assertEquals( + "message='intentional cancel', 
type='TestFailure', nonRetryable=false", + canceledFailure.getOriginalMessage()); + Assert.assertEquals(0, canceledFailure.getDetails().getSize()); + Assert.assertNotNull(canceledFailure.getCause()); + Assert.assertTrue(canceledFailure.getCause() instanceof ApplicationFailure); + ApplicationFailure applicationFailure = (ApplicationFailure) canceledFailure.getCause(); + Assert.assertEquals("TestFailure", applicationFailure.getType()); + Assert.assertEquals("intentional cancel", applicationFailure.getOriginalMessage()); + Assert.assertEquals("foo", applicationFailure.getDetails().get(String.class)); + } else { + Assert.assertEquals("intentional cancel", canceledFailure.getOriginalMessage()); + Assert.assertEquals(1, canceledFailure.getDetails().getSize()); + } + + Map execFailedTags = + getOperationTags() + .put(MetricsTag.TASK_FAILURE_TYPE, "operation_canceled") + .buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + + @Test + public void failOperationMessageApplicationErrorMetrics() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("fail-msg-app")); + assertNoRetries("fail-msg-app"); + ApplicationFailure applicationFailure = + assertNexusOperationFailure(ApplicationFailure.class, workflowException); + if (isUsingNewFormat()) { + Assert.assertEquals("failure message", applicationFailure.getOriginalMessage()); + Assert.assertEquals("OperationError", 
applicationFailure.getType()); + applicationFailure = (ApplicationFailure) applicationFailure.getCause(); + } Assert.assertEquals("intentional failure", applicationFailure.getOriginalMessage()); Assert.assertEquals("TestFailure", applicationFailure.getType()); Assert.assertEquals("foo", applicationFailure.getDetails().get(String.class)); @@ -162,6 +292,39 @@ public void failHandlerBadRequestMetrics() { }); } + @Test + public void failHandlerBadRequestNoCauseMetrics() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("handlererror-no-cause")); + assertNoRetries("handlererror-no-cause"); + HandlerException handlerException = + assertNexusOperationFailure(HandlerException.class, workflowException); + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, handlerException.getErrorType()); + if (isUsingNewFormat()) { + Assert.assertEquals("handler failure message", handlerException.getMessage()); + Assert.assertNull(handlerException.getCause()); + } + + Map execFailedTags = + getOperationTags() + .put(MetricsTag.TASK_FAILURE_TYPE, "handler_error_BAD_REQUEST") + .buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + @Test public void failHandlerAppBadRequestMetrics() { TestWorkflow1 workflowStub = @@ -196,6 +359,45 @@ public void failHandlerAppBadRequestMetrics() { }); } + @Test + public void failHandlerMessageAppBadRequestMetrics() { + TestWorkflow1 
workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("handlererror-msg-app")); + assertNoRetries("handlererror-msg-app"); + HandlerException handlerException = + assertNexusOperationFailure(HandlerException.class, workflowException); + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, handlerException.getErrorType()); + if (isUsingNewFormat()) { + Assert.assertEquals("handler failure message", handlerException.getMessage()); + } else { + Assert.assertEquals("intentional failure", handlerException.getMessage()); + } + Assert.assertTrue(handlerException.getCause() instanceof ApplicationFailure); + ApplicationFailure applicationFailure = (ApplicationFailure) handlerException.getCause(); + Assert.assertEquals("intentional failure", applicationFailure.getOriginalMessage()); + Assert.assertEquals("TestFailure", applicationFailure.getType()); + Assert.assertEquals("foo", applicationFailure.getDetails().get(String.class)); + + Map execFailedTags = + getOperationTags() + .put(MetricsTag.TASK_FAILURE_TYPE, "handler_error_BAD_REQUEST") + .buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + @Test public void failHandlerAlreadyStartedMetrics() { TestWorkflow1 workflowStub = @@ -405,17 +607,40 @@ public OperationHandler operation() { case "fail-app": throw OperationException.failure( ApplicationFailure.newFailure("intentional failure", "TestFailure", "foo")); + case "fail-msg-app": 
+ throw OperationException.failure( + "failure message", + ApplicationFailure.newFailure("intentional failure", "TestFailure", "foo")); + case "cancel": + throw OperationException.canceled(new RuntimeException("intentional cancel")); + case "cancel-app": + throw OperationException.canceled( + ApplicationFailure.newFailure("intentional cancel", "TestFailure", "foo")); + case "cancel-msg-app": + throw OperationException.canceled( + "failure message", + ApplicationFailure.newFailure("intentional cancel", "TestFailure", "foo")); case "handlererror": throw new HandlerException(HandlerException.ErrorType.BAD_REQUEST, "handlererror"); case "handlererror-app": throw new HandlerException( HandlerException.ErrorType.BAD_REQUEST, ApplicationFailure.newFailure("intentional failure", "TestFailure", "foo")); + case "handlererror-msg-app": + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + "handler failure message", + ApplicationFailure.newFailure("intentional failure", "TestFailure", "foo")); case "handlererror-nonretryable": throw new HandlerException( HandlerException.ErrorType.INTERNAL, ApplicationFailure.newNonRetryableFailure("intentional failure", "TestFailure"), HandlerException.RetryBehavior.NON_RETRYABLE); + case "handlererror-no-cause": + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + "handler failure message", + (Throwable) null); case "already-started": throw new WorkflowExecutionAlreadyStarted( WorkflowExecution.getDefaultInstance(), "TestWorkflowType", null); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java index fa2b0ca40b..b1c62931c0 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java @@ -646,6 +646,7 @@ public void failWhenUpdateNamesDoNotMatch() { } 
@Test + @SuppressWarnings("deprecation") public void failServerSideWhenStartIsInvalid() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 1ae5b673d6..188e309dee 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 1ae5b673d66b0a94f6131c3eb06bc7173ae2c326 +Subproject commit 188e309dee0acb3e3c84363d2d9f11be32df3bb8 diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index ae20d22858..a8e42a9d3c 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -703,6 +703,9 @@ private static void scheduleNexusOperation( .setRequest( io.temporal.api.nexus.v1.Request.newBuilder() .setScheduledTime(ctx.currentTime()) + .setCapabilities( + io.temporal.api.nexus.v1.Request.Capabilities.newBuilder() + .setTemporalFailureResponses(true)) .putAllHeader(attr.getNexusHeaderMap()) .putHeader( io.nexusrpc.Header.OPERATION_TIMEOUT.toLowerCase(), diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 469fde42df..2c043c83ec 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -69,7 +69,7 @@ public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase implements Closeable { private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class); - private static final JsonFormat.Parser 
JSON_PARSER = JsonFormat.parser(); + private static final JsonFormat.Parser JSON_PARSER = JsonFormat.parser().ignoringUnknownFields(); private static final String FAILURE_TYPE_STRING = Failure.getDescriptor().getFullName(); @@ -310,6 +310,7 @@ public void startWorkflowExecution( } } + @SuppressWarnings("deprecation") StartWorkflowExecutionResponse startWorkflowExecutionImpl( StartWorkflowExecutionRequest startRequest, Duration backoffStartInterval, @@ -475,6 +476,7 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow( WorkflowExecutionAlreadyStartedFailure.getDescriptor()); } + @SuppressWarnings("deprecation") private void validateWorkflowIdReusePolicy( WorkflowIdReusePolicy reusePolicy, WorkflowIdConflictPolicy conflictPolicy) { if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED @@ -988,7 +990,17 @@ public void respondNexusTaskCompleted( mutableState.cancelNexusOperationRequestAcknowledge(tt.getOperationRef()); } else if (request.getResponse().hasStartOperation()) { StartOperationResponse startResp = request.getResponse().getStartOperation(); - if (startResp.hasOperationError()) { + if (startResp.hasFailure()) { + // New format: Failure directly contains ApplicationFailureInfo or CanceledFailureInfo + Failure failure = startResp.getFailure(); + if (failure.hasCanceledFailureInfo()) { + mutableState.cancelNexusOperation(tt.getOperationRef(), failure); + } else { + mutableState.failNexusOperation( + tt.getOperationRef(), wrapNexusOperationFailure(failure)); + } + } else if (startResp.hasOperationError()) { + // Old format: UnsuccessfulOperationError with Nexus Failure UnsuccessfulOperationError opError = startResp.getOperationError(); Failure.Builder b = Failure.newBuilder().setMessage(opError.getFailure().getMessage()); @@ -1013,7 +1025,7 @@ public void respondNexusTaskCompleted( tt.getOperationRef(), startResp.getSyncSuccess().getPayload()); } else { throw Status.INVALID_ARGUMENT - .withDescription("Expected success 
or OperationError to be set on request.") + .withDescription("Expected success, Failure, or OperationError to be set on request.") .asRuntimeException(); } } else { @@ -1028,21 +1040,30 @@ public void respondNexusTaskCompleted( } } + @SuppressWarnings("deprecation") @Override public void respondNexusTaskFailed( RespondNexusTaskFailedRequest request, StreamObserver responseObserver) { try { - if (!request.hasError()) { + Failure failure; + if (request.hasFailure()) { + // New format: Failure directly contains the handler error with NexusHandlerFailureInfo + // Don't wrap with NexusOperationFailureInfo - the state machine will do that if needed + failure = request.getFailure(); + } else if (request.hasError()) { + // Old format: HandlerError needs to be converted + failure = handlerErrorToFailure(request.getError()); + } else { throw Status.INVALID_ARGUMENT - .withDescription("Nexus handler error not set on RespondNexusTaskFailedRequest") + .withDescription("Neither Failure nor Error set on RespondNexusTaskFailedRequest") .asRuntimeException(); } + NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken()); TestWorkflowMutableState mutableState = getMutableState(tt.getOperationRef().getExecutionId()); if (mutableState.validateOperationTaskToken(tt)) { - Failure failure = handlerErrorToFailure(request.getError()); mutableState.failNexusOperation(tt.getOperationRef(), failure); } responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance()); @@ -1115,15 +1136,27 @@ public void completeNexusOperation( } private static Failure handlerErrorToFailure(HandlerError err) { - return Failure.newBuilder() - .setMessage(err.getFailure().getMessage()) - .setNexusHandlerFailureInfo( - NexusHandlerFailureInfo.newBuilder() - .setType(err.getErrorType()) - .setRetryBehavior(err.getRetryBehavior()) - .build()) - .setCause(nexusFailureToAPIFailure(err.getFailure(), false)) - .build(); + Failure.Builder failureBuilder = + Failure.newBuilder() + 
.setMessage(err.getFailure().getMessage()) + .setNexusHandlerFailureInfo( + NexusHandlerFailureInfo.newBuilder() + .setType(err.getErrorType()) + .setRetryBehavior(err.getRetryBehavior()) + .build()); + // Only set cause if the failure has meaningful content beyond just a message + if (err.hasFailure() && hasFailureContent(err.getFailure())) { + failureBuilder.setCause(nexusFailureToAPIFailure(err.getFailure(), false)); + } + return failureBuilder.build(); + } + + private static boolean hasFailureContent(io.temporal.api.nexus.v1.Failure failure) { + // Check if the failure has content beyond just a message + return !failure.getDetails().isEmpty() + || !failure.getMetadataMap().isEmpty() + || !failure.getStackTrace().isEmpty() + || failure.hasCause(); } /** @@ -1149,7 +1182,11 @@ private static Failure nexusFailureToAPIFailure( applicationFailureInfo.setNonRetryable(!retryable); apiFailure.setApplicationFailureInfo(applicationFailureInfo.build()); } + // Ensure these always get written apiFailure.setMessage(failure.getMessage()); + if (!failure.getStackTrace().isEmpty()) { + apiFailure.setStackTrace(failure.getStackTrace()); + } return apiFailure.build(); }