Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions internal/destregistry/error.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package destregistry

import (
"context"
"errors"
"fmt"
)
Expand Down Expand Up @@ -38,4 +39,15 @@ func NewErrDestinationPublishAttempt(err error, provider string, data map[string
return &ErrDestinationPublishAttempt{Err: err, Provider: provider, Data: data}
}

// NewErrPublishCanceled creates an error for when publish is canceled (e.g., service shutdown).
// This should return nil Delivery to trigger nack → requeue for another instance.
// See: https://github.com/hookdeck/outpost/issues/571
func NewErrPublishCanceled(provider string) error {
return &ErrDestinationPublishAttempt{
Err: context.Canceled,
Provider: provider,
Data: map[string]interface{}{"error": "canceled"},
}
}

var ErrPublisherClosed = errors.New("publisher is closed")
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,16 @@ func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Ev

sender, err := p.ensureSender()
if err != nil {
return nil, err
return &destregistry.Delivery{
Status: "failed",
Code: "ERR",
Response: map[string]interface{}{
"error": err.Error(),
},
}, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{
"error": "sender_failed",
"message": err.Error(),
})
}

if err := sender.SendMessage(ctx, message, nil); err != nil {
Expand All @@ -169,7 +178,8 @@ func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Ev
"error": err.Error(),
},
}, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{
"error": err.Error(),
"error": "send_failed",
"message": err.Error(),
})
}

Expand Down
78 changes: 72 additions & 6 deletions internal/destregistry/providers/destrabbitmq/destrabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package destrabbitmq
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -137,12 +138,17 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (*
}
defer p.BasePublisher.FinishPublish()

// Ensure we have a valid connection
if err := p.ensureConnection(ctx); err != nil {
return nil, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{
"error": "connection_failed",
"message": err.Error(),
})
return &destregistry.Delivery{
Status: "failed",
Code: ClassifyRabbitMQError(err),
Response: map[string]interface{}{
"error": err.Error(),
},
}, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{
"error": "connection_failed",
"message": err.Error(),
})
}

