diff --git a/.gitignore b/.gitignore
index 6f7f206a8d..3f04e2764f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,15 +16,11 @@
.idea
.vscode
+# Ignore all UI build artifacts, but preserve the directory and a stub/gitkeep for go:embed
/api/ui/build/*
-!/api/ui/build/go_test_stub.txt
-!/api/ui/build/index.html
-!/api/ui/build/*.js
-!/api/ui/build/*.css
-!/api/ui/build/*.woff
-!/api/ui/build/*.woff2
-!/api/ui/build/*.ttf
-!/api/ui/build/assets
+!/api/ui/build/.gitkeep
+!/api/ui/build/index.txt
+
/data/mongo/
.DS_Store
*.db
@@ -54,5 +50,4 @@ convoy-ce
# used to generate test stubs
.cursor
-mise.local.toml
-api/ui/*
\ No newline at end of file
+mise.local.toml
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 7f512b3d6d..43f9484637 100644
--- a/Makefile
+++ b/Makefile
@@ -15,6 +15,15 @@ ui_install:
build:
scripts/build.sh
+.PHONY: build-ui
+build-ui:
+ @echo "Building UI..."
+ @scripts/ui.sh -b ce
+ @echo "Updating embedded assets..."
+ @rm -rf api/ui/build/*
+ @cp -R web/ui/dashboard/dist/* api/ui/build/
+ @echo "UI Build complete!"
+
.PHONY: test
test:
@go test -p 1 $(shell go list ./... | grep -v '/e2e')
diff --git a/api/ingest.go b/api/ingest.go
index 55250d2c26..3650db3642 100644
--- a/api/ingest.go
+++ b/api/ingest.go
@@ -7,6 +7,7 @@ import (
"io"
"net/http"
"strings"
+ "sync"
"time"
"github.com/go-chi/chi/v5"
@@ -30,6 +31,15 @@ import (
"github.com/frain-dev/convoy/worker/task"
)
+var (
+ eventPool = sync.Pool{
+ New: func() any { return &datastore.Event{} },
+ }
+ bufferPool = sync.Pool{
+ New: func() any { return new(bytes.Buffer) },
+ }
+)
+
func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request) {
// s.AppService.CountProjectApplications()
// 1. Retrieve mask ID
@@ -160,12 +170,18 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request)
// 3.1 On Failure
// Return 400 Bad Request.
// Read raw body for signature verification first (e.g., GitHub signs raw bytes)
- rawPayload, err := io.ReadAll(io.LimitReader(r.Body, int64(maxIngestSize)))
+
+ buf := bufferPool.Get().(*bytes.Buffer)
+ buf.Reset()
+ defer bufferPool.Put(buf)
+
+ _, err = io.Copy(buf, io.LimitReader(r.Body, int64(maxIngestSize)))
if err != nil {
a.A.Logger.WithError(err).Error("Failed to read request body")
_ = render.Render(w, r, util.NewErrorResponse("Invalid request format", http.StatusBadRequest))
return
}
+ rawPayload := buf.Bytes()
// Restore body for subsequent reads
r.Body = io.NopCloser(bytes.NewReader(rawPayload))
@@ -208,19 +224,21 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request)
// 3.2 On success
// Attach Source to Event.
// Write Event to the Ingestion Queue.
- event := &datastore.Event{
- UID: ulid.Make().String(),
- EventType: datastore.EventType(maskID),
- SourceID: source.UID,
- ProjectID: source.ProjectID,
- Raw: string(payload),
- Data: payload,
- IsDuplicateEvent: isDuplicate,
- URLQueryParams: r.URL.RawQuery,
- IdempotencyKey: checksum,
- Headers: httpheader.HTTPHeader(r.Header),
- AcknowledgedAt: null.TimeFrom(time.Now()),
- }
+
+ event := eventPool.Get().(*datastore.Event)
+ event.Reset()
+
+ event.UID = ulid.Make().String()
+ event.EventType = datastore.EventType(maskID)
+ event.SourceID = source.UID
+ event.ProjectID = source.ProjectID
+ event.Raw = string(payload)
+ event.Data = payload
+ event.IsDuplicateEvent = isDuplicate
+ event.URLQueryParams = r.URL.RawQuery
+ event.IdempotencyKey = checksum
+ event.Headers = httpheader.HTTPHeader(r.Header)
+ event.AcknowledgedAt = null.TimeFrom(time.Now())
event.Headers["X-Convoy-Source-Id"] = []string{source.MaskID}
@@ -231,6 +249,9 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request)
}
eventByte, err := msgpack.EncodeMsgPack(createEvent)
+
+ eventPool.Put(event)
+
if err != nil {
a.A.Logger.WithError(err).Error("Failed to encode event data")
_ = render.Render(w, r, util.NewErrorResponse("Failed to process event", http.StatusBadRequest))
diff --git a/api/ui/build/go_test_stub.txt b/api/ui/build/.gitkeep
similarity index 100%
rename from api/ui/build/go_test_stub.txt
rename to api/ui/build/.gitkeep
diff --git a/api/ui/build/index.html b/api/ui/build/index.html
deleted file mode 100644
index d219a7dc4a..0000000000
--- a/api/ui/build/index.html
+++ /dev/null
@@ -1,388 +0,0 @@
-
-
- Convoy
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/api/ui/build/index.txt b/api/ui/build/index.txt
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/datastore/models.go b/datastore/models.go
index 2e3772016c..a04a773b59 100644
--- a/datastore/models.go
+++ b/datastore/models.go
@@ -828,6 +828,14 @@ type Event struct {
DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"`
}
+func (d *DeliveryAttempt) Reset() {
+ *d = DeliveryAttempt{}
+}
+
+func (e *Event) Reset() {
+ *e = Event{}
+}
+
func (e *Event) GetRawHeaders() map[string]interface{} {
h := make(map[string]interface{}, len(e.Headers))
@@ -1064,6 +1072,10 @@ type EventDelivery struct {
DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"`
}
+func (e *EventDelivery) Reset() {
+ *e = EventDelivery{}
+}
+
func (d *EventDelivery) GetLatencyStartTime() time.Time {
if d.AcknowledgedAt.IsZero() {
return d.CreatedAt
diff --git a/docker-compose.infra.yml b/docker-compose.infra.yml
new file mode 100644
index 0000000000..db9756e3e7
--- /dev/null
+++ b/docker-compose.infra.yml
@@ -0,0 +1,52 @@
+version: "3"
+
+volumes:
+ postgres_data:
+ redis_data:
+
+services:
+ postgres:
+ image: postgres:15.2-alpine
+ restart: unless-stopped
+ ports:
+ - "5432:5432"
+ environment:
+ POSTGRES_DB: convoy
+ POSTGRES_USER: convoy
+ POSTGRES_PASSWORD: convoy
+ PGDATA: /data/postgres
+ volumes:
+ - postgres_data:/data/postgres
+ healthcheck:
+ test: [ "CMD-SHELL", "pg_isready -U convoy" ]
+ interval: 5s
+ timeout: 5s
+ retries: 5
+
+ redis_server:
+ image: redis:7-alpine
+ restart: always
+ ports:
+ - "6379:6379"
+ volumes:
+ - redis_data:/data
+
+ prometheus:
+ image: prom/prometheus:v2.24.0
+ volumes:
+ - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
+ ports:
+ - 9090:9090
+ restart: always
+
+ migrate:
+ build:
+ context: ./
+ dockerfile: Dockerfile.dev
+ entrypoint: ["./cmd", "migrate", "up"]
+ volumes:
+ - ./convoy.json:/convoy.json
+ restart: on-failure
+ depends_on:
+ postgres:
+ condition: service_healthy
diff --git a/pkg/msgpack/msgpack.go b/pkg/msgpack/msgpack.go
index 7c3abc3052..2c3a66055a 100644
--- a/pkg/msgpack/msgpack.go
+++ b/pkg/msgpack/msgpack.go
@@ -3,30 +3,57 @@ package msgpack
import (
"bytes"
+ "sync"
+
"github.com/vmihailenco/msgpack/v5"
)
+var (
+ encPool = sync.Pool{
+ New: func() any {
+ var buf bytes.Buffer
+ enc := msgpack.NewEncoder(&buf)
+ enc.SetCustomStructTag("json")
+ return enc
+ },
+ }
+
+ decPool = sync.Pool{
+ New: func() any {
+ dec := msgpack.NewDecoder(nil)
+ dec.SetCustomStructTag("json")
+ return dec
+ },
+ }
+)
+
func EncodeMsgPack(payload interface{}) ([]byte, error) {
- var buf bytes.Buffer
- enc := msgpack.NewEncoder(&buf)
+ enc := encPool.Get().(*msgpack.Encoder)
+ defer encPool.Put(enc)
+
enc.SetCustomStructTag("json")
+ buf := enc.Writer().(*bytes.Buffer)
+ buf.Reset()
+
err := enc.Encode(payload)
if err != nil {
return nil, err
}
- return buf.Bytes(), nil
+ res := make([]byte, buf.Len())
+ copy(res, buf.Bytes())
+ return res, nil
}
func DecodeMsgPack(pack []byte, target interface{}) error {
- var buf bytes.Buffer
- buf.Write(pack)
+ dec := decPool.Get().(*msgpack.Decoder)
+ defer decPool.Put(dec)
- enc := msgpack.NewDecoder(&buf)
- enc.SetCustomStructTag("json")
+ dec.Reset(bytes.NewReader(pack))
+ dec.SetCustomStructTag("json")
- err := enc.Decode(&target)
+ err := dec.Decode(target)
if err != nil {
return err
}
diff --git a/web/ui/dashboard/package.json b/web/ui/dashboard/package.json
index 8ef5bda322..d0d129199c 100644
--- a/web/ui/dashboard/package.json
+++ b/web/ui/dashboard/package.json
@@ -5,7 +5,7 @@
"ng": "ng",
"start": "ng serve --hmr",
"build": "ng build",
- "postbuild": "mkdir -p ./source-maps && mv `pwd`/dist/*.map ./source-maps",
+ "postbuild": "mkdir -p ./source-maps && mv -f `pwd`/dist/*.map ./source-maps",
"watch": "ng build --watch --configuration development",
"test": "ng test",
"build:ee": "ng build --configuration ee"
diff --git a/worker/task/process_event_creation.go b/worker/task/process_event_creation.go
index c87b77d333..639decb4bc 100644
--- a/worker/task/process_event_creation.go
+++ b/worker/task/process_event_creation.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strconv"
+ "sync"
"time"
"github.com/hibiken/asynq"
@@ -54,6 +55,10 @@ type CreateEvent struct {
CreateSubscription bool
}
+func (c *CreateEvent) Reset() {
+ *c = CreateEvent{}
+}
+
type DefaultEventChannel struct {
}
@@ -72,6 +77,10 @@ type EventProcessorDeps struct {
EarlyAdopterFeatureFetcher fflag.EarlyAdopterFeatureFetcher
}
+var createEventPool = sync.Pool{
+ New: func() any { return &CreateEvent{} },
+}
+
func NewDefaultEventChannel() *DefaultEventChannel {
return &DefaultEventChannel{}
}
@@ -87,7 +96,10 @@ func (d *DefaultEventChannel) GetConfig() *EventChannelConfig {
// find & match subscriptions & create deliveries (e = SUCCESS)
// deliver ed (ed = SUCCESS)
func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, channel EventChannel, args EventChannelArgs) (*datastore.Event, error) {
- var createEvent CreateEvent
+ createEvent := createEventPool.Get().(*CreateEvent)
+ createEvent.Reset()
+ defer createEventPool.Put(createEvent)
+
var event *datastore.Event
var projectID string
@@ -98,9 +110,9 @@ func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, ch
"channel": channel,
}
- err := msgpack.DecodeMsgPack(t.Payload(), &createEvent)
+ err := msgpack.DecodeMsgPack(t.Payload(), createEvent)
if err != nil {
- err = json.Unmarshal(t.Payload(), &createEvent)
+ err = json.Unmarshal(t.Payload(), createEvent)
if err != nil {
args.tracerBackend.Capture(ctx, "event.creation.error", attributes, startTime, time.Now())
return nil, &EndpointError{Err: err, delay: defaultDelay}
diff --git a/worker/task/process_event_delivery.go b/worker/task/process_event_delivery.go
index 4b670d497c..3ecacf5c5c 100644
--- a/worker/task/process_event_delivery.go
+++ b/worker/task/process_event_delivery.go
@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "sync"
"time"
"github.com/hibiken/asynq"
@@ -52,6 +53,10 @@ type EventDeliveryProcessorDeps struct {
OAuth2TokenService OAuth2TokenService
}
+var eventDeliveryPool = sync.Pool{
+ New: func() any { return &EventDelivery{} },
+}
+
func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) (err error) {
// Start a new trace span for event delivery
@@ -60,7 +65,10 @@ func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context,
"event.type": "event.delivery",
}
- var data EventDelivery
+ data := eventDeliveryPool.Get().(*EventDelivery)
+ data.Reset()
+ defer eventDeliveryPool.Put(data)
+
var delayDuration time.Duration
defer func() {
@@ -89,9 +97,9 @@ func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context,
}
}()
- err = msgpack.DecodeMsgPack(t.Payload(), &data)
+ err = msgpack.DecodeMsgPack(t.Payload(), data)
if err != nil {
- err = json.Unmarshal(t.Payload(), &data)
+ err = json.Unmarshal(t.Payload(), data)
if err != nil {
deps.TracerBackend.Capture(ctx, "event.delivery.error", attributes, traceStartTime, time.Now())
return &DeliveryError{Err: err}
diff --git a/worker/task/task.go b/worker/task/task.go
index aebcd08136..ce7599f1dc 100644
--- a/worker/task/task.go
+++ b/worker/task/task.go
@@ -68,6 +68,10 @@ type EventDelivery struct {
ProjectID string
}
+func (e *EventDelivery) Reset() {
+ *e = EventDelivery{}
+}
+
type EventDeliveryConfig struct {
project *datastore.Project
subscription *datastore.Subscription