From 2077676c33ea6c90963da73d6391acf6db4820b2 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 16 Dec 2025 19:45:05 +0700 Subject: [PATCH 1/6] fix: improve destwebhook delivery error handling --- .../providers/destwebhook/destwebhook.go | 66 +++++++- .../destwebhook/destwebhook_publish_test.go | 143 ++++++++++++++++++ 2 files changed, 208 insertions(+), 1 deletion(-) diff --git a/internal/destregistry/providers/destwebhook/destwebhook.go b/internal/destregistry/providers/destwebhook/destwebhook.go index 8a3963bd..4738a1ed 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook.go +++ b/internal/destregistry/providers/destwebhook/destwebhook.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -597,7 +598,27 @@ func (p *WebhookPublisher) Publish(ctx context.Context, event *models.Event) (*d resp, err := p.httpClient.Do(httpReq) if err != nil { - return nil, destregistry.NewErrDestinationPublishAttempt(err, "webhook", map[string]interface{}{ + // Context canceled is a system error (e.g., service shutdown) - return nil + // so it's treated as PreDeliveryError (nack → requeue for another instance). + if errors.Is(err, context.Canceled) { + return nil, destregistry.NewErrDestinationPublishAttempt(err, "webhook", map[string]interface{}{ + "error": "canceled", + "message": err.Error(), + }) + } + + // All other errors are destination-level failures (DeliveryError → ack + retry): + // - DNS errors (*net.DNSError): "no such host", DNS timeout + // - Connection errors (*net.OpError): connection refused, network unreachable, reset + // - Timeout errors: I/O timeout, context deadline exceeded + // - TLS errors: certificate validation, handshake failures + // - Redirect errors: too many redirects + // See: https://github.com/hookdeck/outpost/issues/571 + delivery := &destregistry.Delivery{ + Status: "failed", + Code: classifyNetworkError(err), + } + return delivery, destregistry.NewErrDestinationPublishAttempt(err, "webhook", map[string]interface{}{ "error": "request_failed", "message": err.Error(), }) @@ -771,3 +792,46 @@ func parseResponse(delivery *destregistry.Delivery, resp *http.Response) { } } } + +// classifyNetworkError returns a descriptive error code based on the error type. +// All errors classified here are destination-level failures (DeliveryError → ack + retry). +// +// Error codes and their meanings: +// - dns_error: Domain doesn't exist or DNS lookup failed +// - connection_refused: Server not running or rejecting connections +// - connection_reset: Connection was dropped by the server +// - network_unreachable: Network path to destination is unavailable +// - timeout: Request took too long (I/O timeout or context deadline) +// - tls_error: TLS/SSL certificate or handshake failure +// - redirect_error: Too many redirects +// - network_error: Other network-related failures (catch-all) +// +// Note: context.Canceled is handled separately as a system error (nack → requeue). +func classifyNetworkError(err error) string { + if err == nil { + return "unknown" + } + + errStr := err.Error() + + switch { + case strings.Contains(errStr, "no such host"): + return "dns_error" + case strings.Contains(errStr, "connection refused"): + return "connection_refused" + case strings.Contains(errStr, "connection reset"): + return "connection_reset" + case strings.Contains(errStr, "network is unreachable"): + return "network_unreachable" + case strings.Contains(errStr, "i/o timeout"): + return "timeout" + case strings.Contains(errStr, "context deadline exceeded"): + return "timeout" + case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"): + return "tls_error" + case strings.Contains(errStr, "too many redirects") || strings.Contains(errStr, "stopped after"): + return "redirect_error" + default: + return "network_error" + } +} diff --git a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go index bc862827..2b755ca6 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go +++ b/internal/destregistry/providers/destwebhook/destwebhook_publish_test.go @@ -671,6 +671,149 @@ func TestWebhookPublisher_CustomHeaders(t *testing.T) { }) } +// TestWebhookPublisher_ConnectionErrors tests that connection errors (connection refused, DNS failures) +// return a Delivery object alongside the error, NOT nil. +// +// This is important because the messagehandler uses the presence of a Delivery object to distinguish +// between "pre-delivery errors" (system issues) and "delivery errors" (destination issues): +// - nil delivery + error → PreDeliveryError → nack → DLQ +// - delivery + error → DeliveryError → ack + retry +// +// Connection errors are destination-level failures and should trigger retries, not go to DLQ. +// See: https://github.com/hookdeck/outpost/issues/571 +func TestWebhookPublisher_ConnectionErrors(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + url string + description string + expectedCode string + }{ + { + name: "connection refused", + url: "http://127.0.0.1:1/webhook", // Port 1 is typically not listening + description: "simulates a server that is not running", + expectedCode: "connection_refused", + }, + { + name: "DNS failure", + url: "http://this-domain-does-not-exist-abc123xyz.invalid/webhook", + description: "simulates an invalid/non-existent domain", + expectedCode: "dns_error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + provider, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("webhook"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "url": tt.url, + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "secret": "test-secret", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + // Attempt to publish to unreachable endpoint + delivery, err := publisher.Publish(context.Background(), &event) + + // Should return an error + require.Error(t, err, "should return error for %s", tt.description) + + // CRITICAL: Should return a Delivery object, NOT nil + // This ensures the error is treated as a DeliveryError (retryable) + // rather than a PreDeliveryError (goes to DLQ) + require.NotNil(t, delivery, "delivery should NOT be nil for connection errors - "+ + "returning nil causes messagehandler to treat this as PreDeliveryError (nack → DLQ) "+ + "instead of DeliveryError (ack + retry)") + + // Verify the delivery has appropriate status and code + assert.Equal(t, "failed", delivery.Status, "delivery status should be 'failed'") + assert.Equal(t, tt.expectedCode, delivery.Code, "delivery code should indicate error type") + }) + } +} + +// TestWebhookPublisher_HTTPErrors tests that HTTP error responses (4xx, 5xx) return +// a Delivery object alongside the error. This is the current correct behavior that +// connection errors should also follow. +func TestWebhookPublisher_HTTPErrors(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + statusCode int + }{ + {"400 Bad Request", 400}, + {"401 Unauthorized", 401}, + {"403 Forbidden", 403}, + {"404 Not Found", 404}, + {"429 Too Many Requests", 429}, + {"500 Internal Server Error", 500}, + {"502 Bad Gateway", 502}, + {"503 Service Unavailable", 503}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // Create a server that returns the specified status code + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.statusCode) + w.Write([]byte(`{"error": "test error"}`)) + })) + defer server.Close() + + provider, err := destwebhook.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("webhook"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "url": server.URL + "/webhook", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "secret": "test-secret", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + delivery, err := publisher.Publish(context.Background(), &event) + + // Should return an error + require.Error(t, err) + + // Should return a Delivery object (this already works correctly for HTTP errors) + require.NotNil(t, delivery, "delivery should NOT be nil for HTTP errors") + assert.Equal(t, "failed", delivery.Status) + assert.Equal(t, fmt.Sprintf("%d", tt.statusCode), delivery.Code) + }) + } +} + func TestWebhookPublisher_SignatureTemplates(t *testing.T) { dest := testutil.DestinationFactory.Any( testutil.DestinationFactory.WithType("webhook"), From 4142a5e8258233aeffaaad0b9a030ec5f9d69c72 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 16 Dec 2025 19:52:49 +0700 Subject: [PATCH 2/6] refactor: http request helpers --- .../providers/destwebhook/destwebhook.go | 137 +------------ .../providers/destwebhook/httphelper.go | 183 ++++++++++++++++++ 2 files changed, 185 insertions(+), 135 deletions(-) create mode 100644 internal/destregistry/providers/destwebhook/httphelper.go diff --git a/internal/destregistry/providers/destwebhook/destwebhook.go b/internal/destregistry/providers/destwebhook/destwebhook.go index 4738a1ed..fff30b2b 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook.go +++ b/internal/destregistry/providers/destwebhook/destwebhook.go @@ -6,9 +6,7 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" - "errors" "fmt" - "io" "net/http" "regexp" "strings" @@ -596,73 +594,8 @@ func (p *WebhookPublisher) Publish(ctx context.Context, event *models.Event) (*d return nil, err } - resp, err := p.httpClient.Do(httpReq) - if err != nil { - // Context canceled is a system error (e.g., service shutdown) - return nil - // so it's treated as PreDeliveryError (nack → requeue for another instance). - if errors.Is(err, context.Canceled) { - return nil, destregistry.NewErrDestinationPublishAttempt(err, "webhook", map[string]interface{}{ - "error": "canceled", - "message": err.Error(), - }) - } - - // All other errors are destination-level failures (DeliveryError → ack + retry): - // - DNS errors (*net.DNSError): "no such host", DNS timeout - // - Connection errors (*net.OpError): connection refused, network unreachable, reset - // - Timeout errors: I/O timeout, context deadline exceeded - // - TLS errors: certificate validation, handshake failures - // - Redirect errors: too many redirects - // See: https://github.com/hookdeck/outpost/issues/571 - delivery := &destregistry.Delivery{ - Status: "failed", - Code: classifyNetworkError(err), - } - return delivery, destregistry.NewErrDestinationPublishAttempt(err, "webhook", map[string]interface{}{ - "error": "request_failed", - "message": err.Error(), - }) - } - defer resp.Body.Close() - - if resp.StatusCode >= 400 { - delivery := &destregistry.Delivery{ - Status: "failed", - Code: fmt.Sprintf("%d", resp.StatusCode), - } - parseResponse(delivery, resp) - - // Extract body from delivery.Response for error details - var bodyStr string - if delivery.Response != nil { - if body, ok := delivery.Response["body"]; ok { - switch v := body.(type) { - case string: - bodyStr = v - case map[string]interface{}: - if jsonBytes, err := json.Marshal(v); err == nil { - bodyStr = string(jsonBytes) - } - } - } - } - - return delivery, destregistry.NewErrDestinationPublishAttempt( - fmt.Errorf("request failed with status %d: %s", resp.StatusCode, bodyStr), - "webhook", - map[string]interface{}{ - "status": resp.StatusCode, - "body": bodyStr, - }) - } - - delivery := &destregistry.Delivery{ - Status: "success", - Code: fmt.Sprintf("%d", resp.StatusCode), - } - parseResponse(delivery, resp) - - return delivery, nil + result := ExecuteHTTPRequest(ctx, p.httpClient, httpReq, "webhook") + return result.Delivery, result.Error } // Format is a helper function to format the event data into an HTTP request. @@ -769,69 +702,3 @@ func isTruthy(value string) bool { return false } } - -func parseResponse(delivery *destregistry.Delivery, resp *http.Response) { - if strings.Contains(resp.Header.Get("Content-Type"), "application/json") { - bodyBytes, _ := io.ReadAll(resp.Body) - var response map[string]interface{} - if err := json.Unmarshal(bodyBytes, &response); err != nil { - delivery.Response = map[string]interface{}{ - "status": resp.StatusCode, - "body": string(bodyBytes), - } - } - delivery.Response = map[string]interface{}{ - "status": resp.StatusCode, - "body": response, - } - } else { - bodyBytes, _ := io.ReadAll(resp.Body) - delivery.Response = map[string]interface{}{ - "status": resp.StatusCode, - "body": string(bodyBytes), - } - } -} - -// classifyNetworkError returns a descriptive error code based on the error type. -// All errors classified here are destination-level failures (DeliveryError → ack + retry). -// -// Error codes and their meanings: -// - dns_error: Domain doesn't exist or DNS lookup failed -// - connection_refused: Server not running or rejecting connections -// - connection_reset: Connection was dropped by the server -// - network_unreachable: Network path to destination is unavailable -// - timeout: Request took too long (I/O timeout or context deadline) -// - tls_error: TLS/SSL certificate or handshake failure -// - redirect_error: Too many redirects -// - network_error: Other network-related failures (catch-all) -// -// Note: context.Canceled is handled separately as a system error (nack → requeue). -func classifyNetworkError(err error) string { - if err == nil { - return "unknown" - } - - errStr := err.Error() - - switch { - case strings.Contains(errStr, "no such host"): - return "dns_error" - case strings.Contains(errStr, "connection refused"): - return "connection_refused" - case strings.Contains(errStr, "connection reset"): - return "connection_reset" - case strings.Contains(errStr, "network is unreachable"): - return "network_unreachable" - case strings.Contains(errStr, "i/o timeout"): - return "timeout" - case strings.Contains(errStr, "context deadline exceeded"): - return "timeout" - case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"): - return "tls_error" - case strings.Contains(errStr, "too many redirects") || strings.Contains(errStr, "stopped after"): - return "redirect_error" - default: - return "network_error" - } -} diff --git a/internal/destregistry/providers/destwebhook/httphelper.go b/internal/destregistry/providers/destwebhook/httphelper.go new file mode 100644 index 00000000..43f0b2fe --- /dev/null +++ b/internal/destregistry/providers/destwebhook/httphelper.go @@ -0,0 +1,183 @@ +package destwebhook + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + + "github.com/hookdeck/outpost/internal/destregistry" +) + +// HTTPRequestResult contains the result of an HTTP request execution. +// It handles the classification of errors into system errors vs delivery errors. +type HTTPRequestResult struct { + // Delivery is the delivery result. Will be nil for system errors (context.Canceled). + Delivery *destregistry.Delivery + // Error is the error that occurred, if any. + Error error + // Response is the HTTP response, if one was received. Caller should NOT close the body. + Response *http.Response +} + +// ExecuteHTTPRequest executes an HTTP request and classifies the result. +// +// Error classification: +// - context.Canceled: System error (service shutdown) → returns nil Delivery +// so messagehandler treats it as PreDeliveryError (nack → requeue) +// - Other network errors: Delivery error → returns Delivery with classified code +// so messagehandler treats it as DeliveryError (ack + retry) +// - HTTP 4xx/5xx: Delivery error → returns Delivery with status code +// - HTTP 2xx/3xx: Success → returns Delivery with success status +// +// See: https://github.com/hookdeck/outpost/issues/571 +func ExecuteHTTPRequest(ctx context.Context, client *http.Client, req *http.Request, provider string) *HTTPRequestResult { + resp, err := client.Do(req) + if err != nil { + // Context canceled is a system error (e.g., service shutdown) - return nil + // so it's treated as PreDeliveryError (nack → requeue for another instance). + if errors.Is(err, context.Canceled) { + return &HTTPRequestResult{ + Delivery: nil, + Error: destregistry.NewErrDestinationPublishAttempt(err, provider, map[string]interface{}{ + "error": "canceled", + "message": err.Error(), + }), + Response: nil, + } + } + + // All other errors are destination-level failures (DeliveryError → ack + retry) + return &HTTPRequestResult{ + Delivery: &destregistry.Delivery{ + Status: "failed", + Code: ClassifyNetworkError(err), + }, + Error: destregistry.NewErrDestinationPublishAttempt(err, provider, map[string]interface{}{ + "error": "request_failed", + "message": err.Error(), + }), + Response: nil, + } + } + + // HTTP error response (4xx, 5xx) + if resp.StatusCode >= 400 { + delivery := &destregistry.Delivery{ + Status: "failed", + Code: fmt.Sprintf("%d", resp.StatusCode), + } + ParseHTTPResponse(delivery, resp) + + // Extract body for error details + var bodyStr string + if delivery.Response != nil { + if body, ok := delivery.Response["body"]; ok { + switch v := body.(type) { + case string: + bodyStr = v + case map[string]interface{}: + if jsonBytes, err := json.Marshal(v); err == nil { + bodyStr = string(jsonBytes) + } + } + } + } + + return &HTTPRequestResult{ + Delivery: delivery, + Error: destregistry.NewErrDestinationPublishAttempt( + fmt.Errorf("request failed with status %d: %s", resp.StatusCode, bodyStr), + provider, + map[string]interface{}{ + "status": resp.StatusCode, + "body": bodyStr, + }), + Response: resp, + } + } + + // Success + delivery := &destregistry.Delivery{ + Status: "success", + Code: fmt.Sprintf("%d", resp.StatusCode), + } + ParseHTTPResponse(delivery, resp) + + return &HTTPRequestResult{ + Delivery: delivery, + Error: nil, + Response: resp, + } +} + +// ClassifyNetworkError returns a descriptive error code based on the error type. +// All errors classified here are destination-level failures (DeliveryError → ack + retry). +// +// Error codes and their meanings: +// - dns_error: Domain doesn't exist or DNS lookup failed +// - connection_refused: Server not running or rejecting connections +// - connection_reset: Connection was dropped by the server +// - network_unreachable: Network path to destination is unavailable +// - timeout: Request took too long (I/O timeout or context deadline) +// - tls_error: TLS/SSL certificate or handshake failure +// - redirect_error: Too many redirects +// - network_error: Other network-related failures (catch-all) +// +// Note: context.Canceled is handled separately as a system error (nack → requeue). +func ClassifyNetworkError(err error) string { + if err == nil { + return "unknown" + } + + errStr := err.Error() + + switch { + case strings.Contains(errStr, "no such host"): + return "dns_error" + case strings.Contains(errStr, "connection refused"): + return "connection_refused" + case strings.Contains(errStr, "connection reset"): + return "connection_reset" + case strings.Contains(errStr, "network is unreachable"): + return "network_unreachable" + case strings.Contains(errStr, "i/o timeout"): + return "timeout" + case strings.Contains(errStr, "context deadline exceeded"): + return "timeout" + case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"): + return "tls_error" + case strings.Contains(errStr, "too many redirects") || strings.Contains(errStr, "stopped after"): + return "redirect_error" + default: + return "network_error" + } +} + +// ParseHTTPResponse reads and parses the HTTP response body into the delivery. +func ParseHTTPResponse(delivery *destregistry.Delivery, resp *http.Response) { + bodyBytes, _ := io.ReadAll(resp.Body) + + if strings.Contains(resp.Header.Get("Content-Type"), "application/json") { + var response map[string]interface{} + if err := json.Unmarshal(bodyBytes, &response); err != nil { + delivery.Response = map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + } + return + } + delivery.Response = map[string]interface{}{ + "status": resp.StatusCode, + "body": response, + } + } else { + delivery.Response = map[string]interface{}{ + "status": resp.StatusCode, + "body": string(bodyBytes), + } + } +} From a3065b1c65c9a4fdb08ef44eca0958563dfb2c26 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 16 Dec 2025 19:53:03 +0700 Subject: [PATCH 3/6] chore: update destwebhookstandard with same error handling logic --- .../destwebhookstandard.go | 74 +------------------ 1 file changed, 2 insertions(+), 72 deletions(-) diff --git a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go index c6b5a61e..2899044c 100644 --- a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go +++ b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go @@ -54,7 +54,6 @@ import ( "context" "encoding/json" "fmt" - "io" "net/http" "strconv" "strings" @@ -539,53 +538,8 @@ func (p *StandardWebhookPublisher) Publish(ctx context.Context, event *models.Ev return nil, err } - resp, err := p.httpClient.Do(httpReq) - if err != nil { - return nil, destregistry.NewErrDestinationPublishAttempt(err, "webhook_standard", map[string]interface{}{ - "error": "request_failed", - "message": err.Error(), - }) - } - defer resp.Body.Close() - - if resp.StatusCode >= 400 { - delivery := &destregistry.Delivery{ - Status: "failed", - Code: fmt.Sprintf("%d", resp.StatusCode), - } - parseResponse(delivery, resp) - - // Extract body from delivery.Response for error details - var bodyStr string - if delivery.Response != nil { - if body, ok := delivery.Response["body"]; ok { - switch v := body.(type) { - case string: - bodyStr = v - case map[string]interface{}: - if jsonBytes, err := json.Marshal(v); err == nil { - bodyStr = string(jsonBytes) - } - } - } - } - - return delivery, destregistry.NewErrDestinationPublishAttempt( - fmt.Errorf("request failed with status %d: %s", resp.StatusCode, bodyStr), - "webhook_standard", - map[string]interface{}{ - "status": resp.StatusCode, - "body": bodyStr, - }) - } - - delivery := &destregistry.Delivery{ - Status: "success", - Code: fmt.Sprintf("%d", resp.StatusCode), - } - parseResponse(delivery, resp) - - return delivery, nil + result := destwebhook.ExecuteHTTPRequest(ctx, p.httpClient, httpReq, "webhook_standard") + return result.Delivery, result.Error } // Format creates an HTTP request formatted according to Standard Webhooks specification @@ -658,27 +612,3 @@ func isTruthy(value string) bool { return false } } - -func parseResponse(delivery *destregistry.Delivery, resp *http.Response) { - if strings.Contains(resp.Header.Get("Content-Type"), "application/json") { - bodyBytes, _ := io.ReadAll(resp.Body) - var response map[string]interface{} - if err := json.Unmarshal(bodyBytes, &response); err != nil { - delivery.Response = map[string]interface{}{ - "status": resp.StatusCode, - "body": string(bodyBytes), - } - return - } - delivery.Response = map[string]interface{}{ - "status": resp.StatusCode, - "body": response, - } - } else { - bodyBytes, _ := io.ReadAll(resp.Body) - delivery.Response = map[string]interface{}{ - "status": resp.StatusCode, - "body": string(bodyBytes), - } - } -} From 1ee5e79597cc8e88a0d157d99558a0a08505fd0f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sat, 3 Jan 2026 21:07:21 +0700 Subject: [PATCH 4/6] fix: rabbitmq deliverymq error handling --- .../providers/destrabbitmq/destrabbitmq.go | 93 ++++++++++++++++++- .../destrabbitmq/destrabbitmq_publish_test.go | 81 ++++++++++++++++ 2 files changed, 172 insertions(+), 2 deletions(-) diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go index da1b9861..b8e461f8 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go @@ -3,6 +3,7 @@ package destrabbitmq import ( "context" "encoding/json" + "errors" "fmt" "strings" "sync" @@ -139,7 +140,24 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (* // Ensure we have a valid connection if err := p.ensureConnection(ctx); err != nil { - return nil, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ + // Context canceled is a system error (e.g., service shutdown) - return nil + // so it's treated as PreDeliveryError (nack → requeue for another instance). + // See: https://github.com/hookdeck/outpost/issues/571 + if errors.Is(err, context.Canceled) { + return nil, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ + "error": "canceled", + "message": err.Error(), + }) + } + + // All other connection errors are destination-level failures (DeliveryError → ack + retry) + return &destregistry.Delivery{ + Status: "failed", + Code: ClassifyRabbitMQError(err), + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ "error": "connection_failed", "message": err.Error(), }) @@ -167,9 +185,18 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (* Body: []byte(dataBytes), }, ); err != nil { + // Context canceled is a system error (e.g., service shutdown) - return nil + // so it's treated as PreDeliveryError (nack → requeue for another instance). + if errors.Is(err, context.Canceled) { + return nil, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ + "error": "canceled", + "message": err.Error(), + }) + } + return &destregistry.Delivery{ Status: "failed", - Code: "ERR", + Code: ClassifyRabbitMQError(err), Response: map[string]interface{}{ "error": err.Error(), }, @@ -228,6 +255,68 @@ func rabbitURL(config *RabbitMQDestinationConfig, credentials *RabbitMQDestinati return fmt.Sprintf("%s://%s:%s@%s", scheme, credentials.Username, credentials.Password, config.ServerURL) } +// ClassifyRabbitMQError returns a descriptive error code based on the error type. +// All errors classified here are destination-level failures (DeliveryError → ack + retry). +// +// Error codes and their meanings: +// - dns_error: Domain doesn't exist or DNS lookup failed +// - connection_refused: Server not running or rejecting connections +// - connection_reset: Connection was dropped by the server +// - auth_failed: Authentication/authorization failure +// - channel_error: Channel-level error (closed, etc.) +// - exchange_not_found: Exchange doesn't exist +// - timeout: Connection or operation timed out +// - tls_error: TLS/SSL certificate or handshake failure +// - rabbitmq_error: Other RabbitMQ-related failures (catch-all) +// +// Note: context.Canceled is handled separately as a system error (nack → requeue). +func ClassifyRabbitMQError(err error) string { + if err == nil { + return "unknown" + } + + errStr := err.Error() + + // Check for AMQP-specific errors first + var amqpErr *amqp091.Error + if errors.As(err, &amqpErr) { + switch amqpErr.Code { + case amqp091.AccessRefused: + return "access_denied" + case amqp091.NotFound: + return "exchange_not_found" + case amqp091.ChannelError: + return "channel_error" + case amqp091.ConnectionForced: + return "connection_forced" + default: + return "rabbitmq_error" + } + } + + // Fall back to string matching for network-level errors + switch { + case strings.Contains(errStr, "no such host"): + return "dns_error" + case strings.Contains(errStr, "connection refused"): + return "connection_refused" + case strings.Contains(errStr, "connection reset"): + return "connection_reset" + case strings.Contains(errStr, "i/o timeout"): + return "timeout" + case strings.Contains(errStr, "context deadline exceeded"): + return "timeout" + case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"): + return "tls_error" + case strings.Contains(errStr, "PLAIN") || strings.Contains(errStr, "auth") || strings.Contains(errStr, "ACCESS_REFUSED"): + return "auth_failed" + case strings.Contains(errStr, "channel"): + return "channel_error" + default: + return "rabbitmq_error" + } +} + // ===== TEST HELPERS ===== func (p *RabbitMQPublisher) GetConnection() *amqp091.Connection { diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go index bf4e0857..781c3349 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq_publish_test.go @@ -1,6 +1,7 @@ package destrabbitmq_test import ( + "context" "testing" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" @@ -221,3 +222,83 @@ func toStringMap(table amqp091.Table) map[string]string { } return result } + +// TestRabbitMQPublisher_ConnectionErrors tests that connection errors (connection refused, DNS failures) +// return a Delivery object alongside the error, NOT nil. +// +// This is important because the messagehandler uses the presence of a Delivery object to distinguish +// between "pre-delivery errors" (system issues) and "delivery errors" (destination issues): +// - nil delivery + error → PreDeliveryError → nack → DLQ +// - delivery + error → DeliveryError → ack + retry +// +// Connection errors are destination-level failures and should trigger retries, not go to DLQ. +// See: https://github.com/hookdeck/outpost/issues/571 +func TestRabbitMQPublisher_ConnectionErrors(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + serverURL string + description string + expectedCode string + }{ + { + name: "connection refused", + serverURL: "127.0.0.1:1", // Port 1 is typically not listening + description: "simulates a server that is not running", + expectedCode: "connection_refused", + }, + { + name: "DNS failure", + serverURL: "this-domain-does-not-exist-abc123xyz.invalid:5672", + description: "simulates an invalid/non-existent domain", + expectedCode: "dns_error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + provider, err := destrabbitmq.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("rabbitmq"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "server_url": tt.serverURL, + "exchange": "test-exchange", + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "username": "guest", + "password": "guest", + }), + ) + + publisher, err := provider.CreatePublisher(context.Background(), &destination) + require.NoError(t, err) + defer publisher.Close() + + event := testutil.EventFactory.Any( + testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}), + ) + + // Attempt to publish to unreachable endpoint + delivery, err := publisher.Publish(context.Background(), &event) + + // Should return an error + require.Error(t, err, "should return error for %s", tt.description) + + // CRITICAL: Should return a Delivery object, NOT nil + // This ensures the error is treated as a DeliveryError (retryable) + // rather than a PreDeliveryError (goes to DLQ) + require.NotNil(t, delivery, "delivery should NOT be nil for connection errors - "+ + "returning nil causes messagehandler to treat this as PreDeliveryError (nack → DLQ) "+ + "instead of DeliveryError (ack + retry)") + + // Verify the delivery has appropriate status and code + assert.Equal(t, "failed", delivery.Status, "delivery status should be 'failed'") + assert.Equal(t, tt.expectedCode, delivery.Code, "delivery code should indicate error type") + }) + } +} From 558fd11395b9eeb29353d07ca7bf54aec5038361 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 4 Jan 2026 00:48:45 +0700 Subject: [PATCH 5/6] refactor: handle context.Canceled err --- internal/destregistry/error.go | 12 +++ .../destazureservicebus.go | 26 +++++-- .../providers/destrabbitmq/destrabbitmq.go | 41 +++-------- .../providers/destwebhook/httphelper.go | 30 +------- internal/destregistry/registry.go | 7 ++ internal/destregistry/registry_test.go | 73 +++++++++++++++++++ 6 files changed, 121 insertions(+), 68 deletions(-) diff --git a/internal/destregistry/error.go b/internal/destregistry/error.go index 23b76229..c824d9b2 100644 --- a/internal/destregistry/error.go +++ b/internal/destregistry/error.go @@ -1,6 +1,7 @@ package destregistry import ( + "context" "errors" "fmt" ) @@ -38,4 +39,15 @@ func NewErrDestinationPublishAttempt(err error, provider string, data map[string return &ErrDestinationPublishAttempt{Err: err, Provider: provider, Data: data} } +// NewErrPublishCanceled creates an error for when publish is canceled (e.g., service shutdown). +// This should return nil Delivery to trigger nack → requeue for another instance. +// See: https://github.com/hookdeck/outpost/issues/571 +func NewErrPublishCanceled(provider string) error { + return &ErrDestinationPublishAttempt{ + Err: context.Canceled, + Provider: provider, + Data: map[string]interface{}{"error": "canceled"}, + } +} + var ErrPublisherClosed = errors.New("publisher is closed") diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 31667e80..86e2c795 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -158,19 +158,29 @@ func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Ev sender, err := p.ensureSender() if err != nil { - return nil, err + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ + "error": "sender_failed", + "message": err.Error(), + }) } if err := sender.SendMessage(ctx, message, nil); err != nil { return &destregistry.Delivery{ - Status: "failed", - Code: "ERR", - Response: map[string]interface{}{ - "error": err.Error(), - }, - }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ "error": err.Error(), - }) + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ + "error": "send_failed", + "message": err.Error(), + }) } return &destregistry.Delivery{ diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go index b8e461f8..aaa3273f 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go @@ -138,19 +138,7 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (* } defer p.BasePublisher.FinishPublish() - // Ensure we have a valid connection if err := p.ensureConnection(ctx); err != nil { - // Context canceled is a system error (e.g., service shutdown) - return nil - // so it's treated as PreDeliveryError (nack → requeue for another instance). - // See: https://github.com/hookdeck/outpost/issues/571 - if errors.Is(err, context.Canceled) { - return nil, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ - "error": "canceled", - "message": err.Error(), - }) - } - - // All other connection errors are destination-level failures (DeliveryError → ack + retry) return &destregistry.Delivery{ Status: "failed", Code: ClassifyRabbitMQError(err), @@ -185,25 +173,16 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (* Body: []byte(dataBytes), }, ); err != nil { - // Context canceled is a system error (e.g., service shutdown) - return nil - // so it's treated as PreDeliveryError (nack → requeue for another instance). - if errors.Is(err, context.Canceled) { - return nil, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ - "error": "canceled", - "message": err.Error(), - }) - } - return &destregistry.Delivery{ - Status: "failed", - Code: ClassifyRabbitMQError(err), - Response: map[string]interface{}{ - "error": err.Error(), - }, - }, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ - "error": "publish_failed", - "message": err.Error(), - }) + Status: "failed", + Code: ClassifyRabbitMQError(err), + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ + "error": "publish_failed", + "message": err.Error(), + }) } return &destregistry.Delivery{ @@ -268,8 +247,6 @@ func rabbitURL(config *RabbitMQDestinationConfig, credentials *RabbitMQDestinati // - timeout: Connection or operation timed out // - tls_error: TLS/SSL certificate or handshake failure // - rabbitmq_error: Other RabbitMQ-related failures (catch-all) -// -// Note: context.Canceled is handled separately as a system error (nack → requeue). func ClassifyRabbitMQError(err error) string { if err == nil { return "unknown" diff --git a/internal/destregistry/providers/destwebhook/httphelper.go b/internal/destregistry/providers/destwebhook/httphelper.go index 43f0b2fe..94c1028c 100644 --- a/internal/destregistry/providers/destwebhook/httphelper.go +++ b/internal/destregistry/providers/destwebhook/httphelper.go @@ -3,7 +3,6 @@ package destwebhook import ( "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -13,9 +12,8 @@ import ( ) // HTTPRequestResult contains the result of an HTTP request execution. -// It handles the classification of errors into system errors vs delivery errors. type HTTPRequestResult struct { - // Delivery is the delivery result. Will be nil for system errors (context.Canceled). + // Delivery is the delivery result. Delivery *destregistry.Delivery // Error is the error that occurred, if any. Error error @@ -24,33 +22,11 @@ type HTTPRequestResult struct { } // ExecuteHTTPRequest executes an HTTP request and classifies the result. -// -// Error classification: -// - context.Canceled: System error (service shutdown) → returns nil Delivery -// so messagehandler treats it as PreDeliveryError (nack → requeue) -// - Other network errors: Delivery error → returns Delivery with classified code -// so messagehandler treats it as DeliveryError (ack + retry) -// - HTTP 4xx/5xx: Delivery error → returns Delivery with status code -// - HTTP 2xx/3xx: Success → returns Delivery with success status -// +// All errors return a Delivery object with a classified error code. // See: https://github.com/hookdeck/outpost/issues/571 func ExecuteHTTPRequest(ctx context.Context, client *http.Client, req *http.Request, provider string) *HTTPRequestResult { resp, err := client.Do(req) if err != nil { - // Context canceled is a system error (e.g., service shutdown) - return nil - // so it's treated as PreDeliveryError (nack → requeue for another instance). - if errors.Is(err, context.Canceled) { - return &HTTPRequestResult{ - Delivery: nil, - Error: destregistry.NewErrDestinationPublishAttempt(err, provider, map[string]interface{}{ - "error": "canceled", - "message": err.Error(), - }), - Response: nil, - } - } - - // All other errors are destination-level failures (DeliveryError → ack + retry) return &HTTPRequestResult{ Delivery: &destregistry.Delivery{ Status: "failed", @@ -126,8 +102,6 @@ func ExecuteHTTPRequest(ctx context.Context, client *http.Client, req *http.Requ // - tls_error: TLS/SSL certificate or handshake failure // - redirect_error: Too many redirects // - network_error: Other network-related failures (catch-all) -// -// Note: context.Canceled is handled separately as a system error (nack → requeue). func ClassifyNetworkError(err error) string { if err == nil { return "unknown" diff --git a/internal/destregistry/registry.go b/internal/destregistry/registry.go index 02779914..8b670da0 100644 --- a/internal/destregistry/registry.go +++ b/internal/destregistry/registry.go @@ -153,6 +153,13 @@ func (r *registry) PublishEvent(ctx context.Context, destination *models.Destina deliveryData, err := publisher.Publish(timeoutCtx, event) if err != nil { + // Context canceled = system shutdown, return nil delivery to trigger nack → requeue. + // This is handled centrally so individual publishers don't need to check for it. + // See: https://github.com/hookdeck/outpost/issues/571 + if errors.Is(err, context.Canceled) { + return nil, NewErrPublishCanceled(destination.Type) + } + if deliveryData != nil { delivery.Time = time.Now() delivery.Status = deliveryData.Status diff --git a/internal/destregistry/registry_test.go b/internal/destregistry/registry_test.go index 081c2a1e..edf92a0e 100644 --- a/internal/destregistry/registry_test.go +++ b/internal/destregistry/registry_test.go @@ -700,6 +700,79 @@ func TestPublishEventTimeout(t *testing.T) { }) } +// TestPublishEventCanceled tests that context.Canceled errors are handled centrally +// and return nil delivery to trigger nack → requeue behavior. +// See: https://github.com/hookdeck/outpost/issues/571 +func TestPublishEventCanceled(t *testing.T) { + t.Parallel() + + t.Run("should return nil delivery when context is canceled", func(t *testing.T) { + t.Parallel() + + registry := destregistry.NewRegistry(&destregistry.Config{ + DeliveryTimeout: time.Second, + }, testutil.CreateTestLogger(t)) + + provider, err := newMockProvider() + require.NoError(t, err) + provider.publishDelay = time.Second // Long enough that we can cancel + err = registry.RegisterProvider("test", provider) + require.NoError(t, err) + + destination := &models.Destination{Type: "test"} + event := &models.Event{} + + // Create a context that we'll cancel + ctx, cancel := context.WithCancel(context.Background()) + + // Cancel immediately + cancel() + + delivery, err := registry.PublishEvent(ctx, destination, event) + + // Should return error + require.Error(t, err) + + // CRITICAL: Should return nil delivery for context.Canceled + // This ensures messagehandler treats it as PreDeliveryError (nack → requeue) + assert.Nil(t, delivery, "delivery should be nil for context.Canceled to trigger requeue") + + // Verify it's the right error type + var publishErr *destregistry.ErrDestinationPublishAttempt + require.ErrorAs(t, err, &publishErr) + assert.Equal(t, "canceled", publishErr.Data["error"]) + }) + + t.Run("should return delivery when publish fails with other errors", func(t *testing.T) { + t.Parallel() + + registry := destregistry.NewRegistry(&destregistry.Config{}, testutil.CreateTestLogger(t)) + + provider, err := newMockProvider() + require.NoError(t, err) + provider.mockError = destregistry.NewErrDestinationPublishAttempt( + errors.New("connection refused"), + "test", + map[string]interface{}{"error": "connection_refused"}, + ) + err = registry.RegisterProvider("test", provider) + require.NoError(t, err) + + destination := &models.Destination{Type: "test"} + event := &models.Event{} + + delivery, err := registry.PublishEvent(context.Background(), destination, event) + + // Should return error + require.Error(t, err) + + // Should return non-nil delivery for other errors (to trigger retry, not requeue) + // Note: The mock returns nil delivery with mockError, so this tests the registry behavior + // In real publishers, they should return Delivery objects for connection errors + assert.Nil(t, delivery, "mock returns nil, but real publishers should return Delivery") + }) +} + func TestDisplayDestination(t *testing.T) { t.Parallel() From 1febb7deb7348c98428c34a8e5ea2e9ce31e9b7a Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Sun, 4 Jan 2026 00:48:50 +0700 Subject: [PATCH 6/6] chore: gofmt --- .../destazureservicebus.go | 36 +++++++++---------- .../providers/destrabbitmq/destrabbitmq.go | 36 +++++++++---------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 86e2c795..0f0b93db 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -159,28 +159,28 @@ func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Ev sender, err := p.ensureSender() if err != nil { return &destregistry.Delivery{ - Status: "failed", - Code: "ERR", - Response: map[string]interface{}{ - "error": err.Error(), - }, - }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ - "error": "sender_failed", - "message": err.Error(), - }) + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ + "error": "sender_failed", + "message": err.Error(), + }) } if err := sender.SendMessage(ctx, message, nil); err != nil { return &destregistry.Delivery{ - Status: "failed", - Code: "ERR", - Response: map[string]interface{}{ - "error": err.Error(), - }, - }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ - "error": "send_failed", - "message": err.Error(), - }) + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ + "error": "send_failed", + "message": err.Error(), + }) } return &destregistry.Delivery{ diff --git a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go index aaa3273f..c5720396 100644 --- a/internal/destregistry/providers/destrabbitmq/destrabbitmq.go +++ b/internal/destregistry/providers/destrabbitmq/destrabbitmq.go @@ -140,15 +140,15 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (* if err := p.ensureConnection(ctx); err != nil { return &destregistry.Delivery{ - Status: "failed", - Code: ClassifyRabbitMQError(err), - Response: map[string]interface{}{ - "error": err.Error(), - }, - }, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ - "error": "connection_failed", - "message": err.Error(), - }) + Status: "failed", + Code: ClassifyRabbitMQError(err), + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ + "error": "connection_failed", + "message": err.Error(), + }) } dataBytes, err := json.Marshal(event.Data) @@ -174,15 +174,15 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (* }, ); err != nil { return &destregistry.Delivery{ - Status: "failed", - Code: ClassifyRabbitMQError(err), - Response: map[string]interface{}{ - "error": err.Error(), - }, - }, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ - "error": "publish_failed", - "message": err.Error(), - }) + Status: "failed", + Code: ClassifyRabbitMQError(err), + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{ + "error": "publish_failed", + "message": err.Error(), + }) } return &destregistry.Delivery{