Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@
.idea
.vscode

# Ignore all UI build artifacts, but preserve the directory and a stub/gitkeep for go:embed
/api/ui/build/*
!/api/ui/build/go_test_stub.txt
!/api/ui/build/index.html
!/api/ui/build/*.js
!/api/ui/build/*.css
!/api/ui/build/*.woff
!/api/ui/build/*.woff2
!/api/ui/build/*.ttf
!/api/ui/build/assets
!/api/ui/build/.gitkeep
!/api/ui/build/index.txt

/data/mongo/
.DS_Store
*.db
Expand Down Expand Up @@ -54,5 +50,4 @@ convoy-ce
# used to generate test stubs
.cursor

mise.local.toml
api/ui/*
mise.local.toml
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ ui_install:
build:
scripts/build.sh

# Build the dashboard UI bundle and refresh the assets embedded via go:embed.
# NOTE(review): `rm -rf api/ui/build/*` also removes the stub/gitkeep files
# whitelisted in .gitignore — confirm the dist copy restores everything the
# go:embed directive expects, or a clean checkout build may fail.
.PHONY: build-ui
build-ui:
	@echo "Building UI..."
	@scripts/ui.sh -b ce
	@echo "Updating embedded assets..."
	@rm -rf api/ui/build/*
	@cp -R web/ui/dashboard/dist/* api/ui/build/
	@echo "UI Build complete!"

.PHONY: test
test:
@go test -p 1 $(shell go list ./... | grep -v '/e2e')
Expand Down
49 changes: 35 additions & 14 deletions api/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"strings"
"sync"
"time"

"github.com/go-chi/chi/v5"
Expand All @@ -30,6 +31,15 @@ import (
"github.com/frain-dev/convoy/worker/task"
)

// Pools of frequently allocated ingest objects, reused across requests to
// reduce allocations and GC pressure on the hot ingestion path.
var (
	// eventPool recycles datastore.Event values. Callers must call Reset()
	// after Get and must not retain references to the event after Put.
	eventPool = sync.Pool{
		New: func() any { return &datastore.Event{} },
	}
	// bufferPool recycles request-body scratch buffers. Callers must call
	// Reset() after Get; bytes obtained via Bytes() alias pooled storage and
	// are only valid until the buffer is returned to the pool.
	bufferPool = sync.Pool{
		New: func() any { return new(bytes.Buffer) },
	}
)

func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request) {
// s.AppService.CountProjectApplications()
// 1. Retrieve mask ID
Expand Down Expand Up @@ -160,12 +170,18 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request)
// 3.1 On Failure
// Return 400 Bad Request.
// Read raw body for signature verification first (e.g., GitHub signs raw bytes)
rawPayload, err := io.ReadAll(io.LimitReader(r.Body, int64(maxIngestSize)))

buf := bufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer bufferPool.Put(buf)

_, err = io.Copy(buf, io.LimitReader(r.Body, int64(maxIngestSize)))
if err != nil {
a.A.Logger.WithError(err).Error("Failed to read request body")
_ = render.Render(w, r, util.NewErrorResponse("Invalid request format", http.StatusBadRequest))
return
}
rawPayload := buf.Bytes()

// Restore body for subsequent reads
r.Body = io.NopCloser(bytes.NewReader(rawPayload))
Expand Down Expand Up @@ -208,19 +224,21 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request)
// 3.2 On success
// Attach Source to Event.
// Write Event to the Ingestion Queue.
event := &datastore.Event{
UID: ulid.Make().String(),
EventType: datastore.EventType(maskID),
SourceID: source.UID,
ProjectID: source.ProjectID,
Raw: string(payload),
Data: payload,
IsDuplicateEvent: isDuplicate,
URLQueryParams: r.URL.RawQuery,
IdempotencyKey: checksum,
Headers: httpheader.HTTPHeader(r.Header),
AcknowledgedAt: null.TimeFrom(time.Now()),
}

event := eventPool.Get().(*datastore.Event)
event.Reset()

event.UID = ulid.Make().String()
event.EventType = datastore.EventType(maskID)
event.SourceID = source.UID
event.ProjectID = source.ProjectID
event.Raw = string(payload)
event.Data = payload
event.IsDuplicateEvent = isDuplicate
event.URLQueryParams = r.URL.RawQuery
event.IdempotencyKey = checksum
event.Headers = httpheader.HTTPHeader(r.Header)
event.AcknowledgedAt = null.TimeFrom(time.Now())

event.Headers["X-Convoy-Source-Id"] = []string{source.MaskID}

Expand All @@ -231,6 +249,9 @@ func (a *ApplicationHandler) IngestEvent(w http.ResponseWriter, r *http.Request)
}

eventByte, err := msgpack.EncodeMsgPack(createEvent)

eventPool.Put(event)

if err != nil {
a.A.Logger.WithError(err).Error("Failed to encode event data")
_ = render.Render(w, r, util.NewErrorResponse("Failed to process event", http.StatusBadRequest))
Expand Down
File renamed without changes.
388 changes: 0 additions & 388 deletions api/ui/build/index.html

This file was deleted.

Empty file added api/ui/build/index.txt
Empty file.
12 changes: 12 additions & 0 deletions datastore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,14 @@ type Event struct {
DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"`
}

// Reset zeroes every field of d so the value can be safely recycled
// (e.g. via a sync.Pool) without leaking state from a previous use.
func (d *DeliveryAttempt) Reset() {
	*d = DeliveryAttempt{}
}

// Reset zeroes every field of e so the value can be safely recycled
// (e.g. via a sync.Pool) without leaking state from a previous use.
func (e *Event) Reset() {
	*e = Event{}
}

func (e *Event) GetRawHeaders() map[string]interface{} {
h := make(map[string]interface{}, len(e.Headers))

Expand Down Expand Up @@ -1064,6 +1072,10 @@ type EventDelivery struct {
DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"`
}

// Reset zeroes every field of e so the value can be safely recycled
// (e.g. via a sync.Pool) without leaking state from a previous use.
func (e *EventDelivery) Reset() {
	*e = EventDelivery{}
}

func (d *EventDelivery) GetLatencyStartTime() time.Time {
if d.AcknowledgedAt.IsZero() {
return d.CreatedAt
Expand Down
52 changes: 52 additions & 0 deletions docker-compose.infra.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
version: "3"

volumes:
postgres_data:
redis_data:

services:
postgres:
image: postgres:15.2-alpine
restart: unless-stopped
ports:
- "5432:5432"
environment:
POSTGRES_DB: convoy
POSTGRES_USER: convoy
POSTGRES_PASSWORD: convoy
PGDATA: /data/postgres
volumes:
- postgres_data:/data/postgres
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U convoy" ]
interval: 5s
timeout: 5s
retries: 5

redis_server:
image: redis:7-alpine
restart: always
ports:
- "6379:6379"
volumes:
- redis_data:/data

prometheus:
image: prom/prometheus:v2.24.0
volumes:
- ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- 9090:9090
restart: always

migrate:
build:
context: ./
dockerfile: Dockerfile.dev
entrypoint: ["./cmd", "migrate", "up"]
volumes:
- ./convoy.json:/convoy.json
restart: on-failure
depends_on:
postgres:
condition: service_healthy
43 changes: 35 additions & 8 deletions pkg/msgpack/msgpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,57 @@ package msgpack
import (
"bytes"

"sync"

"github.com/vmihailenco/msgpack/v5"
)

// Pools of reusable msgpack codecs, avoiding a fresh encoder/decoder (and
// backing buffer) allocation on every Encode/Decode call.
var (
	// encPool holds encoders that each own a private bytes.Buffer, recovered
	// in EncodeMsgPack via enc.Writer(). The "json" struct tag is configured
	// once at construction. NOTE(review): pooled buffers are never shrunk, so
	// one very large payload pins that capacity for the pool's lifetime —
	// confirm payload sizes are bounded upstream.
	encPool = sync.Pool{
		New: func() any {
			var buf bytes.Buffer
			enc := msgpack.NewEncoder(&buf)
			enc.SetCustomStructTag("json")
			return enc
		},
	}

	// decPool holds decoders created with a nil reader; callers must Reset
	// the decoder with a real reader before decoding.
	decPool = sync.Pool{
		New: func() any {
			dec := msgpack.NewDecoder(nil)
			dec.SetCustomStructTag("json")
			return dec
		},
	}
)

func EncodeMsgPack(payload interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := msgpack.NewEncoder(&buf)
enc := encPool.Get().(*msgpack.Encoder)
defer encPool.Put(enc)

enc.SetCustomStructTag("json")

buf := enc.Writer().(*bytes.Buffer)
buf.Reset()

err := enc.Encode(payload)
if err != nil {
return nil, err
}

return buf.Bytes(), nil
res := make([]byte, buf.Len())
copy(res, buf.Bytes())
return res, nil
}

func DecodeMsgPack(pack []byte, target interface{}) error {
var buf bytes.Buffer
buf.Write(pack)
dec := decPool.Get().(*msgpack.Decoder)
defer decPool.Put(dec)

enc := msgpack.NewDecoder(&buf)
enc.SetCustomStructTag("json")
dec.Reset(bytes.NewReader(pack))
dec.SetCustomStructTag("json")

err := enc.Decode(&target)
err := dec.Decode(target)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion web/ui/dashboard/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"ng": "ng",
"start": "ng serve --hmr",
"build": "ng build",
"postbuild": "mkdir -p ./source-maps && mv `pwd`/dist/*.map ./source-maps",
"postbuild": "mkdir -p ./source-maps && mv -f `pwd`/dist/*.map ./source-maps",
"watch": "ng build --watch --configuration development",
"test": "ng test",
"build:ee": "ng build --configuration ee"
Expand Down
18 changes: 15 additions & 3 deletions worker/task/process_event_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/hibiken/asynq"
Expand Down Expand Up @@ -54,6 +55,10 @@ type CreateEvent struct {
CreateSubscription bool
}

// Reset zeroes every field of c so the value can be safely recycled
// (e.g. via a sync.Pool) without leaking state from a previous use.
func (c *CreateEvent) Reset() {
	*c = CreateEvent{}
}

type DefaultEventChannel struct {
}

Expand All @@ -72,6 +77,10 @@ type EventProcessorDeps struct {
EarlyAdopterFeatureFetcher fflag.EarlyAdopterFeatureFetcher
}

// createEventPool recycles CreateEvent values across task invocations to cut
// allocations on the event-creation hot path. Callers must Reset() after Get
// and must not retain references after Put.
var createEventPool = sync.Pool{
	New: func() any { return &CreateEvent{} },
}

func NewDefaultEventChannel() *DefaultEventChannel {
return &DefaultEventChannel{}
}
Expand All @@ -87,7 +96,10 @@ func (d *DefaultEventChannel) GetConfig() *EventChannelConfig {
// find & match subscriptions & create deliveries (e = SUCCESS)
// deliver ed (ed = SUCCESS)
func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, channel EventChannel, args EventChannelArgs) (*datastore.Event, error) {
var createEvent CreateEvent
createEvent := createEventPool.Get().(*CreateEvent)
createEvent.Reset()
defer createEventPool.Put(createEvent)

var event *datastore.Event
var projectID string

Expand All @@ -98,9 +110,9 @@ func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, ch
"channel": channel,
}

err := msgpack.DecodeMsgPack(t.Payload(), &createEvent)
err := msgpack.DecodeMsgPack(t.Payload(), createEvent)
if err != nil {
err = json.Unmarshal(t.Payload(), &createEvent)
err = json.Unmarshal(t.Payload(), createEvent)
if err != nil {
args.tracerBackend.Capture(ctx, "event.creation.error", attributes, startTime, time.Now())
return nil, &EndpointError{Err: err, delay: defaultDelay}
Expand Down
14 changes: 11 additions & 3 deletions worker/task/process_event_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/hibiken/asynq"
Expand Down Expand Up @@ -52,6 +53,10 @@ type EventDeliveryProcessorDeps struct {
OAuth2TokenService OAuth2TokenService
}

// eventDeliveryPool recycles EventDelivery task payloads across deliveries to
// cut allocations on the delivery hot path. Callers must Reset() after Get
// and must not retain references after Put.
var eventDeliveryPool = sync.Pool{
	New: func() any { return &EventDelivery{} },
}

func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) (err error) {
// Start a new trace span for event delivery
Expand All @@ -60,7 +65,10 @@ func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context,
"event.type": "event.delivery",
}

var data EventDelivery
data := eventDeliveryPool.Get().(*EventDelivery)
data.Reset()
defer eventDeliveryPool.Put(data)

var delayDuration time.Duration

defer func() {
Expand Down Expand Up @@ -89,9 +97,9 @@ func ProcessEventDelivery(deps EventDeliveryProcessorDeps) func(context.Context,
}
}()

err = msgpack.DecodeMsgPack(t.Payload(), &data)
err = msgpack.DecodeMsgPack(t.Payload(), data)
if err != nil {
err = json.Unmarshal(t.Payload(), &data)
err = json.Unmarshal(t.Payload(), data)
if err != nil {
deps.TracerBackend.Capture(ctx, "event.delivery.error", attributes, traceStartTime, time.Now())
return &DeliveryError{Err: err}
Expand Down
4 changes: 4 additions & 0 deletions worker/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ type EventDelivery struct {
ProjectID string
}

// Reset zeroes every field of e so the value can be safely recycled
// (e.g. via a sync.Pool) without leaking state from a previous use.
func (e *EventDelivery) Reset() {
	*e = EventDelivery{}
}

type EventDeliveryConfig struct {
project *datastore.Project
subscription *datastore.Subscription
Expand Down
Loading