Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
bb57674
docs(track-3): add observability & load-testing design docs and handoff
RndmCodeGuy20 Jun 30, 2026
f08526a
feat(tracing): propagate trace context across the outbox→Redis boundary
RndmCodeGuy20 Jun 30, 2026
4c0f49d
feat(tracing): instrument the Python worker end-to-end
RndmCodeGuy20 Jun 30, 2026
f1538ee
feat(observability): correlate logs with traces (API + worker)
RndmCodeGuy20 Jun 30, 2026
0c605e1
fix(metrics): API route label, export interval, buckets & DB pool gauges
RndmCodeGuy20 Jun 30, 2026
1d6541a
feat(observability): resource pinning, SLO rules & Prometheus wiring
RndmCodeGuy20 Jun 30, 2026
fd90278
fix(grafana): provision dashboards from the correct directory
RndmCodeGuy20 Jun 30, 2026
4d85f56
feat(grafana): add Track 3 dashboards
RndmCodeGuy20 Jun 30, 2026
4e2ecd7
feat(loadtest): add k6 load harness
RndmCodeGuy20 Jun 30, 2026
9a67d69
docs(experiments): worker saturation experiment (0001)
RndmCodeGuy20 Jun 30, 2026
74c8091
docs(roadmap): mark Track 3 done and re-prioritize from exp 0001
RndmCodeGuy20 Jun 30, 2026
bd71daf
feat(metrics): add db.query.duration fine-bucket view + NewTestMetric…
RndmCodeGuy20 Jun 30, 2026
4fa5070
feat(webhook): concurrent delivery + wired delivery metrics (Track 1b)
RndmCodeGuy20 Jun 30, 2026
c4a5f2d
feat(worker): bounded concurrent pool + XAUTOCLAIM recovery + DLQ (Tr…
RndmCodeGuy20 Jun 30, 2026
a054993
chore(loadtest): parameterize overlay for A/B + capture mode + fix re…
RndmCodeGuy20 Jun 30, 2026
26b8e7a
docs(experiments): concurrent worker (0002) + webhook throughput (0003)
RndmCodeGuy20 Jun 30, 2026
8b99034
docs(roadmap): mark Track 1 + 1b done; prep Track 2 (autoscaling) han…
RndmCodeGuy20 Jun 30, 2026
05656e2
docs(readme): reflect concurrent worker, recovery/DLQ & webhook throu…
RndmCodeGuy20 Jun 30, 2026
75c9391
docs(track-2): correct queue-depth signal claim in handoff
RndmCodeGuy20 Jun 30, 2026
00883da
docs(track-4): add multi-tenancy/auth handoff; note Track 2 needs k8s
RndmCodeGuy20 Jun 30, 2026
1872758
feat(auth,tenancy): add multi-tenant auth, quotas, idempotency and st…
RndmCodeGuy20 Jul 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ REDIS_WRITE_TIMEOUT=10
# ─── Security ─────────────────────────────────────────────────────────────────
# AES-256 key — must be exactly 32 characters
ENCRYPTION_KEY=LKyGslR3InLES/EYQiJZcW06KFNMoevUd6kehjtrxPA=
# Separate AES-256 key for encrypting webhook secrets at rest (also exactly 32
# characters). Keep this DISTINCT from ENCRYPTION_KEY so leaking one key does
# not compromise the other. Falls back to ENCRYPTION_KEY when unset.
WEBHOOK_ENCRYPTION_KEY=Hh1pVq8sTn4mWz2bKx7dRf3yLc6gJe0a

# ─── Storage ──────────────────────────────────────────────────────────────────
# Set BUCKET_PROVIDER to "gcs" or "s3"
Expand Down Expand Up @@ -90,6 +94,23 @@ LOG_LEVEL=INFO
AUTO_MIGRATE=false
# Python worker: override default migrations directory
# MIGRATIONS_DIR=/path/to/migrations
# Required to apply migration versions 7 and 8 on first bootstrap of a fresh
# database — they drop or alter existing user data (webhook_registrations,
# assets.owner_id). Set to "true" ONLY when bootstrapping a clean database;
# NEVER set this on a database that already contains data — apply the
# migrations manually instead and review the SQL before running it.
MIGRATION_ALLOW_DESTRUCTIVE=false

# ─── Idempotency ──────────────────────────────────────────────────────────────
# How long a stored Idempotency-Key + response is replayable (Go duration).
IDEMPOTENCY_TTL=24h

# ─── Quotas & rate limits (per tenant) ────────────────────────────────────────
# Sustained per-tenant request rate (req/s) and token-bucket burst on presign.
TENANT_RATE_LIMIT_RPS=10
TENANT_RATE_LIMIT_BURST=20
# Max assets a tenant may own (0 = unlimited). Over-quota presigns return 403.
TENANT_ASSET_QUOTA=0

# ─── Worker ───────────────────────────────────────────────────────────────────
WORKER_ID=
Expand Down
111 changes: 88 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ A lightweight, scalable media processing pipeline built with Go and Python. MPip
## 🌟 Features

- **RESTful API Server** - High-performance Go server built with Chi router
- **Asynchronous Processing** - Redis Streams job queue for scalable media processing
- **Concurrent Processing** - Redis Streams job queue with a **bounded worker pool** (`MAX_CONCURRENT_JOBS`) for parallel media processing — ~2.4× throughput vs single-threaded in load tests
- **Resilient delivery** - **`XAUTOCLAIM`** consumer-group recovery reclaims messages from dead workers, and poison/over-retried messages are routed to a **dead-letter stream** (`media:jobs:dlq`) instead of being dropped
- **Pluggable Storage** - GCS and S3/MinIO (any S3-compatible store) behind a single provider abstraction, selected by config
- **Image Processing** - Automatic generation of optimized, content-addressed image variants (resize, re-encode, format conversion)
- **Video Processing** - Poster generation, 720p transcode, and preview clips
- **Database-Backed** - PostgreSQL as the durable source of truth for assets, variants, and jobs
- **Webhooks** - Registration and delivery tracking tables for outbound event notifications
- **Observability** - OpenTelemetry tracing + metrics on the API, Prometheus metrics on the worker, with a bundled Grafana/Tempo/Loki/Prometheus stack
- **Webhooks** - Registration + **concurrent** signed delivery (`WEBHOOK_CONCURRENCY`) with HMAC signatures, exponential-backoff retries, and delivery tracking
- **Observability** - OpenTelemetry tracing + metrics on the API, Prometheus metrics on the worker, with a bundled Grafana/Tempo/Loki/Prometheus stack and a host-run k6 load harness
- **Docker & Kubernetes Ready** - Multi-stage images and manifests for containerized deployment

## 🏗️ Architecture
Expand Down Expand Up @@ -46,10 +47,15 @@ Two-service pipeline communicating over **Redis Streams** (`media:jobs`). Postgr
2. Go server creates the asset + job and returns a presigned upload URL
3. Client uploads the raw file directly to object storage
4. Client marks the asset uploaded; the job is enqueued on the Redis stream
5. Python worker consumes the job, processes media (resize, transcode, optimize)
5. The Python worker consumes jobs **concurrently** (a bounded pool of `MAX_CONCURRENT_JOBS`), processing media (resize, transcode, optimize)
6. Variants are written back to object storage (deduplicated by content hash)
7. Database is updated with asset status and variant metadata

**Resilience:** the worker uses Redis Streams consumer-group semantics — each
message is acked only after its job succeeds, dead-consumer messages are reclaimed
with `XAUTOCLAIM`, and poison/over-retried messages are moved to a dead-letter
stream (`media:jobs:dlq`) for inspection/replay rather than being dropped.

## 📋 Prerequisites

- **Go** 1.24 or higher
Expand Down Expand Up @@ -89,12 +95,15 @@ DB_PASSWORD=your_password
DB_NAME=mpiper
DB_SSL_MODE=false
AUTO_MIGRATE=true # run embedded SQL migrations on startup
MIGRATION_ALLOW_DESTRUCTIVE=true # required on first bootstrap — see warning below

# Redis (transport for the job stream)
REDIS_CONNECTION_STRING=redis://localhost:6379/0

# Security (must be exactly 32 bytes)
ENCRYPTION_KEY=change_me_to_a_32_byte_secret____
# Separate 32-byte key for webhook secrets (falls back to ENCRYPTION_KEY if unset)
WEBHOOK_ENCRYPTION_KEY=change_me_to_a_diff_32_byte_secret

# Storage — pick a provider
BUCKET_PROVIDER=gcs # gcs | s3
Expand All @@ -117,14 +126,38 @@ S3_PUBLIC_ENDPOINT_URL=http://localhost:9000
# Worker
STREAM_NAME=media:jobs
JOB_POLL_INTERVAL=1
MAX_CONCURRENT_JOBS=5
MAX_CONCURRENT_JOBS=5 # bounded worker-pool size; set ≈ CPU cores per worker
RECOVERY_MIN_IDLE_MS=120000 # idle threshold before XAUTOCLAIM reclaims a stuck message
STREAM_DLQ_NAME=media:jobs:dlq
SHUTDOWN_DRAIN_TIMEOUT=30 # seconds to drain in-flight jobs on SIGTERM

# Webhooks
WEBHOOK_CONCURRENCY=10 # concurrent signed deliveries per dispatcher tick
WEBHOOK_BATCH_SIZE=50
WEBHOOK_POLL_INTERVAL=2s
WEBHOOK_MAX_ATTEMPTS=5
```

> **Tuning `MAX_CONCURRENT_JOBS`:** media work is partly CPU-bound (Pillow/ffmpeg),
> so set it close to the worker's CPU-core count. Going much higher *oversubscribes*
> the cores and reduces throughput — load tests showed `mcj=8` on 4 cores was slower
> than `mcj=4`. Size worker memory to the pool, not the single-threaded baseline.

> The worker reads the same `S3_*` variables as the Go server (falling back to `BUCKET_*`), so one `.env` drives both services.

### 3. Set Up the Database

Migrations run automatically on startup when `AUTO_MIGRATE=true` — both the Go server and the Python worker apply the embedded SQL migrations. To apply them manually instead:
Migrations run automatically on startup when `AUTO_MIGRATE=true` — both the Go server and the Python worker apply the embedded SQL migrations.

> **Destructive migrations are gated.** Versions `000007_split_webhook_key` and
> `000008_assets_owner_not_null` drop or alter existing user data
> (`webhook_registrations`, `assets.owner_id`). Both runners refuse to apply
> them unless `MIGRATION_ALLOW_DESTRUCTIVE=true` is set. Set it for local
> bootstrap on a fresh database, but **never** set it on a database that
> already contains production data — apply those migrations by hand and review
> the SQL first.

To apply them manually instead:

```bash
createdb mpiper
Expand Down Expand Up @@ -163,22 +196,20 @@ python -m worker # worker

### 6. Test the API

All `/api/v1` routes require a Bearer token — an AES-256-GCM token carrying a
user id, signed with `ENCRYPTION_KEY` (see [`pkg/utils/crypt.go`](pkg/utils/crypt.go)).
Mint one for local testing:
All `/api/v1` routes require a Bearer **API key** — a scoped, revocable key
(`mp_<prefix>_<secret>`) stored SHA-256-hashed at rest (see
[`pkg/utils/apikey.go`](pkg/utils/apikey.go)). Mint one for a tenant with the
CLI (it prints the key **once**):

```bash
TOKEN=$(python3 - <<'PY'
import base64, os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = b"change_me_to_a_32_byte_secret____" # your 32-byte ENCRYPTION_KEY
nonce = os.urandom(12)
ct = AESGCM(key).encrypt(nonce, b"demo-user", None)
print(base64.urlsafe_b64encode(nonce + ct).rstrip(b"=").decode())
PY
)
TOKEN="$(go run ./cmd/mint-api-key --tenant demo-user)"
# optional: --expires 720h --scopes assets:write,webhooks:write
```

> The CLI connects to the database using your environment config (`.env.local`
> in development). For the fully containerized demo, the bundled scripts seed a
> key directly into the running Postgres — see **Run the demo** below.

Request a presigned upload URL:

```bash
Expand Down Expand Up @@ -271,6 +302,34 @@ All `/api/v1` routes require an `Authorization: Bearer <token>` header (see
> For MinIO it is `S3_PUBLIC_ENDPOINT_URL` (the client-facing endpoint), so the
> URL is reachable from wherever the client runs — see [Storage Providers](#storage-providers).

#### Idempotency

`POST /storage/presign` (and the `complete` endpoint) accept an optional
`Idempotency-Key` header so client retries don't create duplicate assets. The
first request for a given key runs normally and its response is stored
(per-tenant, 24h TTL by default — `IDEMPOTENCY_TTL`); a retry with the **same
key and same body** replays the stored response verbatim (with
`Idempotent-Replayed: true`). Reusing a key with a **different body** returns
`422`, and a duplicate that arrives while the first is still in flight returns
`409`.

```bash
curl -X POST http://localhost:5010/api/v1/storage/presign \
-H "Authorization: Bearer $TOKEN" \
-H "Idempotency-Key: 9f1c0b2a-..." \
-H "Content-Type: application/json" \
-d '{ "fileName": "image.jpg", "contentType": "image/jpeg", "size": 1024000 }'
```

#### Rate limits & quotas

Presign is rate-limited **per tenant** (token bucket, `TENANT_RATE_LIMIT_RPS`
sustained / `TENANT_RATE_LIMIT_BURST` burst); exceeding it returns `429` with a
`Retry-After` header. An optional per-tenant asset quota
(`TENANT_ASSET_QUOTA`, `0` = unlimited) returns `403` once a tenant is at its
cap. Limits are isolated per tenant — one tenant hitting its limit does not
affect another.

### Mark an asset complete (enqueue processing)

**Endpoint:** `GET /api/v1/assets/{assetId}/complete`
Expand Down Expand Up @@ -299,8 +358,9 @@ Register an endpoint to receive processing-lifecycle events.

Deliveries are signed: each POST carries an `X-Webhook-Signature: sha256=<hmac>`
header computed over the JSON body using your registration `secret` (stored
encrypted at rest). A background dispatcher delivers pending events with
exponential-backoff retries and tracks them in the `webhook_deliveries` table.
encrypted at rest). A background dispatcher delivers pending events **concurrently**
(bounded by `WEBHOOK_CONCURRENCY`) with exponential-backoff retries and tracks them
in the `webhook_deliveries` table.

```bash
curl -X POST http://localhost:5010/api/v1/webhooks \
Expand Down Expand Up @@ -331,8 +391,8 @@ produce variants, fetches a variant back over HTTP, and asserts the
`job.starting → job.started → job.done` webhooks were delivered. It prints a
PASS/FAIL summary and exits non-zero on any failure.

Requirements on the host: `bash`, `curl`, `jq`, `docker`, and a `python3` with
the `cryptography` package (used only to mint the auth token).
Requirements on the host: `bash`, `curl`, `jq`, `docker`, and a `python3`
(stdlib only — used to mint an API key seeded into the containerized Postgres).

## 🔧 Development

Expand All @@ -359,7 +419,7 @@ mpiper/
│ └── utils/
│ └── storagex/ # Storage abstraction (GCS, S3/MinIO)
├── worker/
│ ├── consumer/ # Redis Streams consumer + config
│ ├── consumer/ # Redis Streams consumer (bounded pool, XAUTOCLAIM recovery, DLQ) + config
│ ├── processing/ # Image/video processing
│ ├── storage/ # Storage adapters (base ABC, GCS, S3) + factory
│ └── utils/ # Worker utilities (metrics)
Expand Down Expand Up @@ -477,6 +537,11 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file
- [x] Support for AWS S3 / MinIO storage
- [x] Webhook delivery with HMAC signing + retry tracking
- [x] Video transcoding with FFmpeg (poster, 720p, preview)
- [x] Concurrent worker pool (`MAX_CONCURRENT_JOBS`) — ~2.4× throughput
- [x] `XAUTOCLAIM` stream recovery + dead-letter stream for poison messages
- [x] Concurrent webhook delivery (`WEBHOOK_CONCURRENCY`)
- [x] End-to-end OpenTelemetry tracing, SLOs, Grafana dashboards + k6 load harness
- [ ] Queue-depth autoscaling (KEDA) — *next*
- [ ] Support for Azure Blob Storage
- [ ] Admin dashboard
- [ ] Batch processing API
Expand Down
115 changes: 115 additions & 0 deletions cmd/mint-api-key/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Command mint-api-key inserts a new API key for a tenant and prints the
// plaintext key exactly once. There is no HTTP admin surface — keys are minted
// out-of-band with this tool.
//
// Usage:
//
// go run ./cmd/mint-api-key --tenant demo-user [--env development]
// [--expires 720h] [--scopes assets:write,webhooks:write]
package main

import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"

"github.com/rndmcodeguy20/mpiper/internal/config"
"github.com/rndmcodeguy20/mpiper/internal/database"
"github.com/rndmcodeguy20/mpiper/internal/repository"
"github.com/rndmcodeguy20/mpiper/pkg/utils"
tenantpkg "github.com/rndmcodeguy20/mpiper/pkg/utils/tenant"
"go.uber.org/zap"
)

