Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 95 additions & 70 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion chasm/lib/callback/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c chasmInvocation) getHistoryRequest(
Completion: completion,
}
case *nexusrpc.OperationCompletionUnsuccessful:
apiFailure, err := commonnexus.NexusFailureToAPIFailure(op.Failure, true)
apiFailure, err := commonnexus.NexusFailureToTemporalFailure(op.Failure)
if err != nil {
return nil, fmt.Errorf("failed to convert failure type: %v", err)
}
Expand Down
298 changes: 210 additions & 88 deletions common/nexus/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package nexus

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync/atomic"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -52,135 +55,254 @@ var failureTypeString = string((&failurepb.Failure{}).ProtoReflect().Descriptor(

// ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure.
func ProtoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure {
return nexus.Failure{
Message: failure.GetMessage(),
Metadata: failure.GetMetadata(),
Details: failure.GetDetails(),
nf := nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: failure.GetMetadata(),
Details: failure.GetDetails(),
}
if failure.GetCause() != nil {
cause := ProtoFailureToNexusFailure(failure.GetCause())
nf.Cause = &cause
}
return nf
}

// NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure.
// Always returns a non-nil value.
func NexusFailureToProtoFailure(failure nexus.Failure) *nexuspb.Failure {
return &nexuspb.Failure{
Message: failure.Message,
Metadata: failure.Metadata,
Details: failure.Details,
pf := &nexuspb.Failure{
Message: failure.Message,
Metadata: failure.Metadata,
Details: failure.Details,
StackTrace: failure.StackTrace,
}
if failure.Cause != nil {
pf.Cause = NexusFailureToProtoFailure(*failure.Cause)
}
return pf
}

type serializedHandlerError struct {
Type string `json:"type,omitempty"`
RetryableOverride *bool `json:"retryableOverride,omitempty"`
// Bytes as base64 encoded string.
EncodedAttributes string `json:"encodedAttributes,omitempty"`
}

// APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to
// the proto fullname of the temporal API Failure message.
// Mutates the failure temporarily, unsetting the Message field to avoid duplicating the information in the serialized
// failure. Mutating was chosen over cloning for performance reasons since this function may be called frequently.
func APIFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) {
// Unset message so it's not serialized in the details.
// TemporalFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to
// the proto fullname of the temporal API Failure message or the standard Nexus SDK failure types.
// Returns an error if the failure cannot be converted.
// Mutates the failure temporarily, unsetting the Message and StackTrace fields to avoid duplicating the information in
// the serialized failure. Mutating was chosen over cloning for performance reasons since this function may be called
// frequently.
func TemporalFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) {
var causep *nexus.Failure
if failure.GetCause() != nil {
var cause nexus.Failure
var err error
cause, err = TemporalFailureToNexusFailure(failure.GetCause())
if err != nil {
return nexus.Failure{}, err
}
causep = &cause
}

switch info := failure.GetFailureInfo().(type) {
case *failurepb.Failure_NexusHandlerFailureInfo:
var encodedAttributes string
if failure.EncodedAttributes != nil {
b, err := protojson.Marshal(failure.EncodedAttributes)
if err != nil {
return nexus.Failure{}, fmt.Errorf("failed to deserialize HandlerError attributes: %w", err)
}
encodedAttributes = base64.RawURLEncoding.EncodeToString(b)
}
var retryableOverride *bool
// nolint:exhaustive,revive // There are only two valid values other than unspecified.
switch info.NexusHandlerFailureInfo.GetRetryBehavior() {
case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE:
val := true
retryableOverride = &val
case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE:
val := false
retryableOverride = &val
}

handlerError := serializedHandlerError{
Type: info.NexusHandlerFailureInfo.GetType(),
RetryableOverride: retryableOverride,
EncodedAttributes: encodedAttributes,
}

details, err := json.Marshal(handlerError)
if err != nil {
return nexus.Failure{}, err
}
return nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: map[string]string{
"type": "nexus.HandlerError",
},
Details: details,
Cause: causep,
}, nil
}
// Unset message and stack trace so it's not serialized in the details.
var message string
message, failure.Message = failure.Message, ""
var stackTrace string
stackTrace, failure.StackTrace = failure.StackTrace, ""

