Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/google-pubsub-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
CONVOY_REDIS_PORT: 6379

- name: Run PubSub E2E tests (testcontainers)
run: go test -v ./e2e/... -run TestE2E_GooglePubSub
run: go test -race -v ./e2e/... -run TestE2E_GooglePubSub -timeout 30m
env:
TEST_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
CONVOY_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/kafka-pubsub-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
CONVOY_REDIS_PORT: 6379

- name: Run PubSub E2E tests (testcontainers)
run: go test -v ./e2e/... -run TestE2E_Kafka
run: go test -race -v ./e2e/... -run TestE2E_Kafka -timeout 30m
env:
TEST_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
CONVOY_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rabbitmq-pubsub-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
CONVOY_REDIS_PORT: 6379

- name: Run PubSub E2E tests (testcontainers)
run: go test -v ./e2e/... -run TestE2E_AMQP
run: go test -race -v ./e2e/... -run TestE2E_AMQP -timeout 30m
env:
TEST_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
CONVOY_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sqs-pubsub-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
CONVOY_REDIS_PORT: 6379

- name: Run PubSub E2E tests (testcontainers)
run: go test -v ./e2e/... -run TestE2E_SQS
run: go test -race -v ./e2e/... -run TestE2E_SQS -timeout 30m
env:
TEST_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
CONVOY_LICENSE_KEY: ${{ secrets.CONVOY_TEST_LICENSE_KEY }}
Expand Down
60 changes: 30 additions & 30 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ build:

.PHONY: test
test:
@go test -p 1 $(shell go list ./... | grep -v '/e2e')
@go test -race -p 1 $(shell go list ./... | grep -v '/e2e') -timeout 30m

# Get Docker socket from active context if DOCKER_HOST is not set
DOCKER_HOST_VAL := $(or $(DOCKER_HOST),$(shell docker context inspect --format '{{.Endpoints.docker.Host}}' 2>/dev/null || echo ""))
Expand All @@ -31,34 +31,34 @@ test_e2e_fast:
@echo "Using docker context: $(DOCKER_CONTEXT) (DOCKER_HOST=$(DOCKER_HOST_VAL))"
@echo "Running Fast E2E tests (non-pubsub)..."
@echo "Running Direct Event tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_DirectEvent_AllSubscriptions -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_DirectEvent_MustMatchSubscription -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_DirectEvent_AllSubscriptions
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_DirectEvent_MustMatchSubscription
@echo "Running Fanout Event tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_FanOutEvent_AllSubscriptions -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_FanOutEvent_MustMatchSubscription -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_FanOutEvent_AllSubscriptions
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_FanOutEvent_MustMatchSubscription
@echo "Running Form Endpoint tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_FormEndpoint_ContentType -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_FormEndpoint_WithCustomHeaders -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_FormEndpoint_ContentType
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_FormEndpoint_WithCustomHeaders
@echo "Running OAuth2 tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_OAuth2_SharedSecret -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_OAuth2_ClientAssertion -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_OAuth2_SharedSecret
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_OAuth2_ClientAssertion
@echo "Running Job ID tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_SingleEvent_JobID_Format -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_SingleEvent_JobID_Deduplication -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_FanoutEvent_JobID_Format -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_FanoutEvent_MultipleOwners -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_BroadcastEvent_JobID_Format -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_BroadcastEvent_AllSubscribers -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_DynamicEvent_JobID_Format -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_DynamicEvent_MultipleEventTypes -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_ReplayEvent_JobID_Format -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_ReplayEvent_MultipleReplays -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_SingleEvent_JobID_Format
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_SingleEvent_JobID_Deduplication
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_FanoutEvent_JobID_Format
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_FanoutEvent_MultipleOwners
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_BroadcastEvent_JobID_Format
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_BroadcastEvent_AllSubscribers
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_DynamicEvent_JobID_Format
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_DynamicEvent_MultipleEventTypes
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_ReplayEvent_JobID_Format
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_ReplayEvent_MultipleReplays
@echo "Running Backup tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_BackupProjectData_MinIO -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_BackupProjectData_OnPrem -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_BackupProjectData_MultiTenant -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_BackupProjectData_TimeFiltering -timeout 2m
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_BackupProjectData_AllTables -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_BackupProjectData_MinIO
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_BackupProjectData_OnPrem
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_BackupProjectData_MultiTenant
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_BackupProjectData_TimeFiltering
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_BackupProjectData_AllTables
@echo "✅ Fast E2E tests passed!"

# Slow PubSub/Message Broker tests - Run daily (60+ minutes)
Expand All @@ -67,13 +67,13 @@ test_e2e_pubsub:
@echo "Using docker context: $(DOCKER_CONTEXT) (DOCKER_HOST=$(DOCKER_HOST_VAL))"
@echo "Running PubSub/Message Broker E2E tests..."
@echo "Running AMQP PubSub tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_AMQP -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_AMQP -timeout 30m
@echo "Running SQS PubSub tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_SQS -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_SQS -timeout 30m
@echo "Running Kafka PubSub tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_Kafka -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_Kafka -timeout 30m
@echo "Running Google Pub/Sub tests..."
@$(TEST_ENV) go test -v ./e2e/... -run TestE2E_GooglePubSub -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run TestE2E_GooglePubSub -timeout 30m
@echo "✅ All PubSub E2E tests passed!"