func main() {
var (
tenant = flag.String("tenant", "", "tenant id the key authenticates as (required)")
env = flag.String("env", envOr("ENV", "development"), "config environment (development|staging|production)")
expires = flag.Duration("expires", 0, "optional validity window, e.g. 720h; 0 means never expires")
scopes = flag.String("scopes", "", "optional comma-separated scopes")
)
flag.Parse()

if *tenant == "" {
fmt.Fprintln(os.Stderr, "error: --tenant is required")
flag.Usage()
os.Exit(2)
}
if !tenantpkg.IsValidSlug(*tenant) {
fmt.Fprintf(os.Stderr, "error: --tenant %q is not a valid tenant identifier (allowed: a-z, 0-9, _, -; max 64 chars)\n", *tenant)
flag.Usage()
os.Exit(2)
}

cfg, err := config.InitializeConfig(config.ToEnvironment(*env))
if err != nil {
fatalf("load config: %v", err)
}
config.Init(cfg)

db, err := database.NewPostgresDB(cfg.DB)
if err != nil {
fatalf("connect db: %v", err)
}
defer func() { _ = db.Close() }()

mat, err := utils.GenerateAPIKey()
if err != nil {
fatalf("generate key: %v", err)
}

var scopeList []string
if s := strings.TrimSpace(*scopes); s != "" {
for _, sc := range strings.Split(s, ",") {
if t := strings.TrimSpace(sc); t != "" {
scopeList = append(scopeList, t)
}
}
}

var expiresAt *time.Time
if *expires > 0 {
t := time.Now().Add(*expires).UTC()
expiresAt = &t
}

repo := repository.NewAPIKeyRepository(db, zap.NewNop())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

id, err := repo.Create(ctx, *tenant, mat.Hash, mat.Prefix, scopeList, expiresAt)
if err != nil {
fatalf("insert key: %v", err)
}

exp := "never"
if expiresAt != nil {
exp = expiresAt.Format(time.RFC3339)
}

// Human-readable summary to stderr; the bare key to stdout so it can be
// captured cleanly: KEY=$(go run ./cmd/mint-api-key --tenant t)
fmt.Fprintf(os.Stderr, "API key created\n")
fmt.Fprintf(os.Stderr, " id: %s\n", id)
fmt.Fprintf(os.Stderr, " tenant: %s\n", *tenant)
fmt.Fprintf(os.Stderr, " prefix: %s\n", mat.Prefix)
fmt.Fprintf(os.Stderr, " scopes: %v\n", scopeList)
fmt.Fprintf(os.Stderr, " expires: %s\n", exp)
fmt.Fprintf(os.Stderr, " (the key below is shown ONCE and is not recoverable)\n")
fmt.Println(mat.Full)
}

func fatalf(format string, args ...any) {
fmt.Fprintf(os.Stderr, "mint-api-key: "+format+"\n", args...)
os.Exit(1)
}

func envOr(key, def string) string {
if v := os.Getenv(key); v != "" {
return v
}
return def
}
Loading