data, err := protojson.Marshal(failure)
failure.Message = message

failure.StackTrace = stackTrace
if err != nil {
return nexus.Failure{}, err
}

return nexus.Failure{
Message: failure.GetMessage(),
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: map[string]string{
"type": failureTypeString,
},
Details: data,
Cause: causep,
}, nil
}

// NexusFailureToAPIFailure converts a Nexus Failure to an API proto Failure.
// NexusFailureToTemporalFailure converts a Nexus Failure to an API proto Failure.
// If the failure metadata "type" field is set to the fullname of the temporal API Failure message, the failure is
// reconstructed using protojson.Unmarshal on the failure details field.
func NexusFailureToAPIFailure(failure nexus.Failure, retryable bool) (*failurepb.Failure, error) {
apiFailure := &failurepb.Failure{}
// reconstructed using protojson.Unmarshal on the failure details field. Otherwise, the failure is reconstructed
// based on the known Nexus SDK failure types.
// Returns an error if the failure cannot be converted.
// nolint:revive // cognitive-complexity is high but justified to keep each case together
func NexusFailureToTemporalFailure(f nexus.Failure) (*failurepb.Failure, error) {
apiFailure := &failurepb.Failure{
Message: f.Message,
StackTrace: f.StackTrace,
}

if failure.Metadata != nil && failure.Metadata["type"] == failureTypeString {
if err := protojson.Unmarshal(failure.Details, apiFailure); err != nil {
return nil, err
if f.Metadata != nil {
switch f.Metadata["type"] {
case failureTypeString:
opts := protojson.UnmarshalOptions{DiscardUnknown: true}
if err := opts.Unmarshal(f.Details, apiFailure); err != nil {
return nil, err
}
// Restore these fields as they are not included in the marshalled failure.
apiFailure.Message = f.Message
apiFailure.StackTrace = f.StackTrace
case "nexus.OperationError":
// Special case for OperationError that adapts from Nexus semantics to Temporal semantics.
// Note that Temporal -> Temporal doesn't go through this code path, operation errors are always used as empty
// wrappers for an underlying causes.
var operationError *nexus.OperationError
err := json.Unmarshal(f.Details, &operationError)
if err != nil {
return nil, fmt.Errorf("failed to deserialize OperationError: %w", err)
}
if operationError.State == nexus.OperationStateCanceled {
// Canceled operation errors are represented as CanceledFailure in Temporal.
apiFailure.FailureInfo = &failurepb.Failure_CanceledFailureInfo{
CanceledFailureInfo: &failurepb.CanceledFailureInfo{},
}
} else {
// Failed operation errors are represented as non-retryable ApplicationFailure in Temporal.
apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
NonRetryable: true,
Type: "OperationError",
},
}
}
case "nexus.HandlerError":
var se serializedHandlerError
err := json.Unmarshal(f.Details, &se)
if err != nil {
return nil, fmt.Errorf("failed to deserialize HandlerError: %w", err)
}
var retryBehavior enumspb.NexusHandlerErrorRetryBehavior
if se.RetryableOverride == nil {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED
} else if *se.RetryableOverride {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE
} else {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE
}
apiFailure.FailureInfo = &failurepb.Failure_NexusHandlerFailureInfo{
NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{
Type: se.Type,
RetryBehavior: retryBehavior,
},
}
if len(se.EncodedAttributes) > 0 {
decoded, err := base64.RawURLEncoding.DecodeString(se.EncodedAttributes)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 HandlerError attributes: %w", err)
}
apiFailure.EncodedAttributes = &commonpb.Payload{}
if err := protojson.Unmarshal(decoded, apiFailure.EncodedAttributes); err != nil {
return nil, fmt.Errorf("failed to deserialize HandlerError attributes: %w", err)
}
}
default:
// We don't recognize this type, convert to a generic ApplicationFailure and preserve the original Nexus failure
// as serialized details.
applicationFailureInfo, err := nexusFailureMetadataToApplicationFailureInfo(f)
if err != nil {
return nil, fmt.Errorf("failed to serialize Nexus failure: %w", err)
}
apiFailure.FailureInfo = applicationFailureInfo
}
} else {
payloads, err := nexusFailureMetadataToPayloads(failure)
} else if len(f.Details) > 0 {
// We don't recognize this type, convert to a generic ApplicationFailure and preserve the original Nexus failure as
// serialized details.
applicationFailureInfo, err := nexusFailureMetadataToApplicationFailureInfo(f)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to serialize Nexus failure: %w", err)
}
apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
// Make up a type here, it's not part of the Nexus Failure spec.
Type: "NexusFailure",
Details: payloads,
NonRetryable: !retryable,
},
apiFailure.FailureInfo = applicationFailureInfo
}

