diff --git a/.gitignore b/.gitignore index 6f7f206a8d..3f04e2764f 100644 --- a/.gitignore +++ b/.gitignore @@ -16,15 +16,11 @@ .idea .vscode +# Ignore all UI build artifacts, but preserve the directory and a stub/gitkeep for go:embed /api/ui/build/* -!/api/ui/build/go_test_stub.txt -!/api/ui/build/index.html -!/api/ui/build/*.js -!/api/ui/build/*.css -!/api/ui/build/*.woff -!/api/ui/build/*.woff2 -!/api/ui/build/*.ttf -!/api/ui/build/assets +!/api/ui/build/.gitkeep +!/api/ui/build/index.txt + /data/mongo/ .DS_Store *.db @@ -54,5 +50,4 @@ convoy-ce # used to generate test stubs .cursor -mise.local.toml -api/ui/* \ No newline at end of file +mise.local.toml \ No newline at end of file diff --git a/Makefile b/Makefile index 7f512b3d6d..43f9484637 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,15 @@ ui_install: build: scripts/build.sh +.PHONY: build-ui +build-ui: + @echo "Building UI..." + @scripts/ui.sh -b ce + @echo "Updating embedded assets..." + @rm -rf api/ui/build/* + @cp -R web/ui/dashboard/dist/* api/ui/build/ + @echo "UI Build complete!" + .PHONY: test test: @go test -p 1 $(shell go list ./... | grep -v '/e2e') diff --git a/api/ingest.go b/api/ingest.go index 55250d2c26..3650db3642 100644 --- a/api/ingest.go +++ b/api/ingest.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "strings" + "sync" "time" "github.com/go-chi/chi/v5" @@ -30,6 +31,15 @@ import ( "github.com/frain-dev/convoy/worker/task" ) +var ( + eventPool = sync.Pool{ + New: func() any { return &datastore.Event{} }, + } + bufferPool = sync.Pool{ + New: func() any { return new(bytes.Buffer) }, + } +) + func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request) { // s.AppService.CountProjectApplications() // 1. Retrieve mask ID @@ -160,12 +170,18 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request) // 3.1 On Failure // Return 400 Bad Request. // Read raw body for signature verification first (e.g., GitHub signs raw bytes) - rawPayload, err := io.ReadAll(io.LimitReader(r.Body, int64(maxIngestSize))) + + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufferPool.Put(buf) + + _, err = io.Copy(buf, io.LimitReader(r.Body, int64(maxIngestSize))) if err != nil { a.A.Logger.WithError(err).Error("Failed to read request body") _ = render.Render(w, r, util.NewErrorResponse("Invalid request format", http.StatusBadRequest)) return } + rawPayload := buf.Bytes() // Restore body for subsequent reads r.Body = io.NopCloser(bytes.NewReader(rawPayload)) @@ -208,19 +224,21 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request) // 3.2 On success // Attach Source to Event. // Write Event to the Ingestion Queue. - event := &datastore.Event{ - UID: ulid.Make().String(), - EventType: datastore.EventType(maskID), - SourceID: source.UID, - ProjectID: source.ProjectID, - Raw: string(payload), - Data: payload, - IsDuplicateEvent: isDuplicate, - URLQueryParams: r.URL.RawQuery, - IdempotencyKey: checksum, - Headers: httpheader.HTTPHeader(r.Header), - AcknowledgedAt: null.TimeFrom(time.Now()), - } + + event := eventPool.Get().(*datastore.Event) + event.Reset() + + event.UID = ulid.Make().String() + event.EventType = datastore.EventType(maskID) + event.SourceID = source.UID + event.ProjectID = source.ProjectID + event.Raw = string(payload) + event.Data = payload + event.IsDuplicateEvent = isDuplicate + event.URLQueryParams = r.URL.RawQuery + event.IdempotencyKey = checksum + event.Headers = httpheader.HTTPHeader(r.Header) + event.AcknowledgedAt = null.TimeFrom(time.Now()) event.Headers["X-Convoy-Source-Id"] = []string{source.MaskID} @@ -231,6 +249,9 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request) } eventByte, err := msgpack.EncodeMsgPack(createEvent) + + eventPool.Put(event) + if err != nil { a.A.Logger.WithError(err).Error("Failed to encode event data") _ = render.Render(w, r, util.NewErrorResponse("Failed to process event", http.StatusBadRequest)) diff --git a/api/ui/build/go_test_stub.txt b/api/ui/build/.gitkeep similarity index 100% rename from api/ui/build/go_test_stub.txt rename to api/ui/build/.gitkeep diff --git a/api/ui/build/index.html b/api/ui/build/index.html deleted file mode 100644 index d219a7dc4a..0000000000 --- a/api/ui/build/index.html +++ /dev/null @@ -1,388 +0,0 @@ - - - Convoy - - - - - - - - - - - - \ No newline at end of file diff --git a/api/ui/build/index.txt b/api/ui/build/index.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/datastore/models.go b/datastore/models.go index 2e3772016c..a04a773b59 100644 --- a/datastore/models.go +++ b/datastore/models.go @@ -828,6 +828,14 @@ type Event struct { DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"` } +func (d *DeliveryAttempt) Reset() { + *d = DeliveryAttempt{} +} + +func (e *Event) Reset() { + *e = Event{} +} + func (e *Event) GetRawHeaders() map[string]interface{} { h := make(map[string]interface{}, len(e.Headers)) @@ -1064,6 +1072,10 @@ type EventDelivery struct { DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"` } +func (e *EventDelivery) Reset() { + *e = EventDelivery{} +} + func (d *EventDelivery) GetLatencyStartTime() time.Time { if d.AcknowledgedAt.IsZero() { return d.CreatedAt diff --git a/docker-compose.infra.yml b/docker-compose.infra.yml new file mode 100644 index 0000000000..db9756e3e7 --- /dev/null +++ b/docker-compose.infra.yml @@ -0,0 +1,52 @@ +version: "3" + +volumes: + postgres_data: + redis_data: + +services: + postgres: + image: postgres:15.2-alpine + restart: unless-stopped + ports: + - "5432:5432" + environment: + POSTGRES_DB: convoy + POSTGRES_USER: convoy + POSTGRES_PASSWORD: convoy + PGDATA: /data/postgres + volumes: + - postgres_data:/data/postgres + healthcheck: + test: [ "CMD-SHELL", "pg_isready -U convoy" ] + interval: 5s + timeout: 5s + retries: 5 + + redis_server: + image: redis:7-alpine + restart: always + ports: + - "6379:6379" + volumes: + - redis_data:/data + + prometheus: + image: prom/prometheus:v2.24.0 + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + ports: + - 9090:9090 + restart: always + + migrate: + build: + context: ./ + dockerfile: Dockerfile.dev + entrypoint: ["./cmd", "migrate", "up"] + volumes: + - ./convoy.json:/convoy.json + restart: on-failure + depends_on: + postgres: + condition: service_healthy diff --git a/pkg/msgpack/msgpack.go b/pkg/msgpack/msgpack.go index 7c3abc3052..2c3a66055a 100644 --- a/pkg/msgpack/msgpack.go +++ b/pkg/msgpack/msgpack.go @@ -3,30 +3,57 @@ package msgpack import ( "bytes" + "sync" + "github.com/vmihailenco/msgpack/v5" ) +var ( + encPool = sync.Pool{ + New: func() any { + var buf bytes.Buffer + enc := msgpack.NewEncoder(&buf) + enc.SetCustomStructTag("json") + return enc + }, + } + + decPool = sync.Pool{ + New: func() any { + dec := msgpack.NewDecoder(nil) + dec.SetCustomStructTag("json") + return dec + }, + } +) + func EncodeMsgPack(payload interface{}) ([]byte, error) { - var buf bytes.Buffer - enc := msgpack.NewEncoder(&buf) + enc := encPool.Get().(*msgpack.Encoder) + defer encPool.Put(enc) + enc.SetCustomStructTag("json") + buf := enc.Writer().(*bytes.Buffer) + buf.Reset() + err := enc.Encode(payload) if err != nil { return nil, err } - return buf.Bytes(), nil + res := make([]byte, buf.Len()) + copy(res, buf.Bytes()) + return res, nil } func DecodeMsgPack(pack []byte, target interface{}) error { - var buf bytes.Buffer - buf.Write(pack) + dec := decPool.Get().(*msgpack.Decoder) + defer decPool.Put(dec) - enc := msgpack.NewDecoder(&buf) - enc.SetCustomStructTag("json") + dec.Reset(bytes.NewReader(pack)) + dec.SetCustomStructTag("json") - err := enc.Decode(&target) + err := dec.Decode(target) if err != nil { return err } diff --git a/web/ui/dashboard/package.json b/web/ui/dashboard/package.json index 8ef5bda322..d0d129199c 100644 --- a/web/ui/dashboard/package.json +++ b/web/ui/dashboard/package.json @@ -5,7 +5,7 @@ "ng": "ng", "start": "ng serve --hmr", "build": "ng build", - "postbuild": "mkdir -p ./source-maps && mv `pwd`/dist/*.map ./source-maps", + "postbuild": "mkdir -p ./source-maps && mv -f `pwd`/dist/*.map ./source-maps", "watch": "ng build --watch --configuration development", "test": "ng test", "build:ee": "ng build --configuration ee" diff --git a/worker/task/process_event_creation.go b/worker/task/process_event_creation.go index c87b77d333..639decb4bc 100644 --- a/worker/task/process_event_creation.go +++ b/worker/task/process_event_creation.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strconv" + "sync" "time" "github.com/hibiken/asynq" @@ -54,6 +55,10 @@ type CreateEvent struct { CreateSubscription bool } +func (c *CreateEvent) Reset() { + *c = CreateEvent{} +} + type DefaultEventChannel struct { } @@ -72,6 +77,10 @@ type EventProcessorDeps struct { EarlyAdopterFeatureFetcher fflag.EarlyAdopterFeatureFetcher } +var createEventPool = sync.Pool{ + New: func() any { return &CreateEvent{} }, +} + func NewDefaultEventChannel() *DefaultEventChannel { return &DefaultEventChannel{} } @@ -87,7 +96,10 @@ func (d *DefaultEventChannel) GetConfig() *EventChannelConfig { // find & match subscriptions & create deliveries (e = SUCCESS) // deliver ed (ed = SUCCESS) func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, channel EventChannel, args EventChannelArgs) (*datastore.Event, error) { - var createEvent CreateEvent + createEvent := createEventPool.Get().(*CreateEvent) + createEvent.Reset() + defer createEventPool.Put(createEvent) + var event *datastore.Event var projectID string @@ -98,9 +110,9 @@ func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, ch "channel": channel, } - err := msgpack.DecodeMsgPack(t.Payload(), &createEvent) + err := msgpack.DecodeMsgPack(t.Payload(), createEvent) if err != nil { - err = json.Unmarshal(t.Payload(), &createEvent) + err = json.Unmarshal(t.Payload(), createEvent) if err != nil { args.tracerBackend.Capture(ctx, "event.creation.error", attributes, startTime, time.Now()) return nil, &EndpointError{Err: err, delay: defaultDelay} diff --git a/worker/task/process_event_delivery.go b/worker/task/process_event_delivery.go index 4b670d497c..3ecacf5c5c 100644 --- a/worker/task/process_event_delivery.go +++ b/worker/task/process_event_delivery.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" "github.com/hibiken/asynq" @@ -52,6 +53,10 @@ type EventDeliveryProcessorDeps struct { OAuth2TokenService OAuth2TokenService } +var eventDeliveryPool = sync.Pool{ + New: func() any { return &EventDelivery{} }, +} + func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) (err error) { // Start a new trace span for event delivery @@ -60,7 +65,10 @@ func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context, "event.type": "event.delivery", } - var data EventDelivery + data := eventDeliveryPool.Get().(*EventDelivery) + data.Reset() + defer eventDeliveryPool.Put(data) + var delayDuration time.Duration defer func() { @@ -89,9 +97,9 @@ func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context, } }() - err = msgpack.DecodeMsgPack(t.Payload(), &data) + err = msgpack.DecodeMsgPack(t.Payload(), data) if err != nil { - err = json.Unmarshal(t.Payload(), &data) + err = json.Unmarshal(t.Payload(), data) if err != nil { deps.TracerBackend.Capture(ctx, "event.delivery.error", attributes, traceStartTime, time.Now()) return &DeliveryError{Err: err} diff --git a/worker/task/task.go b/worker/task/task.go index aebcd08136..ce7599f1dc 100644 --- a/worker/task/task.go +++ b/worker/task/task.go @@ -68,6 +68,10 @@ type EventDelivery struct { ProjectID string } +func (e *EventDelivery) Reset() { + *e = EventDelivery{} +} + type EventDeliveryConfig struct { project *datastore.Project subscription *datastore.Subscription