dataBytes, err := json.Marshal(event.Data)
Expand All @@ -169,7 +175,7 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (*
); err != nil {
return &destregistry.Delivery{
Status: "failed",
Code: "ERR",
Code: ClassifyRabbitMQError(err),
Response: map[string]interface{}{
"error": err.Error(),
},
Expand Down Expand Up @@ -228,6 +234,66 @@ func rabbitURL(config *RabbitMQDestinationConfig, credentials *RabbitMQDestinati
return fmt.Sprintf("%s://%s:%s@%s", scheme, credentials.Username, credentials.Password, config.ServerURL)
}

// ClassifyRabbitMQError returns a descriptive error code based on the error type.
// All errors classified here are destination-level failures (DeliveryError → ack + retry).
//
// Error codes and their meanings:
// - dns_error: Domain doesn't exist or DNS lookup failed
// - connection_refused: Server not running or rejecting connections
// - connection_reset: Connection was dropped by the server
// - auth_failed: Authentication/authorization failure
// - channel_error: Channel-level error (closed, etc.)
// - exchange_not_found: Exchange doesn't exist
// - timeout: Connection or operation timed out
// - tls_error: TLS/SSL certificate or handshake failure
// - rabbitmq_error: Other RabbitMQ-related failures (catch-all)
func ClassifyRabbitMQError(err error) string {
if err == nil {
return "unknown"
}

errStr := err.Error()

// Check for AMQP-specific errors first
var amqpErr *amqp091.Error
if errors.As(err, &amqpErr) {
switch amqpErr.Code {
case amqp091.AccessRefused:
return "access_denied"
case amqp091.NotFound:
return "exchange_not_found"
case amqp091.ChannelError:
return "channel_error"
case amqp091.ConnectionForced:
return "connection_forced"
default:
return "rabbitmq_error"
}
}

// Fall back to string matching for network-level errors
switch {
case strings.Contains(errStr, "no such host"):
return "dns_error"
case strings.Contains(errStr, "connection refused"):
return "connection_refused"
case strings.Contains(errStr, "connection reset"):
return "connection_reset"
case strings.Contains(errStr, "i/o timeout"):
return "timeout"
case strings.Contains(errStr, "context deadline exceeded"):
return "timeout"
case strings.Contains(errStr, "tls:") || strings.Contains(errStr, "x509:"):
return "tls_error"
case strings.Contains(errStr, "PLAIN") || strings.Contains(errStr, "auth") || strings.Contains(errStr, "ACCESS_REFUSED"):
return "auth_failed"
case strings.Contains(errStr, "channel"):
return "channel_error"
default:
return "rabbitmq_error"
}
}

// ===== TEST HELPERS =====

func (p *RabbitMQPublisher) GetConnection() *amqp091.Connection {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package destrabbitmq_test

import (
"context"
"testing"

"github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq"
Expand Down Expand Up @@ -221,3 +222,83 @@ func toStringMap(table amqp091.Table) map[string]string {
}
return result
}

// TestRabbitMQPublisher_ConnectionErrors tests that connection errors (connection refused, DNS failures)
// return a Delivery object alongside the error, NOT nil.
//
// This is important because the messagehandler uses the presence of a Delivery object to distinguish
// between "pre-delivery errors" (system issues) and "delivery errors" (destination issues):
// - nil delivery + error → PreDeliveryError → nack → DLQ
// - delivery + error → DeliveryError → ack + retry
//
// Connection errors are destination-level failures and should trigger retries, not go to DLQ.
// See: https://github.com/hookdeck/outpost/issues/571
func TestRabbitMQPublisher_ConnectionErrors(t *testing.T) {
t.Parallel()

tests := []struct {
name string
serverURL string
description string
expectedCode string
}{
{
name: "connection refused",
serverURL: "127.0.0.1:1", // Port 1 is typically not listening
description: "simulates a server that is not running",
expectedCode: "connection_refused",
},
{
name: "DNS failure",
serverURL: "this-domain-does-not-exist-abc123xyz.invalid:5672",
description: "simulates an invalid/non-existent domain",
expectedCode: "dns_error",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

provider, err := destrabbitmq.New(testutil.Registry.MetadataLoader(), nil)
require.NoError(t, err)

destination := testutil.DestinationFactory.Any(
testutil.DestinationFactory.WithType("rabbitmq"),
testutil.DestinationFactory.WithConfig(map[string]string{
"server_url": tt.serverURL,
"exchange": "test-exchange",
}),
testutil.DestinationFactory.WithCredentials(map[string]string{
"username": "guest",
"password": "guest",
}),
)

publisher, err := provider.CreatePublisher(context.Background(), &destination)
require.NoError(t, err)
defer publisher.Close()

event := testutil.EventFactory.Any(
testutil.EventFactory.WithData(map[string]interface{}{"key": "value"}),
)

// Attempt to publish to unreachable endpoint
delivery, err := publisher.Publish(context.Background(), &event)

// Should return an error
require.Error(t, err, "should return error for %s", tt.description)

// CRITICAL: Should return a Delivery object, NOT nil
// This ensures the error is treated as a DeliveryError (retryable)
// rather than a PreDeliveryError (goes to DLQ)
require.NotNil(t, delivery, "delivery should NOT be nil for connection errors - "+
"returning nil causes messagehandler to treat this as PreDeliveryError (nack → DLQ) "+
"instead of DeliveryError (ack + retry)")

// Verify the delivery has appropriate status and code
assert.Equal(t, "failed", delivery.Status, "delivery status should be 'failed'")
assert.Equal(t, tt.expectedCode, delivery.Code, "delivery code should indicate error type")
})
}
}
73 changes: 2 additions & 71 deletions internal/destregistry/providers/destwebhook/destwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"regexp"
"strings"
Expand Down Expand Up @@ -595,53 +594,8 @@ func (p *WebhookPublisher) Publish(ctx context.Context, event *models.Event) (*d
return nil, err
}

resp, err := p.httpClient.Do(httpReq)
if err != nil {
return nil, destregistry.NewErrDestinationPublishAttempt(err, "webhook", map[string]interface{}{
"error": "request_failed",
"message": err.Error(),
})
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
delivery := &destregistry.Delivery{
Status: "failed",
Code: fmt.Sprintf("%d", resp.StatusCode),
}
parseResponse(delivery, resp)

// Extract body from delivery.Response for error details
var bodyStr string
if delivery.Response != nil {
if body, ok := delivery.Response["body"]; ok {
switch v := body.(type) {
case string:
bodyStr = v
case map[string]interface{}:
if jsonBytes, err := json.Marshal(v); err == nil {
bodyStr = string(jsonBytes)
}
}
}
}

return delivery, destregistry.NewErrDestinationPublishAttempt(
fmt.Errorf("request failed with status %d: %s", resp.StatusCode, bodyStr),
"webhook",
map[string]interface{}{
"status": resp.StatusCode,
"body": bodyStr,
})
}

delivery := &destregistry.Delivery{
Status: "success",
Code: fmt.Sprintf("%d", resp.StatusCode),
}
parseResponse(delivery, resp)

return delivery, nil
result := ExecuteHTTPRequest(ctx, p.httpClient, httpReq, "webhook")
return result.Delivery, result.Error
}

// Format is a helper function to format the event data into an HTTP request.
Expand Down Expand Up @@ -748,26 +702,3 @@ func isTruthy(value string) bool {
return false
}
}

func parseResponse(delivery *destregistry.Delivery, resp *http.Response) {
if strings.Contains(resp.Header.Get("Content-Type"), "application/json") {
bodyBytes, _ := io.ReadAll(resp.Body)
var response map[string]interface{}
if err := json.Unmarshal(bodyBytes, &response); err != nil {
delivery.Response = map[string]interface{}{
"status": resp.StatusCode,
"body": string(bodyBytes),
}
}
delivery.Response = map[string]interface{}{
"status": resp.StatusCode,
"body": response,
}
} else {
bodyBytes, _ := io.ReadAll(resp.Body)
delivery.Response = map[string]interface{}{
"status": resp.StatusCode,
"body": string(bodyBytes),
}
}
}
Loading