if f.Cause != nil {
var err error
apiFailure.Cause, err = NexusFailureToTemporalFailure(*f.Cause)
if err != nil {
return nil, err
}
}
// Ensure this always gets written.
apiFailure.Message = failure.Message
return apiFailure, nil
}

func OperationErrorToTemporalFailure(opErr *nexus.OperationError) (*failurepb.Failure, error) {
var nexusFailure nexus.Failure
failureErr, ok := opErr.Cause.(*nexus.FailureError)
if ok {
nexusFailure = failureErr.Failure
} else if opErr.Cause != nil {
nexusFailure = nexus.Failure{Message: opErr.Cause.Error()}
}

// Canceled must be translated into a CanceledFailure to match the SDK expectation.
if opErr.State == nexus.OperationStateCanceled {
if nexusFailure.Metadata != nil && nexusFailure.Metadata["type"] == failureTypeString {
temporalFailure, err := NexusFailureToAPIFailure(nexusFailure, false)
if err != nil {
return nil, err
}
if temporalFailure.GetCanceledFailureInfo() != nil {
// We already have a CanceledFailure, use it.
return temporalFailure, nil
}
// Fallback to encoding the Nexus failure into a Temporal canceled failure, we expect operations that end up
// as canceled to have a CanceledFailureInfo object.
}
payloads, err := nexusFailureMetadataToPayloads(nexusFailure)
func nexusFailureMetadataToApplicationFailureInfo(failure nexus.Failure) (*failurepb.Failure_ApplicationFailureInfo, error) {
var payloads *commonpb.Payloads
if len(failure.Metadata) > 0 || len(failure.Details) > 0 {
// Delete before serializing (note the failure here is passed by value).
failure.Message = ""
failure.StackTrace = ""
data, err := json.Marshal(failure)
if err != nil {
return nil, err
}
return &failurepb.Failure{
Message: nexusFailure.Message,
FailureInfo: &failurepb.Failure_CanceledFailureInfo{
CanceledFailureInfo: &failurepb.CanceledFailureInfo{
Details: payloads,
payloads = &commonpb.Payloads{
Payloads: []*commonpb.Payload{
{
Metadata: map[string][]byte{
"encoding": []byte("json/plain"),
},
Data: data,
},
},
}, nil
}

return NexusFailureToAPIFailure(nexusFailure, false)
}

func nexusFailureMetadataToPayloads(failure nexus.Failure) (*commonpb.Payloads, error) {
if len(failure.Metadata) == 0 && len(failure.Details) == 0 {
return nil, nil
}
// Delete before serializing.
failure.Message = ""
data, err := json.Marshal(failure)
if err != nil {
return nil, err
}
}
return &commonpb.Payloads{
Payloads: []*commonpb.Payload{
{
Metadata: map[string][]byte{
"encoding": []byte("json/plain"),
},
Data: data,
},
return &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
Details: payloads,
},
}, err
}, nil
}

// ConvertGRPCError converts either a serviceerror or a gRPC status error into a Nexus HandlerError if possible.
Expand Down
Loading