From 344bd2a613cd62ea3908654921bb563eada43c35 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 23 Feb 2026 15:42:59 +0100 Subject: [PATCH 1/7] Add integration tests for event pagination and update event queries to handle missing endpoints. --- api/events_integration_test.go | 230 +++++++++++++++++++++++++++++++++ database/postgres/event.go | 19 ++- docs/docs.go | 2 +- 3 files changed, 247 insertions(+), 4 deletions(-) create mode 100644 api/events_integration_test.go diff --git a/api/events_integration_test.go b/api/events_integration_test.go new file mode 100644 index 0000000000..83c10ecfcb --- /dev/null +++ b/api/events_integration_test.go @@ -0,0 +1,230 @@ +package api + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/frain-dev/convoy/api/testdb" + "github.com/frain-dev/convoy/database" + "github.com/frain-dev/convoy/database/postgres" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/internal/pkg/metrics" + "github.com/frain-dev/convoy/pkg/httpheader" +) + +type EventsIntegrationTestSuite struct { + suite.Suite + DB database.Database + ConvoyApp *ApplicationHandler + DefaultProject *datastore.Project + DefaultUser *datastore.User +} + +func (s *EventsIntegrationTestSuite) SetupSuite() { + s.ConvoyApp = buildServer(s.T()) +} + +func (s *EventsIntegrationTestSuite) SetupTest() { + var err error + + s.DB = s.ConvoyApp.A.DB + + // Seed default user + s.DefaultUser, err = testdb.SeedDefaultUser(s.DB) + require.NoError(s.T(), err) + + // Seed default organisation + org, err := testdb.SeedDefaultOrganisation(s.DB, s.DefaultUser) + require.NoError(s.T(), err) + + // Seed default project + s.DefaultProject, err = testdb.SeedDefaultProject(s.DB, org.UID) + require.NoError(s.T(), err) +} + +func (s *EventsIntegrationTestSuite) TearDownTest() { + metrics.Reset() +} + +// Test_LoadEventsPaged_WithoutEndpoints tests that events without endpoint associations +// are visible in the event log when no endpoint filter is applied. +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + data := json.RawMessage([]byte(`{"test": "data"}`)) + + // Create an event with no endpoints (simulating an event ingested for a source with no subscriptions) + event := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-no-endpoints", + Endpoints: []string{}, // Empty endpoints array + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + Status: datastore.FailureStatus, // Events without subscriptions get Failure status + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err := eventRepo.CreateEvent(ctx, event) + require.NoError(s.T(), err) + + // Query without endpoint filter - should return the event + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), event.UID, events[0].UID) + require.Equal(s.T(), datastore.FailureStatus, events[0].Status) +} + +// Test_LoadEventsPaged_WithEndpointFilter tests that: +// 1. When filtering by endpoint, only events with that endpoint are returned +// 2. When not filtering, both events with and without endpoints are returned +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + // Create an endpoint + endpoint, err := testdb.SeedEndpoint(s.DB, s.DefaultProject, "", "", "", false, datastore.ActiveEndpointStatus) + require.NoError(s.T(), err) + + data := json.RawMessage([]byte(`{"test": "data"}`)) + + // Create event with endpoint + eventWithEndpoint := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-with-endpoint", + Endpoints: []string{endpoint.UID}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = eventRepo.CreateEvent(ctx, eventWithEndpoint) + require.NoError(s.T(), err) + + // Create event without endpoint + eventWithoutEndpoint := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-without-endpoint", + Endpoints: []string{}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + Status: datastore.FailureStatus, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = eventRepo.CreateEvent(ctx, eventWithoutEndpoint) + require.NoError(s.T(), err) + + // Query with endpoint filter - should only return event with matching endpoint + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + EndpointID: endpoint.UID, + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), eventWithEndpoint.UID, events[0].UID) + + // Query without endpoint filter - should return both events + events, _, err = eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 2, len(events)) +} + +// Test_LoadEventsPaged_SearchWithoutEndpoints tests that events without endpoints +// are included in search results +func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints() { + ctx := context.Background() + eventRepo := postgres.NewEventRepo(s.DB) + + data := json.RawMessage([]byte(`{"unique_search_term": "test12345"}`)) + + // Create an event with no endpoints but searchable content + event := &datastore.Event{ + UID: ulid.Make().String(), + EventType: "test-event-searchable", + Endpoints: []string{}, + ProjectID: s.DefaultProject.UID, + Headers: httpheader.HTTPHeader{}, + Raw: string(data), + Data: data, + Status: datastore.FailureStatus, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err := eventRepo.CreateEvent(ctx, event) + require.NoError(s.T(), err) + + // Copy to search table for text search + err = eventRepo.CopyRows(ctx, s.DefaultProject.UID, 1) + require.NoError(s.T(), err) + + // Search for the event - should find it despite no endpoints + events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ + Query: "unique_search_term", + SearchParams: datastore.SearchParams{ + CreatedAtStart: time.Now().Add(-time.Hour).Unix(), + CreatedAtEnd: time.Now().Add(5 * time.Minute).Unix(), + }, + Pageable: datastore.Pageable{ + PerPage: 10, + Direction: datastore.Next, + NextCursor: datastore.DefaultCursor, + }, + }) + + require.NoError(s.T(), err) + require.Equal(s.T(), 1, len(events)) + require.Equal(s.T(), event.UID, events[0].UID) +} + +func TestEventsIntegrationTestSuite(t *testing.T) { + suite.Run(t, new(EventsIntegrationTestSuite)) +} diff --git a/database/postgres/event.go b/database/postgres/event.go index ca6bc23072..52ac8469db 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -107,7 +107,7 @@ const ( COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id - JOIN endpoint_ids e ON e.id = ee.endpoint_id + LEFT JOIN endpoint_ids e ON e.id = ee.endpoint_id LEFT JOIN convoy.sources s ON s.id = ev.source_id WHERE ev.deleted_at IS NULL` @@ -124,7 +124,7 @@ const ( FROM convoy.events_search ev LEFT JOIN convoy.events_endpoints ee ON ee.event_id = ev.id LEFT JOIN convoy.sources s ON s.id = ev.source_id - JOIN convoy.endpoints e ON e.id = ee.endpoint_id + LEFT JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ev.deleted_at IS NULL` baseEventsPagedForward = ` @@ -534,7 +534,14 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte } else { suffix = getExistsBackwardSuffix(sortOrder) } - query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix + + // If no EXISTS subquery, don't add EXISTS clause + if existsSubquery == "" { + // Remove " AND EXISTS (" from baseEventsPagedExists (13 characters) + query = baseEventsPagedExists[:len(baseEventsPagedExists)-13] + filterQueryNoEndpoint + suffix + } else { + query = baseEventsPagedExists + existsSubquery + ") " + filterQueryNoEndpoint + suffix + } } else { // Search or legacy path: CTE + JOIN + GROUP BY. base := baseEventsPaged @@ -761,7 +768,13 @@ func getCountDeliveriesPrevRowQuery(sortOrder string) string { // buildExistsSubquery returns the EXISTS inner query for the events list (no search). // Caller must bind :owner_id and :endpoint_ids when present. +// Returns empty string when no filters are specified to include events without endpoint associations. func buildExistsSubquery(ownerID string, endpointIDs []string) string { + // If no filters, don't require endpoint associations + if util.IsStringEmpty(ownerID) && len(endpointIDs) == 0 { + return "" + } + q := "SELECT 1 FROM convoy.events_endpoints ee JOIN convoy.endpoints e ON e.id = ee.endpoint_id WHERE ee.event_id = ev.id" if !util.IsStringEmpty(ownerID) { q += " AND e.owner_id = :owner_id" diff --git a/docs/docs.go b/docs/docs.go index ba9adc0843..9e10eccf8a 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,4 +1,4 @@ -// Package docs Code generated by swaggo/swag at 2026-02-10 14:18:50.71968 +0100 CET m=+1.596704293. DO NOT EDIT +// Package docs Code generated by swaggo/swag at 2026-02-23 15:43:01.720952 +0100 CET m=+2.017875126. DO NOT EDIT package docs import "github.com/swaggo/swag" From de03053d5ac8dd7029622cee01b05560f95e8db7 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 23 Feb 2026 18:32:22 +0100 Subject: [PATCH 2/7] Add integration tests for event status updates and enhance queries to include status and metadata fields. --- api/events_integration_test.go | 17 ++++++++++++++--- database/postgres/event.go | 5 +++-- docs/docs.go | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/api/events_integration_test.go b/api/events_integration_test.go index 83c10ecfcb..ceec1deef1 100644 --- a/api/events_integration_test.go +++ b/api/events_integration_test.go @@ -69,7 +69,6 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { Headers: httpheader.HTTPHeader{}, Raw: string(data), Data: data, - Status: datastore.FailureStatus, // Events without subscriptions get Failure status CreatedAt: time.Now(), UpdatedAt: time.Now(), } @@ -77,6 +76,10 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { err := eventRepo.CreateEvent(ctx, event) require.NoError(s.T(), err) + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus) + require.NoError(s.T(), err) + // Query without endpoint filter - should return the event events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ SearchParams: datastore.SearchParams{ @@ -134,7 +137,6 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { Headers: httpheader.HTTPHeader{}, Raw: string(data), Data: data, - Status: datastore.FailureStatus, CreatedAt: time.Now(), UpdatedAt: time.Now(), } @@ -142,6 +144,10 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { err = eventRepo.CreateEvent(ctx, eventWithoutEndpoint) require.NoError(s.T(), err) + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, eventWithoutEndpoint, datastore.FailureStatus) + require.NoError(s.T(), err) + // Query with endpoint filter - should only return event with matching endpoint events, _, err := eventRepo.LoadEventsPaged(ctx, s.DefaultProject.UID, &datastore.Filter{ EndpointID: endpoint.UID, @@ -194,7 +200,6 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints Headers: httpheader.HTTPHeader{}, Raw: string(data), Data: data, - Status: datastore.FailureStatus, CreatedAt: time.Now(), UpdatedAt: time.Now(), } @@ -202,6 +207,10 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints err := eventRepo.CreateEvent(ctx, event) require.NoError(s.T(), err) + // Update status to Failure (events without subscriptions get Failure status) + err = eventRepo.UpdateEventStatus(ctx, event, datastore.FailureStatus) + require.NoError(s.T(), err) + // Copy to search table for text search err = eventRepo.CopyRows(ctx, s.DefaultProject.UID, 1) require.NoError(s.T(), err) @@ -223,6 +232,8 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints require.NoError(s.T(), err) require.Equal(s.T(), 1, len(events)) require.Equal(s.T(), event.UID, events[0].UID) + // Note: events_search table doesn't have metadata or status columns, + // so we don't check status for search results } func TestEventsIntegrationTestSuite(t *testing.T) { diff --git a/database/postgres/event.go b/database/postgres/event.go index 52ac8469db..b242f06ad6 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -102,7 +102,7 @@ const ( ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, COALESCE(url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at,ev.acknowledged_at, + ev.updated_at, ev.deleted_at,ev.acknowledged_at,ev.metadata,ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev @@ -112,6 +112,7 @@ const ( WHERE ev.deleted_at IS NULL` baseEventsSearch = ` + with events as ( SELECT ev.id, ev.project_id, ev.id AS event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, @@ -185,7 +186,7 @@ const ( COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(ev.idempotency_key, '') AS idempotency_key, COALESCE(ev.url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at, ev.acknowledged_at, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev LEFT JOIN convoy.sources s ON s.id = ev.source_id diff --git a/docs/docs.go b/docs/docs.go index 9e10eccf8a..3475833ea5 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,4 +1,4 @@ -// Package docs Code generated by swaggo/swag at 2026-02-23 15:43:01.720952 +0100 CET m=+2.017875126. DO NOT EDIT +// Package docs Code generated by swaggo/swag at 2026-02-23 18:32:24.79121 +0100 CET m=+2.388581417. DO NOT EDIT package docs import "github.com/swaggo/swag" From 2ec796bdb9f529d95b96a3aac24f03df181d7c88 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 23 Feb 2026 22:30:11 +0100 Subject: [PATCH 3/7] Fix event queries by correcting `event_type` field alias and formatting inconsistencies. --- database/postgres/event.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/database/postgres/event.go b/database/postgres/event.go index b242f06ad6..aa2ba8b088 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -97,12 +97,12 @@ const ( baseEventsPaged = ` with endpoint_ids as (select id from convoy.endpoints where owner_id = :owner_id), events as ( SELECT ev.id, ev.project_id, - ev.id AS event_type, ev.is_duplicate_event, + ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, COALESCE(url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at,ev.acknowledged_at,ev.metadata,ev.status, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events ev @@ -114,7 +114,7 @@ const ( baseEventsSearch = ` with events as ( SELECT ev.id, ev.project_id, - ev.id AS event_type, ev.is_duplicate_event, + ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, @@ -182,7 +182,7 @@ const ( // EXISTS path: no GROUP BY, uses idx_events_project_created_pagination when filter.Query is empty baseEventsPagedExists = ` - SELECT ev.id, ev.project_id, ev.id AS event_type, ev.is_duplicate_event, + SELECT ev.id, ev.project_id, ev.event_type, ev.is_duplicate_event, COALESCE(ev.source_id, '') AS source_id, ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(ev.idempotency_key, '') AS idempotency_key, COALESCE(ev.url_query_params, '') AS url_query_params, From d04682f574134357556d8afbe929a67308739e89 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 2 Mar 2026 12:42:34 +0100 Subject: [PATCH 4/7] Add columns to `events_search` table to match `events` schema and fix related queries --- api/events_integration_test.go | 6 +++--- database/postgres/event.go | 2 +- docs/docs.go | 2 +- sql/1692105853.sql | 5 +++-- sql/1772451263.sql | 11 +++++++++++ 5 files changed, 19 insertions(+), 7 deletions(-) create mode 100644 sql/1772451263.sql diff --git a/api/events_integration_test.go b/api/events_integration_test.go index ceec1deef1..306c13e437 100644 --- a/api/events_integration_test.go +++ b/api/events_integration_test.go @@ -58,7 +58,7 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithoutEndpoints() { ctx := context.Background() eventRepo := postgres.NewEventRepo(s.DB) - data := json.RawMessage([]byte(`{"test": "data"}`)) + data := json.RawMessage(`{"test": "data"}`) // Create an event with no endpoints (simulating an event ingested for a source with no subscriptions) event := &datastore.Event{ @@ -110,7 +110,7 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_WithEndpointFilter() { endpoint, err := testdb.SeedEndpoint(s.DB, s.DefaultProject, "", "", "", false, datastore.ActiveEndpointStatus) require.NoError(s.T(), err) - data := json.RawMessage([]byte(`{"test": "data"}`)) + data := json.RawMessage(`{"test": "data"}`) // Create event with endpoint eventWithEndpoint := &datastore.Event{ @@ -189,7 +189,7 @@ func (s *EventsIntegrationTestSuite) Test_LoadEventsPaged_SearchWithoutEndpoints ctx := context.Background() eventRepo := postgres.NewEventRepo(s.DB) - data := json.RawMessage([]byte(`{"unique_search_term": "test12345"}`)) + data := json.RawMessage(`{"unique_search_term": "test12345"}`) // Create an event with no endpoints but searchable content event := &datastore.Event{ diff --git a/database/postgres/event.go b/database/postgres/event.go index aa2ba8b088..cfd22c7e42 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -119,7 +119,7 @@ const ( ev.headers, ev.raw, ev.data, ev.created_at, COALESCE(idempotency_key, '') AS idempotency_key, COALESCE(url_query_params, '') AS url_query_params, - ev.updated_at, ev.deleted_at, + ev.updated_at, ev.deleted_at, ev.acknowledged_at, ev.metadata, ev.status, COALESCE(s.id, '') AS "source_metadata.id", COALESCE(s.name, '') AS "source_metadata.name" FROM convoy.events_search ev diff --git a/docs/docs.go b/docs/docs.go index 3475833ea5..832746aa9e 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1,4 +1,4 @@ -// Package docs Code generated by swaggo/swag at 2026-02-23 18:32:24.79121 +0100 CET m=+2.388581417. DO NOT EDIT +// Package docs Code generated by swaggo/swag at 2026-03-02 12:42:37.072763 +0100 CET m=+2.387421626. DO NOT EDIT package docs import "github.com/swaggo/swag" diff --git a/sql/1692105853.sql b/sql/1692105853.sql index 7a4ec9171a..23211d1fd0 100644 --- a/sql/1692105853.sql +++ b/sql/1692105853.sql @@ -76,10 +76,11 @@ BEGIN EXIT WHEN NOT FOUND; INSERT INTO convoy.events_search (id, event_type, endpoints, project_id, source_id, headers, raw, data, created_at, updated_at, deleted_at, url_query_params, idempotency_key, - is_duplicate_event) + is_duplicate_event, acknowledged_at, status, metadata) VALUES (row_data.id, row_data.event_type, row_data.endpoints, row_data.project_id, row_data.source_id, row_data.headers, row_data.raw, row_data.data, row_data.created_at, row_data.updated_at, - row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event); + row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event, + row_data.acknowledged_at, row_data.status, row_data.metadata); END LOOP; CLOSE cs; END; diff --git a/sql/1772451263.sql b/sql/1772451263.sql new file mode 100644 index 0000000000..da792c2cb4 --- /dev/null +++ b/sql/1772451263.sql @@ -0,0 +1,11 @@ +-- +migrate Up +-- Add columns to events_search to match events table schema +-- These columns were added to events table but not to events_search, causing queries to fail +ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS acknowledged_at TIMESTAMPTZ DEFAULT NULL; +ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS status TEXT DEFAULT NULL; +ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS metadata TEXT DEFAULT NULL; + +-- +migrate Down +ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS acknowledged_at; +ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS status; +ALTER TABLE convoy.events_search DROP COLUMN IF EXISTS metadata; From 497577302285a18f6c736baa30eec5e0ab0c381c Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 2 Mar 2026 14:09:08 +0100 Subject: [PATCH 5/7] Make `test` target verbose in Makefile by adding `-v` flag to `go test`. --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 7f512b3d6d..eec3f2dcfe 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ build: .PHONY: test test: - @go test -p 1 $(shell go list ./... | grep -v '/e2e') + @go test -p 1 $(shell go list ./... | grep -v '/e2e') -v # Get Docker socket from active context if DOCKER_HOST is not set DOCKER_HOST_VAL := $(or $(DOCKER_HOST),$(shell docker context inspect --format '{{.Endpoints.docker.Host}}' 2>/dev/null || echo "")) From a0036838c3574400d55a119edf3f51eb043cfb17 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 2 Mar 2026 15:53:02 +0100 Subject: [PATCH 6/7] Update `copy_rows` function to include new columns and adjust reversal logic --- sql/1692105853.sql | 5 ++-- sql/1772462842.sql | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 sql/1772462842.sql diff --git a/sql/1692105853.sql b/sql/1692105853.sql index 23211d1fd0..7a4ec9171a 100644 --- a/sql/1692105853.sql +++ b/sql/1692105853.sql @@ -76,11 +76,10 @@ BEGIN EXIT WHEN NOT FOUND; INSERT INTO convoy.events_search (id, event_type, endpoints, project_id, source_id, headers, raw, data, created_at, updated_at, deleted_at, url_query_params, idempotency_key, - is_duplicate_event, acknowledged_at, status, metadata) + is_duplicate_event) VALUES (row_data.id, row_data.event_type, row_data.endpoints, row_data.project_id, row_data.source_id, row_data.headers, row_data.raw, row_data.data, row_data.created_at, row_data.updated_at, - row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event, - row_data.acknowledged_at, row_data.status, row_data.metadata); + row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event); END LOOP; CLOSE cs; END; diff --git a/sql/1772462842.sql b/sql/1772462842.sql new file mode 100644 index 0000000000..5c24afbe28 --- /dev/null +++ b/sql/1772462842.sql @@ -0,0 +1,57 @@ +-- +migrate Up +-- Update copy_rows function to copy new columns (acknowledged_at, status, metadata) +-- that were added to both events and events_search tables in earlier migrations +-- +migrate StatementBegin +CREATE OR REPLACE FUNCTION convoy.copy_rows(pid VARCHAR, dur INTEGER) RETURNS VOID AS +$$ +DECLARE + cs CURSOR FOR + SELECT * FROM convoy.events + WHERE project_id = pid + AND created_at >= NOW() - MAKE_INTERVAL(hours := dur); + row_data RECORD; +BEGIN + OPEN cs; + LOOP + FETCH cs INTO row_data; + EXIT WHEN NOT FOUND; + INSERT INTO convoy.events_search (id, event_type, endpoints, project_id, source_id, headers, raw, data, + created_at, updated_at, deleted_at, url_query_params, idempotency_key, + is_duplicate_event, acknowledged_at, status, metadata) + VALUES (row_data.id, row_data.event_type, row_data.endpoints, row_data.project_id, row_data.source_id, + row_data.headers, row_data.raw, row_data.data, row_data.created_at, row_data.updated_at, + row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event, + row_data.acknowledged_at, row_data.status, row_data.metadata); + END LOOP; + CLOSE cs; +END; +$$ LANGUAGE plpgsql; +-- +migrate StatementEnd + +-- +migrate Down +-- Revert copy_rows function to not include new columns +-- +migrate StatementBegin +CREATE OR REPLACE FUNCTION convoy.copy_rows(pid VARCHAR, dur INTEGER) RETURNS VOID AS +$$ +DECLARE + cs CURSOR FOR + SELECT * FROM convoy.events + WHERE project_id = pid + AND created_at >= NOW() - MAKE_INTERVAL(hours := dur); + row_data RECORD; +BEGIN + OPEN cs; + LOOP + FETCH cs INTO row_data; + EXIT WHEN NOT FOUND; + INSERT INTO convoy.events_search (id, event_type, endpoints, project_id, source_id, headers, raw, data, + created_at, updated_at, deleted_at, url_query_params, idempotency_key, + is_duplicate_event) + VALUES (row_data.id, row_data.event_type, row_data.endpoints, row_data.project_id, row_data.source_id, + row_data.headers, row_data.raw, row_data.data, row_data.created_at, row_data.updated_at, + row_data.deleted_at, row_data.url_query_params, row_data.idempotency_key, row_data.is_duplicate_event); + END LOOP; + CLOSE cs; +END; +$$ LANGUAGE plpgsql; +-- +migrate StatementEnd From 684ee09fa9a8c4829b025422fcd626133792a2c1 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Mon, 2 Mar 2026 17:53:10 +0100 Subject: [PATCH 7/7] Set query timeouts and fix pagination query formatting in event-related migrations and queries. --- database/postgres/event.go | 2 +- sql/1772451263.sql | 2 ++ sql/1772462842.sql | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/database/postgres/event.go b/database/postgres/event.go index f1d5aee96b..2c138caced 100644 --- a/database/postgres/event.go +++ b/database/postgres/event.go @@ -576,7 +576,7 @@ func (e *eventRepo) LoadEventsPaged(ctx context.Context, projectID string, filte if filter.Pageable.Direction == datastore.Prev { preOrder = reverseOrder(preOrder) } - query = fmt.Sprintf(baseQueryPagination, base, filterQuery, preOrder, filter.Pageable.SortOrder()) + query = fmt.Sprintf(baseQueryPagination, base, filterQuery, preOrder, preOrder, filter.Pageable.SortOrder(), filter.Pageable.SortOrder()) } query, args, err = sqlx.Named(query, arg) diff --git a/sql/1772451263.sql b/sql/1772451263.sql index da792c2cb4..e406b1dd47 100644 --- a/sql/1772451263.sql +++ b/sql/1772451263.sql @@ -1,4 +1,6 @@ -- +migrate Up +SET lock_timeout = '2s'; +SET statement_timeout = '30s'; -- Add columns to events_search to match events table schema -- These columns were added to events table but not to events_search, causing queries to fail ALTER TABLE convoy.events_search ADD COLUMN IF NOT EXISTS acknowledged_at TIMESTAMPTZ DEFAULT NULL; diff --git a/sql/1772462842.sql b/sql/1772462842.sql index 5c24afbe28..87d45162b0 100644 --- a/sql/1772462842.sql +++ b/sql/1772462842.sql @@ -1,4 +1,6 @@ -- +migrate Up +SET lock_timeout = '2s'; +SET statement_timeout = '30s'; -- Update copy_rows function to copy new columns (acknowledged_at, status, metadata) -- that were added to both events and events_search tables in earlier migrations -- +migrate StatementBegin