diff --git a/Makefile b/Makefile index 9035353f06..df204fc110 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ build: .PHONY: test test: - @go test -race -p 1 $(shell go list ./... | grep -v '/e2e') -timeout 30m + @go test -race -p 1 $(shell go list ./... | grep -v '/e2e') -v -timeout 30m # Get Docker socket from active context if DOCKER_HOST is not set DOCKER_HOST_VAL := $(or $(DOCKER_HOST),$(shell docker context inspect --format '{{.Endpoints.docker.Host}}' 2>/dev/null || echo "")) diff --git a/api/events_integration_test.go b/api/events_integration_test.go new file mode 100644 index 0000000000..306c13e437 --- /dev/null +++ b/api/events_integration_test.go @@ -0,0 +1,241 @@ +package api + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/frain-dev/convoy/api/testdb" + "github.com/frain-dev/convoy/database" + "github.com/frain-dev/convoy/database/postgres" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/internal/pkg/metrics" + "github.com/frain-dev/convoy/pkg/httpheader" +) + +type EventsIntegrationTestSuite struct { + suite.Suite + DB database.Database + ConvoyApp *ApplicationHandler + DefaultProject *datastore.Project + DefaultUser *datastore.User +} + +func (s *EventsIntegrationTestSuite) SetupSuite() { + s.ConvoyApp = buildServer(s.T()) +} + +func (s *EventsIntegrationTestSuite) SetupTest() { + var err error + + s.DB = s.ConvoyApp.A.DB + + // Seed default user + s.DefaultUser, err = testdb.SeedDefaultUser(s.DB) + require.NoError(s.T(), err) + + // Seed default organisation + org, err := testdb.SeedDefaultOrganisation(s.DB, s.DefaultUser) + require.NoError(s.T(), err) + + // Seed default project + s.DefaultProject, err = testdb.SeedDefaultProject(s.DB, org.UID) + require.NoError(s.T(), err) +} + +func (s *EventsIntegrationTestSuite) TearDownTest() { + metrics.Reset() +} + +// Test_LoadEventsPaged_WithoutEndpoints tests that events without endpoint associations +// are visible in the event log when no endpoint filter is applied. +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + data := json.RawMessage(`{"test": "data"}`) + + // Create an event with no endpoints (simulating an event ingested for a source with no subscriptions) + event := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-no-endpoints", + Endpoints: []string{}, // Empty endpoints array + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err := eventRepo.CreateEvent(ctx, event) + require.NoError(s.T(), err) + + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus) + require.NoError(s.T(), err) + + // Query without endpoint filter - should return the event + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), event.UID, events[0].UID) + require.Equal(s.T(), datastore.FailureStatus, events[0].Status) +} + +// Test_LoadEventsPaged_WithEndpointFilter tests that: +// 1. When filtering by endpoint, only events with that endpoint are returned +// 2. When not filtering, both events with and without endpoints are returned +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + // Create an endpoint + endpoint, err := testdb.SeedEndpoint(s.DB, s.DefaultProject, "", "", "", false, datastore.ActiveEndpointStatus) + require.NoError(s.T(), err) + + data := json.RawMessage(`{"test": "data"}`) + + // Create event with endpoint + eventWithEndpoint := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-with-endpoint", + Endpoints: []string{endpoint.UID}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = eventRepo.CreateEvent(ctx, eventWithEndpoint) + require.NoError(s.T(), err) + + // Create event without endpoint + eventWithoutEndpoint := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-without-endpoint", + Endpoints: []string{}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = eventRepo.CreateEvent(ctx, eventWithoutEndpoint) + require.NoError(s.T(), err) + + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, eventWithoutEndpoint, datastore.FailureStatus) + require.NoError(s.T(), err) + + // Query with endpoint filter - should only return event with matching endpoint + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + EndpointID: endpoint.UID, + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), eventWithEndpoint.UID, events[0].UID) + + // Query without endpoint filter - should return both events + events, _, err = eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 2, len(events)) +} + +// Test_LoadEventsPaged_SearchWithoutEndpoints tests that events without endpoints +// are included in search results +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + data := json.RawMessage(`{"unique_search_term": "test12345"}`) + + // Create an event with no endpoints but searchable content + event := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-searchable", + Endpoints: []string{}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err := eventRepo.CreateEvent(ctx, event) + require.NoError(s.T(), err) + + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus) + require.NoError(s.T(), err) + + // Copy to search table for text search + err = eventRepo.CopyRows(ctx, s.DefaultProject.UID, 1) + require.NoError(s.T(), err) + + // Search for the event - should find it despite no endpoints + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + Query: "unique_search_term", + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), event.UID, events[0].UID) + // Note: events_search table doesn't have metadata or status columns, + // so we don't check status for search results +} + +func TestEventsIntegrationTestSuite(t *testing.T) { + suite.Run(t, new(EventsIntegrationTestSuite)) +} diff --git a/database/postgres/event.go b/database/postgres/event.go index 63e0a1c357..2c138caced 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -97,34 +97,35 @@ const ( baseEventsPaged = ` with endpoint_ids as (select id from convoy.endpoints where owner_id = :owner_id), events as ( SELECT ev.id, ev.project_id, - ev.id AS event_type, ev.is_duplicate_event, + ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, COALESCE(url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at,ev.acknowledged_at, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id - JOIN endpoint_ids e ON e.id = ee.endpoint_id + LEFT JOIN endpoint_ids e ON e.id = ee.endpoint_id LEFT JOIN convoy.sources s ON s.id = ev.source_id WHERE ev.deleted_at IS NULL` baseEventsSearch = ` + with events as ( SELECT ev.id, ev.project_id, - ev.id AS event_type, ev.is_duplicate_event, + ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, COALESCE(url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events_search ev LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id LEFT JOIN convoy.sources s ON s.id = ev.source_id - JOIN convoy.endpoints e ON e.id = ee.endpoint_id + LEFT JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ev.deleted_at IS NULL` baseEventsPagedForward = ` @@ -181,11 +182,11 @@ const ( // EXISTS path: no GROUP BY, uses idx_events_project_created_pagination when filter.Query is empty baseEventsPagedExists = ` - SELECT ev.id, ev.project_id, ev.id AS event_type, ev.is_duplicate_event, + SELECT ev.id, ev.project_id, ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(ev.idempotency_key, '') AS idempotency_key, COALESCE(ev.url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at, ev.acknowledged_at, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev LEFT JOIN convoy.sources s ON s.id = ev.source_id @@ -534,7 +535,14 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte } else { suffix = getExistsBackwardSuffix(sortOrder) } - query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix + + // If no EXISTS subquery, don't add EXISTS clause + if existsSubquery == "" { + // Remove " AND EXISTS (" from baseEventsPagedExists (13 characters) + query = baseEventsPagedExists[:len(baseEventsPagedExists)-13] + filterQueryNoEndpoint + suffix + } else { + query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix + } } else { // Search or legacy path: CTE + JOIN + GROUP BY. base := baseEventsPaged @@ -568,7 +576,7 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte if filter.Pageable.Direction == datastore.Prev { preOrder = reverseOrder(preOrder) } - query = fmt.Sprintf(baseQueryPagination, base, filterQuery, preOrder, filter.Pageable.SortOrder()) + query = fmt.Sprintf(baseQueryPagination, base, filterQuery, preOrder, preOrder, filter.Pageable.SortOrder(), filter.Pageable.SortOrder()) } query, args, err = sqlx.Named(query, arg) @@ -761,7 +769,13 @@ func getCountDeliveriesPrevRowQuery(sortOrder string) string { // buildExistsSubquery returns the EXISTS inner query for the events list (no search). // Caller must bind :owner_id and :endpoint_ids when present. +// Returns empty string when no filters are specified to include events without endpoint associations. func buildExistsSubquery(ownerID string, endpointIDs []string) string { + // If no filters, don't require endpoint associations + if util.IsStringEmpty(ownerID) && len(endpointIDs) == 0 { + return "" + } + q := "SELECT 1 FROM convoy.events_endpoints ee JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ee.event_id = ev.id" if !util.IsStringEmpty(ownerID) { q += " AND e.owner_id = :owner_id" diff --git a/docs/docs.go b/docs/docs.go index ba9adc0843..832746aa9e 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,4 +1,4 @@ -// Package docs Code generated by swaggo/swag at 2026-02-10 14:18:50.71968 +0100 CET m=+1.596704293. DO NOT EDIT +// Package docs Code generated by swaggo/swag at 2026-03-02 12:42:37.072763 +0100 CET m=+2.387421626. DO NOT EDIT package docs import "github.com/swaggo/swag" diff --git a/sql/1772451263.sql b/sql/1772451263.sql new file mode 100644 index 0000000000..e406b1dd47 --- /dev/null +++ b/sql/1772451263.sql @@ -0,0 +1,13 @@ +-- +migrate Up +SET lock_timeout = '2s'; +SET statement_timeout = '30s'; +-- Add columns to events_search to match events table schema +-- These columns were added to events table but not to events_search, causing queries to fail +ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS acknowledged_at TIMESTAMPTZ DEFAULT NULL; +ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS status TEXT DEFAULT NULL; +ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS metadata TEXT DEFAULT NULL; + +-- +migrate Down +ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS acknowledged_at; +ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS status; +ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS metadata; diff --git a/sql/1772462842.sql b/sql/1772462842.sql new file mode 100644 index 0000000000..87d45162b0 --- /dev/null +++ b/sql/1772462842.sql @@ -0,0 +1,59 @@ +-- +migrate Up +SET lock_timeout = '2s'; +SET statement_timeout = '30s'; +-- Update copy_rows function to copy new columns (acknowledged_at, status, metadata) +-- that were added to both events and events_search tables in earlier migrations +-- +migrate StatementBegin +CREATE OR REPLACE FUNCTION convoy.copy_rows(pid VARCHAR, dur INTEGER) RETURNS VOID AS +$$ +DECLARE + cs CURSOR FOR + SELECT * FROM convoy.events + WHERE project_id = pid + AND created_at >= NOW() - MAKE_INTERVAL(hours := dur); + row_data RECORD; +BEGIN + OPEN cs; + LOOP + FETCH cs INTO row_data; + EXIT WHEN NOT FOUND; + INSERT INTO convoy.events_search (id, event_type, endpoints, project_id, source_id, headers, raw, data, + created_at, updated_at, deleted_at, url_query_params, idempotency_key, + is_duplicate_event, acknowledged_at, status, metadata) + VALUES (row_data.id, row_data.event_type, row_data.endpoints, row_data.project_id, row_data.source_id, + row_data.headers, row_data.raw, row_data.data, row_data.created_at, row_data.updated_at, + row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event, + row_data.acknowledged_at, row_data.status, row_data.metadata); + END LOOP; + CLOSE cs; +END; +$$ LANGUAGE plpgsql; +-- +migrate StatementEnd + +-- +migrate Down +-- Revert copy_rows function to not include new columns +-- +migrate StatementBegin +CREATE OR REPLACE FUNCTION convoy.copy_rows(pid VARCHAR, dur INTEGER) RETURNS VOID AS +$$ +DECLARE + cs CURSOR FOR + SELECT * FROM convoy.events + WHERE project_id = pid + AND created_at >= NOW() - MAKE_INTERVAL(hours := dur); + row_data RECORD; +BEGIN + OPEN cs; + LOOP + FETCH cs INTO row_data; + EXIT WHEN NOT FOUND; + INSERT INTO convoy.events_search (id, event_type, endpoints, project_id, source_id, headers, raw, data, + created_at, updated_at, deleted_at, url_query_params, idempotency_key, + is_duplicate_event) + VALUES (row_data.id, row_data.event_type, row_data.endpoints, row_data.project_id, row_data.source_id, + row_data.headers, row_data.raw, row_data.data, row_data.created_at, row_data.updated_at, + row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event); + END LOOP; + CLOSE cs; +END; +$$ LANGUAGE plpgsql; +-- +migrate StatementEnd