# Original test_e2e - runs ALL tests (for local comprehensive testing)
Expand All @@ -84,13 +84,13 @@ test_e2e: test_e2e_fast test_e2e_pubsub
# Run all E2E tests together (may be flaky, use test_e2e for CI)
test_e2e_all:
@echo "Using docker context: $(DOCKER_CONTEXT) (DOCKER_HOST=$(DOCKER_HOST_VAL))"
@$(TEST_ENV) go test -v ./e2e/... -timeout 10m
@$(TEST_ENV) go test -race -v ./e2e/...

# Run a specific E2E test
# Usage: make test_e2e_single TEST=TestE2E_DirectEvent_AllSubscriptions
test_e2e_single:
@echo "Using docker context: $(DOCKER_CONTEXT) (DOCKER_HOST=$(DOCKER_HOST_VAL))"
@$(TEST_ENV) go test -v ./e2e/... -run $(TEST) -timeout 2m
@$(TEST_ENV) go test -race -v ./e2e/... -run $(TEST)

generate_migration_time:
@date +"%Y%m%d%H%M%S"
Expand Down
6 changes: 2 additions & 4 deletions cmd/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ func PreRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args [
}
lo.SetLevel(lvl)

lo.Debugf("redis dsn: %s", cfg.Redis.BuildDsn())
lo.Debugf("postgres dsn: %s", cfg.Database.BuildDsn())

postgresDB, err := postgres.NewDB(cfg)
if err != nil {
return errors.New("failed to connect to postgres with err: " + err.Error())
Expand Down Expand Up @@ -950,7 +947,8 @@ func ensureDefaultUser(ctx context.Context, a *cli.App) error {
return fmt.Errorf("failed to create user - %w", err)
}

a.Logger.Infof("Created Superuser with username: %s and password: %s", defaultUser.Email, p.Plaintext)
a.Logger.Infof("Created Superuser with username: %s", defaultUser.Email)
fmt.Printf("Superuser created successfully:\n Username: %s\n Password: %s\n", defaultUser.Email, p.Plaintext)

return nil
}
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/memorystore/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func (t *Table) deleteInternal(key Key) {
}

func (t *Table) GetKeys() []Key {
t.mu.RLock()
defer t.mu.RUnlock()

keys := make([]Key, 0, len(t.rows))
for k := range t.rows {
keys = append(keys, k)
Expand Down
68 changes: 56 additions & 12 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"strings"
"sync"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -146,63 +147,104 @@ func NewContext(ctx context.Context, lo StdLogger, fields Fields) context.Contex

// Logger logs message to io.Writer at various log levels.
type Logger struct {
mu sync.RWMutex
logger *logrus.Logger
entry *logrus.Entry
}

func (l *Logger) Debug(args ...interface{}) {
l.entry.Debug(args...)
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Debug(args...)
}

func (l *Logger) Info(args ...interface{}) {
l.entry.Info(args...)
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Info(args...)
}

func (l *Logger) Warn(args ...interface{}) {
l.entry.Warn(args...)
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Warn(args...)
}

func (l *Logger) Error(args ...interface{}) {
l.entry.Error(args...)
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Error(args...)
}

func (l *Logger) Fatal(args ...interface{}) {
l.entry.Fatal(args...)
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Fatal(args...)
}

func (l *Logger) Debugf(format string, args ...interface{}) {
l.entry.Debug(fmt.Sprintf(format, args...))
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Debug(fmt.Sprintf(format, args...))
}

func (l *Logger) Infof(format string, args ...interface{}) {
l.entry.Info(fmt.Sprintf(format, args...))
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Info(fmt.Sprintf(format, args...))
}

func (l *Logger) Warnf(format string, args ...interface{}) {
l.entry.Warn(fmt.Sprintf(format, args...))
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Warn(fmt.Sprintf(format, args...))
}

func (l *Logger) Errorf(format string, args ...interface{}) {
l.entry.Error(fmt.Sprintf(format, args...))
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Error(fmt.Sprintf(format, args...))
}

func (l *Logger) Errorln(args ...interface{}) {
l.entry.Errorln(args...)
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Errorln(args...)
}

func (l *Logger) WithFields(f Fields) *logrus.Entry {
l.mu.RLock()
defer l.mu.RUnlock()
return l.entry.WithFields(f)
}

func (l *Logger) Printf(format string, args ...interface{}) {
l.entry.Printf(format, args...)
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Printf(format, args...)
}

func (l *Logger) Fatalf(format string, args ...interface{}) {
l.entry.Fatal(fmt.Sprintf(format, args...))
l.mu.RLock()
entry := l.entry
l.mu.RUnlock()
entry.Fatal(fmt.Sprintf(format, args...))
}

func (l *Logger) WithError(err error) *logrus.Entry {
l.mu.RLock()
defer l.mu.RUnlock()
return l.entry.WithError(err)
}

Expand All @@ -223,6 +265,8 @@ func (l *Logger) SetLevel(v Level) {

// SetPrefix sets logger fields
func (l *Logger) SetPrefix(value interface{}) {
l.mu.Lock()
defer l.mu.Unlock()
l.entry = l.entry.WithField("source", value)
}

Expand Down
Loading