Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ build:

.PHONY: test
test:
@go test -race -p 1 $(shell go list ./... | grep -v '/e2e') -timeout 30m
@go test -race -p 1 $(shell go list ./... | grep -v '/e2e') -v -timeout 30m

# Get Docker socket from active context if DOCKER_HOST is not set
DOCKER_HOST_VAL := $(or $(DOCKER_HOST),$(shell docker context inspect --format '{{.Endpoints.docker.Host}}' 2>/dev/null || echo ""))
Expand Down
241 changes: 241 additions & 0 deletions api/events_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package api

import (
"context"
"encoding/json"
"testing"
"time"

"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/frain-dev/convoy/api/testdb"
"github.com/frain-dev/convoy/database"
"github.com/frain-dev/convoy/database/postgres"
"github.com/frain-dev/convoy/datastore"
"github.com/frain-dev/convoy/internal/pkg/metrics"
"github.com/frain-dev/convoy/pkg/httpheader"
)

type EventsIntegrationTestSuite struct {
suite.Suite
DB database.Database
ConvoyApp *ApplicationHandler
DefaultProject *datastore.Project
DefaultUser *datastore.User
}

func (s *EventsIntegrationTestSuite) SetupSuite() {
s.ConvoyApp = buildServer(s.T())
}

func (s *EventsIntegrationTestSuite) SetupTest() {
var err error

s.DB = s.ConvoyApp.A.DB

// Seed default user
s.DefaultUser, err = testdb.SeedDefaultUser(s.DB)
require.NoError(s.T(), err)

// Seed default organisation
org, err := testdb.SeedDefaultOrganisation(s.DB, s.DefaultUser)
require.NoError(s.T(), err)

// Seed default project
s.DefaultProject, err = testdb.SeedDefaultProject(s.DB, org.UID)
require.NoError(s.T(), err)
}

func (s *EventsIntegrationTestSuite) TearDownTest() {
metrics.Reset()
}

// Test_LoadEventsPaged_WithoutEndpoints tests that events without endpoint associations
// are visible in the event log when no endpoint filter is applied.
func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() {
ctx := context.Background()
eventRepo := postgres.NewEventRepo(s.DB)

data := json.RawMessage(`{"test": "data"}`)

// Create an event with no endpoints (simulating an event ingested for a source with no subscriptions)
event := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-no-endpoints",
Endpoints: []string{}, // Empty endpoints array
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err := eventRepo.CreateEvent(ctx, event)
require.NoError(s.T(), err)

// Update status to Failure (events without subscriptions get Failure status)
err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus)
require.NoError(s.T(), err)

// Query without endpoint filter - should return the event
events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(events))
require.Equal(s.T(), event.UID, events[0].UID)
require.Equal(s.T(), datastore.FailureStatus, events[0].Status)
}

// Test_LoadEventsPaged_WithEndpointFilter tests that:
// 1. When filtering by endpoint, only events with that endpoint are returned
// 2. When not filtering, both events with and without endpoints are returned
func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() {
ctx := context.Background()
eventRepo := postgres.NewEventRepo(s.DB)

// Create an endpoint
endpoint, err := testdb.SeedEndpoint(s.DB, s.DefaultProject, "", "", "", false, datastore.ActiveEndpointStatus)
require.NoError(s.T(), err)

data := json.RawMessage(`{"test": "data"}`)

// Create event with endpoint
eventWithEndpoint := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-with-endpoint",
Endpoints: []string{endpoint.UID},
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err = eventRepo.CreateEvent(ctx, eventWithEndpoint)
require.NoError(s.T(), err)

// Create event without endpoint
eventWithoutEndpoint := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-without-endpoint",
Endpoints: []string{},
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err = eventRepo.CreateEvent(ctx, eventWithoutEndpoint)
require.NoError(s.T(), err)

// Update status to Failure (events without subscriptions get Failure status)
err = eventRepo.UpdateEventStatus(ctx, eventWithoutEndpoint, datastore.FailureStatus)
require.NoError(s.T(), err)

// Query with endpoint filter - should only return event with matching endpoint
events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
EndpointID: endpoint.UID,
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(events))
require.Equal(s.T(), eventWithEndpoint.UID, events[0].UID)

// Query without endpoint filter - should return both events
events, _, err = eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 2, len(events))
}

// Test_LoadEventsPaged_SearchWithoutEndpoints tests that events without endpoints
// are included in search results
func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints() {
ctx := context.Background()
eventRepo := postgres.NewEventRepo(s.DB)

data := json.RawMessage(`{"unique_search_term": "test12345"}`)

// Create an event with no endpoints but searchable content
event := &datastore.Event{
UID: ulid.Make().String(),
EventType: "test-event-searchable",
Endpoints: []string{},
ProjectID: s.DefaultProject.UID,
Headers: httpheader.HTTPHeader{},
Raw: string(data),
Data: data,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

err := eventRepo.CreateEvent(ctx, event)
require.NoError(s.T(), err)

// Update status to Failure (events without subscriptions get Failure status)
err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus)
require.NoError(s.T(), err)

// Copy to search table for text search
err = eventRepo.CopyRows(ctx, s.DefaultProject.UID, 1)
require.NoError(s.T(), err)

// Search for the event - should find it despite no endpoints
events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{
Query: "unique_search_term",
SearchParams: datastore.SearchParams{
CreatedAtStart: time.Now().Add(-time.Hour).Unix(),
CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(),
},
Pageable: datastore.Pageable{
PerPage: 10,
Direction: datastore.Next,
NextCursor: datastore.DefaultCursor,
},
})

require.NoError(s.T(), err)
require.Equal(s.T(), 1, len(events))
require.Equal(s.T(), event.UID, events[0].UID)
// Note: events_search table doesn't have metadata or status columns,
// so we don't check status for search results
}

func TestEventsIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(EventsIntegrationTestSuite))
}
34 changes: 24 additions & 10 deletions database/postgres/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,34 +97,35 @@ const (
baseEventsPaged = `
with endpoint_ids as (select id from convoy.endpoints where owner_id = :owner_id), events as (
SELECT ev.id, ev.project_id,
ev.id AS event_type, ev.is_duplicate_event,
ev.event_type, ev.is_duplicate_event,
COALESCE(ev.source_id, '') AS source_id,
ev.headers, ev.raw, ev.data, ev.created_at,
COALESCE(idempotency_key, '') AS idempotency_key,
COALESCE(url_query_params, '') AS url_query_params,
ev.updated_at, ev.deleted_at,ev.acknowledged_at,
ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status,
COALESCE(s.id, '') AS "source_metadata.id",
COALESCE(s.name, '') AS "source_metadata.name"
FROM convoy.events ev
LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id
JOIN endpoint_ids e ON e.id = ee.endpoint_id
LEFT JOIN endpoint_ids e ON e.id = ee.endpoint_id
LEFT JOIN convoy.sources s ON s.id = ev.source_id
WHERE ev.deleted_at IS NULL`

baseEventsSearch = `
with events as (
SELECT ev.id, ev.project_id,
ev.id AS event_type, ev.is_duplicate_event,
ev.event_type, ev.is_duplicate_event,
COALESCE(ev.source_id, '') AS source_id,
ev.headers, ev.raw, ev.data, ev.created_at,
COALESCE(idempotency_key, '') AS idempotency_key,
COALESCE(url_query_params, '') AS url_query_params,
ev.updated_at, ev.deleted_at,
ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status,
COALESCE(s.id, '') AS "source_metadata.id",
COALESCE(s.name, '') AS "source_metadata.name"
FROM convoy.events_search ev
LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id
LEFT JOIN convoy.sources s ON s.id = ev.source_id
JOIN convoy.endpoints e ON e.id = ee.endpoint_id
LEFT JOIN convoy.endpoints e ON e.id = ee.endpoint_id
WHERE ev.deleted_at IS NULL`

baseEventsPagedForward = `
Expand Down Expand Up @@ -181,11 +182,11 @@ const (

// EXISTS path: no GROUP BY, uses idx_events_project_created_pagination when filter.Query is empty
baseEventsPagedExists = `
SELECT ev.id, ev.project_id, ev.id AS event_type, ev.is_duplicate_event,
SELECT ev.id, ev.project_id, ev.event_type, ev.is_duplicate_event,
COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at,
COALESCE(ev.idempotency_key, '') AS idempotency_key,
COALESCE(ev.url_query_params, '') AS url_query_params,
ev.updated_at, ev.deleted_at, ev.acknowledged_at,
ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status,
COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name"
FROM convoy.events ev
LEFT JOIN convoy.sources s ON s.id = ev.source_id
Expand Down Expand Up @@ -534,7 +535,14 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte
} else {
suffix = getExistsBackwardSuffix(sortOrder)
}
query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix

// If no EXISTS subquery, don't add EXISTS clause
if existsSubquery == "" {
// Remove " AND EXISTS (" from baseEventsPagedExists (13 characters)
query = baseEventsPagedExists[:len(baseEventsPagedExists)-13] + filterQueryNoEndpoint + suffix
} else {
query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix
}
} else {
// Search or legacy path: CTE + JOIN + GROUP BY.
base := baseEventsPaged
Expand Down Expand Up @@ -568,7 +576,7 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte
if filter.Pageable.Direction == datastore.Prev {
preOrder = reverseOrder(preOrder)
}
query = fmt.Sprintf(baseQueryPagination, base, filterQuery, preOrder, filter.Pageable.SortOrder())
query = fmt.Sprintf(baseQueryPagination, base, filterQuery, preOrder, preOrder, filter.Pageable.SortOrder(), filter.Pageable.SortOrder())
}

query, args, err = sqlx.Named(query, arg)
Expand Down Expand Up @@ -761,7 +769,13 @@ func getCountDeliveriesPrevRowQuery(sortOrder string) string {

// buildExistsSubquery returns the EXISTS inner query for the events list (no search).
// Caller must bind :owner_id and :endpoint_ids when present.
// Returns empty string when no filters are specified to include events without endpoint associations.
func buildExistsSubquery(ownerID string, endpointIDs []string) string {
// If no filters, don't require endpoint associations
if util.IsStringEmpty(ownerID) && len(endpointIDs) == 0 {
return ""
}

q := "SELECT 1 FROM convoy.events_endpoints ee JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ee.event_id = ev.id"
if !util.IsStringEmpty(ownerID) {
q += " AND e.owner_id = :owner_id"
Expand Down
2 changes: 1 addition & 1 deletion docs/docs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package docs Code generated by swaggo/swag at 2026-02-10 14:18:50.71968 +0100 CET m=+1.596704293. DO NOT EDIT
// Package docs Code generated by swaggo/swag at 2026-03-02 12:42:37.072763 +0100 CET m=+2.387421626. DO NOT EDIT
package docs

import "github.com/swaggo/swag"
Expand Down
13 changes: 13 additions & 0 deletions sql/1772451263.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +migrate Up
SET lock_timeout = '2s';
SET statement_timeout = '30s';
-- Add columns to events_search to match events table schema
-- These columns were added to events table but not to events_search, causing queries to fail
ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS acknowledged_at TIMESTAMPTZ DEFAULT NULL;
ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS status TEXT DEFAULT NULL;
ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS metadata TEXT DEFAULT NULL;

-- +migrate Down
ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS acknowledged_at;
ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS status;
ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS metadata;
Loading
Loading