diff --git a/.env.example b/.env.example index 3ac171e..c6e7134 100644 --- a/.env.example +++ b/.env.example @@ -41,6 +41,10 @@ REDIS_WRITE_TIMEOUT=10 # ─── Security ───────────────────────────────────────────────────────────────── # AES-256 key — must be exactly 32 characters ENCRYPTION_KEY=LKyGslR3InLES/EYQiJZcW06KFNMoevUd6kehjtrxPA= +# Separate AES-256 key for encrypting webhook secrets at rest (also exactly 32 +# characters). Keep this DISTINCT from ENCRYPTION_KEY so leaking one key does +# not compromise the other. Falls back to ENCRYPTION_KEY when unset. +WEBHOOK_ENCRYPTION_KEY=Hh1pVq8sTn4mWz2bKx7dRf3yLc6gJe0a # ─── Storage ────────────────────────────────────────────────────────────────── # Set BUCKET_PROVIDER to "gcs" or "s3" @@ -90,6 +94,23 @@ LOG_LEVEL=INFO AUTO_MIGRATE=false # Python worker: override default migrations directory # MIGRATIONS_DIR=/path/to/migrations +# Required to apply migration versions 7 and 8 on first bootstrap of a fresh +# database — they drop or alter existing user data (webhook_registrations, +# assets.owner_id). Set to "true" ONLY when bootstrapping a clean database; +# NEVER set this on a database that already contains data — apply the +# migrations manually instead and review the SQL before running it. +MIGRATION_ALLOW_DESTRUCTIVE=false + +# ─── Idempotency ────────────────────────────────────────────────────────────── +# How long a stored Idempotency-Key + response is replayable (Go duration). +IDEMPOTENCY_TTL=24h + +# ─── Quotas & rate limits (per tenant) ──────────────────────────────────────── +# Sustained per-tenant request rate (req/s) and token-bucket burst on presign. +TENANT_RATE_LIMIT_RPS=10 +TENANT_RATE_LIMIT_BURST=20 +# Max assets a tenant may own (0 = unlimited). Over-quota presigns return 403. +TENANT_ASSET_QUOTA=0 # ─── Worker ─────────────────────────────────────────────────────────────────── WORKER_ID= diff --git a/README.md b/README.md index 80a0a4e..d0ac973 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,14 @@ A lightweight, scalable media processing pipeline built with Go and Python. MPip ## 🌟 Features - **RESTful API Server** - High-performance Go server built with Chi router -- **Asynchronous Processing** - Redis Streams job queue for scalable media processing +- **Concurrent Processing** - Redis Streams job queue with a **bounded worker pool** (`MAX_CONCURRENT_JOBS`) for parallel media processing — ~2.4× throughput vs single-threaded in load tests +- **Resilient delivery** - **`XAUTOCLAIM`** consumer-group recovery reclaims messages from dead workers, and poison/over-retried messages are routed to a **dead-letter stream** (`media:jobs:dlq`) instead of being dropped - **Pluggable Storage** - GCS and S3/MinIO (any S3-compatible store) behind a single provider abstraction, selected by config - **Image Processing** - Automatic generation of optimized, content-addressed image variants (resize, re-encode, format conversion) - **Video Processing** - Poster generation, 720p transcode, and preview clips - **Database-Backed** - PostgreSQL as the durable source of truth for assets, variants, and jobs -- **Webhooks** - Registration and delivery tracking tables for outbound event notifications -- **Observability** - OpenTelemetry tracing + metrics on the API, Prometheus metrics on the worker, with a bundled Grafana/Tempo/Loki/Prometheus stack +- **Webhooks** - Registration + **concurrent** signed delivery (`WEBHOOK_CONCURRENCY`) with HMAC signatures, exponential-backoff retries, and delivery tracking +- **Observability** - OpenTelemetry tracing + metrics on the API, Prometheus metrics on the worker, with a bundled Grafana/Tempo/Loki/Prometheus stack and a host-run k6 load harness - **Docker & Kubernetes Ready** - Multi-stage images and manifests for containerized deployment ## 🏗️ Architecture @@ -46,10 +47,15 @@ Two-service pipeline communicating over **Redis Streams** (`media:jobs`). Postgr 2. Go server creates the asset + job and returns a presigned upload URL 3. Client uploads the raw file directly to object storage 4. Client marks the asset uploaded; the job is enqueued on the Redis stream -5. Python worker consumes the job, processes media (resize, transcode, optimize) +5. The Python worker consumes jobs **concurrently** (a bounded pool of `MAX_CONCURRENT_JOBS`), processing media (resize, transcode, optimize) 6. Variants are written back to object storage (deduplicated by content hash) 7. Database is updated with asset status and variant metadata +**Resilience:** the worker uses Redis Streams consumer-group semantics — each +message is acked only after its job succeeds, dead-consumer messages are reclaimed +with `XAUTOCLAIM`, and poison/over-retried messages are moved to a dead-letter +stream (`media:jobs:dlq`) for inspection/replay rather than being dropped. + ## 📋 Prerequisites - **Go** 1.24 or higher @@ -89,12 +95,15 @@ DB_PASSWORD=your_password DB_NAME=mpiper DB_SSL_MODE=false AUTO_MIGRATE=true # run embedded SQL migrations on startup +MIGRATION_ALLOW_DESTRUCTIVE=true # required on first bootstrap — see warning below # Redis (transport for the job stream) REDIS_CONNECTION_STRING=redis://localhost:6379/0 # Security (must be exactly 32 bytes) ENCRYPTION_KEY=change_me_to_a_32_byte_secret____ +# Separate 32-byte key for webhook secrets (falls back to ENCRYPTION_KEY if unset) +WEBHOOK_ENCRYPTION_KEY=change_me_to_a_diff_32_byte_secret # Storage — pick a provider BUCKET_PROVIDER=gcs # gcs | s3 @@ -117,14 +126,38 @@ S3_PUBLIC_ENDPOINT_URL=http://localhost:9000 # Worker STREAM_NAME=media:jobs JOB_POLL_INTERVAL=1 -MAX_CONCURRENT_JOBS=5 +MAX_CONCURRENT_JOBS=5 # bounded worker-pool size; set ≈ CPU cores per worker +RECOVERY_MIN_IDLE_MS=120000 # idle threshold before XAUTOCLAIM reclaims a stuck message +STREAM_DLQ_NAME=media:jobs:dlq +SHUTDOWN_DRAIN_TIMEOUT=30 # seconds to drain in-flight jobs on SIGTERM + +# Webhooks +WEBHOOK_CONCURRENCY=10 # concurrent signed deliveries per dispatcher tick +WEBHOOK_BATCH_SIZE=50 +WEBHOOK_POLL_INTERVAL=2s +WEBHOOK_MAX_ATTEMPTS=5 ``` +> **Tuning `MAX_CONCURRENT_JOBS`:** media work is partly CPU-bound (Pillow/ffmpeg), +> so set it close to the worker's CPU-core count. Going much higher *oversubscribes* +> the cores and reduces throughput — load tests showed `mcj=8` on 4 cores was slower +> than `mcj=4`. Size worker memory to the pool, not the single-threaded baseline. + > The worker reads the same `S3_*` variables as the Go server (falling back to `BUCKET_*`), so one `.env` drives both services. ### 3. Set Up the Database -Migrations run automatically on startup when `AUTO_MIGRATE=true` — both the Go server and the Python worker apply the embedded SQL migrations. To apply them manually instead: +Migrations run automatically on startup when `AUTO_MIGRATE=true` — both the Go server and the Python worker apply the embedded SQL migrations. + +> **Destructive migrations are gated.** Versions `000007_split_webhook_key` and +> `000008_assets_owner_not_null` drop or alter existing user data +> (`webhook_registrations`, `assets.owner_id`). Both runners refuse to apply +> them unless `MIGRATION_ALLOW_DESTRUCTIVE=true` is set. Set it for local +> bootstrap on a fresh database, but **never** set it on a database that +> already contains production data — apply those migrations by hand and review +> the SQL first. + +To apply them manually instead: ```bash createdb mpiper @@ -163,22 +196,20 @@ python -m worker # worker ### 6. Test the API -All `/api/v1` routes require a Bearer token — an AES-256-GCM token carrying a -user id, signed with `ENCRYPTION_KEY` (see [`pkg/utils/crypt.go`](pkg/utils/crypt.go)). -Mint one for local testing: +All `/api/v1` routes require a Bearer **API key** — a scoped, revocable key +(`mp__`) stored SHA-256-hashed at rest (see +[`pkg/utils/apikey.go`](pkg/utils/apikey.go)). Mint one for a tenant with the +CLI (it prints the key **once**): ```bash -TOKEN=$(python3 - <<'PY' -import base64, os -from cryptography.hazmat.primitives.ciphers.aead import AESGCM -key = b"change_me_to_a_32_byte_secret____" # your 32-byte ENCRYPTION_KEY -nonce = os.urandom(12) -ct = AESGCM(key).encrypt(nonce, b"demo-user", None) -print(base64.urlsafe_b64encode(nonce + ct).rstrip(b"=").decode()) -PY -) +TOKEN="$(go run ./cmd/mint-api-key --tenant demo-user)" +# optional: --expires 720h --scopes assets:write,webhooks:write ``` +> The CLI connects to the database using your environment config (`.env.local` +> in development). For the fully containerized demo, the bundled scripts seed a +> key directly into the running Postgres — see **Run the demo** below. + Request a presigned upload URL: ```bash @@ -271,6 +302,34 @@ All `/api/v1` routes require an `Authorization: Bearer ` header (see > For MinIO it is `S3_PUBLIC_ENDPOINT_URL` (the client-facing endpoint), so the > URL is reachable from wherever the client runs — see [Storage Providers](#storage-providers). +#### Idempotency + +`POST /storage/presign` (and the `complete` endpoint) accept an optional +`Idempotency-Key` header so client retries don't create duplicate assets. The +first request for a given key runs normally and its response is stored +(per-tenant, 24h TTL by default — `IDEMPOTENCY_TTL`); a retry with the **same +key and same body** replays the stored response verbatim (with +`Idempotent-Replayed: true`). Reusing a key with a **different body** returns +`422`, and a duplicate that arrives while the first is still in flight returns +`409`. + +```bash +curl -X POST http://localhost:5010/api/v1/storage/presign \ + -H "Authorization: Bearer $TOKEN" \ + -H "Idempotency-Key: 9f1c0b2a-..." \ + -H "Content-Type: application/json" \ + -d '{ "fileName": "image.jpg", "contentType": "image/jpeg", "size": 1024000 }' +``` + +#### Rate limits & quotas + +Presign is rate-limited **per tenant** (token bucket, `TENANT_RATE_LIMIT_RPS` +sustained / `TENANT_RATE_LIMIT_BURST` burst); exceeding it returns `429` with a +`Retry-After` header. An optional per-tenant asset quota +(`TENANT_ASSET_QUOTA`, `0` = unlimited) returns `403` once a tenant is at its +cap. Limits are isolated per tenant — one tenant hitting its limit does not +affect another. + ### Mark an asset complete (enqueue processing) **Endpoint:** `GET /api/v1/assets/{assetId}/complete` @@ -299,8 +358,9 @@ Register an endpoint to receive processing-lifecycle events. Deliveries are signed: each POST carries an `X-Webhook-Signature: sha256=` header computed over the JSON body using your registration `secret` (stored -encrypted at rest). A background dispatcher delivers pending events with -exponential-backoff retries and tracks them in the `webhook_deliveries` table. +encrypted at rest). A background dispatcher delivers pending events **concurrently** +(bounded by `WEBHOOK_CONCURRENCY`) with exponential-backoff retries and tracks them +in the `webhook_deliveries` table. ```bash curl -X POST http://localhost:5010/api/v1/webhooks \ @@ -331,8 +391,8 @@ produce variants, fetches a variant back over HTTP, and asserts the `job.starting → job.started → job.done` webhooks were delivered. It prints a PASS/FAIL summary and exits non-zero on any failure. -Requirements on the host: `bash`, `curl`, `jq`, `docker`, and a `python3` with -the `cryptography` package (used only to mint the auth token). +Requirements on the host: `bash`, `curl`, `jq`, `docker`, and a `python3` +(stdlib only — used to mint an API key seeded into the containerized Postgres). ## 🔧 Development @@ -359,7 +419,7 @@ mpiper/ │ └── utils/ │ └── storagex/ # Storage abstraction (GCS, S3/MinIO) ├── worker/ -│ ├── consumer/ # Redis Streams consumer + config +│ ├── consumer/ # Redis Streams consumer (bounded pool, XAUTOCLAIM recovery, DLQ) + config │ ├── processing/ # Image/video processing │ ├── storage/ # Storage adapters (base ABC, GCS, S3) + factory │ └── utils/ # Worker utilities (metrics) @@ -477,6 +537,11 @@ This project is licensed under the MIT License - see the [LICENSE](LICENSE) file - [x] Support for AWS S3 / MinIO storage - [x] Webhook delivery with HMAC signing + retry tracking - [x] Video transcoding with FFmpeg (poster, 720p, preview) +- [x] Concurrent worker pool (`MAX_CONCURRENT_JOBS`) — ~2.4× throughput +- [x] `XAUTOCLAIM` stream recovery + dead-letter stream for poison messages +- [x] Concurrent webhook delivery (`WEBHOOK_CONCURRENCY`) +- [x] End-to-end OpenTelemetry tracing, SLOs, Grafana dashboards + k6 load harness +- [ ] Queue-depth autoscaling (KEDA) — *next* - [ ] Support for Azure Blob Storage - [ ] Admin dashboard - [ ] Batch processing API diff --git a/cmd/mint-api-key/main.go b/cmd/mint-api-key/main.go new file mode 100644 index 0000000..c3f0bae --- /dev/null +++ b/cmd/mint-api-key/main.go @@ -0,0 +1,115 @@ +// Command mint-api-key inserts a new API key for a tenant and prints the +// plaintext key exactly once. There is no HTTP admin surface — keys are minted +// out-of-band with this tool. +// +// Usage: +// +// go run ./cmd/mint-api-key --tenant demo-user [--env development] +// [--expires 720h] [--scopes assets:write,webhooks:write] +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strings" + "time" + + "github.com/rndmcodeguy20/mpiper/internal/config" + "github.com/rndmcodeguy20/mpiper/internal/database" + "github.com/rndmcodeguy20/mpiper/internal/repository" + "github.com/rndmcodeguy20/mpiper/pkg/utils" + tenantpkg "github.com/rndmcodeguy20/mpiper/pkg/utils/tenant" + "go.uber.org/zap" +) + +func main() { + var ( + tenant = flag.String("tenant", "", "tenant id the key authenticates as (required)") + env = flag.String("env", envOr("ENV", "development"), "config environment (development|staging|production)") + expires = flag.Duration("expires", 0, "optional validity window, e.g. 720h; 0 means never expires") + scopes = flag.String("scopes", "", "optional comma-separated scopes") + ) + flag.Parse() + + if *tenant == "" { + fmt.Fprintln(os.Stderr, "error: --tenant is required") + flag.Usage() + os.Exit(2) + } + if !tenantpkg.IsValidSlug(*tenant) { + fmt.Fprintf(os.Stderr, "error: --tenant %q is not a valid tenant identifier (allowed: a-z, 0-9, _, -; max 64 chars)\n", *tenant) + flag.Usage() + os.Exit(2) + } + + cfg, err := config.InitializeConfig(config.ToEnvironment(*env)) + if err != nil { + fatalf("load config: %v", err) + } + config.Init(cfg) + + db, err := database.NewPostgresDB(cfg.DB) + if err != nil { + fatalf("connect db: %v", err) + } + defer func() { _ = db.Close() }() + + mat, err := utils.GenerateAPIKey() + if err != nil { + fatalf("generate key: %v", err) + } + + var scopeList []string + if s := strings.TrimSpace(*scopes); s != "" { + for _, sc := range strings.Split(s, ",") { + if t := strings.TrimSpace(sc); t != "" { + scopeList = append(scopeList, t) + } + } + } + + var expiresAt *time.Time + if *expires > 0 { + t := time.Now().Add(*expires).UTC() + expiresAt = &t + } + + repo := repository.NewAPIKeyRepository(db, zap.NewNop()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + id, err := repo.Create(ctx, *tenant, mat.Hash, mat.Prefix, scopeList, expiresAt) + if err != nil { + fatalf("insert key: %v", err) + } + + exp := "never" + if expiresAt != nil { + exp = expiresAt.Format(time.RFC3339) + } + + // Human-readable summary to stderr; the bare key to stdout so it can be + // captured cleanly: KEY=$(go run ./cmd/mint-api-key --tenant t) + fmt.Fprintf(os.Stderr, "API key created\n") + fmt.Fprintf(os.Stderr, " id: %s\n", id) + fmt.Fprintf(os.Stderr, " tenant: %s\n", *tenant) + fmt.Fprintf(os.Stderr, " prefix: %s\n", mat.Prefix) + fmt.Fprintf(os.Stderr, " scopes: %v\n", scopeList) + fmt.Fprintf(os.Stderr, " expires: %s\n", exp) + fmt.Fprintf(os.Stderr, " (the key below is shown ONCE and is not recoverable)\n") + fmt.Println(mat.Full) +} + +func fatalf(format string, args ...any) { + fmt.Fprintf(os.Stderr, "mint-api-key: "+format+"\n", args...) + os.Exit(1) +} + +func envOr(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} diff --git a/cmd/server/main.go b/cmd/server/main.go index bb4f886..c9f3fc3 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "database/sql" "errors" "fmt" "net/http" @@ -100,7 +101,7 @@ func main() { if cfg.AutoMigrate { baseLogger.Info("AUTO_MIGRATE=true: running migrations") - if err := database.RunMigrations(db.DB); err != nil { + if err := database.RunMigrations(db.DB, cfg.MigrationAllowDestructive); err != nil { baseLogger.Sugar().Fatalf("Migration failed: %v", err) } baseLogger.Info("Migrations applied successfully") @@ -125,6 +126,13 @@ func main() { _ = m.RegisterOutboxPendingFunc(func(ctx context.Context) (int64, error) { return outboxRepo.CountPending(ctx) }) + + // Observe the database connection-pool stats (in-use / idle / open / max / + // wait count). sqlx.DB embeds *sql.DB, so db.Stats() exposes pool saturation + // — the key signal for whether the DB pool is a bottleneck under load. + _ = m.RegisterDBStatsFunc(func() sql.DBStats { + return db.Stats() + }) go relay.Start(serverCtx) go relay.StartCleanup(serverCtx, cfg.Outbox.Retention) @@ -134,9 +142,10 @@ func main() { BatchSize: cfg.Webhook.BatchSize, Timeout: cfg.Webhook.Timeout, MaxAttempts: cfg.Webhook.MaxAttempts, - EncryptionKey: cfg.EncryptionKey, + EncryptionKey: cfg.WebhookEncryptionKey, Retention: cfg.Webhook.Retention, - }) + Concurrency: cfg.Webhook.Concurrency, + }, m) go webhookDispatcher.Start(serverCtx) go webhookDispatcher.StartCleanup(serverCtx) @@ -146,6 +155,31 @@ func main() { return count, err }) + // --- Idempotency key TTL sweep --- + // Periodically purge expired idempotency keys so the table doesn't grow + // unbounded. Interval is a fraction of the TTL (min 1 minute). + idempotencyRepo := repository.NewIdempotencyRepository(db, baseLogger) + go func() { + interval := cfg.IdempotencyTTL / 24 + if interval < time.Minute { + interval = time.Minute + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-serverCtx.Done(): + return + case <-ticker.C: + if n, err := idempotencyRepo.DeleteExpired(serverCtx); err != nil { + baseLogger.Sugar().Errorf("idempotency sweep failed: %v", err) + } else if n > 0 { + baseLogger.Sugar().Infof("idempotency sweep: deleted %d expired keys", n) + } + } + } + }() + srv := server.NewServer(db, cfg, m) go func() { if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { diff --git a/deploy/k8s/secrets.yaml b/deploy/k8s/secrets.yaml index 9997941..e1967a2 100644 --- a/deploy/k8s/secrets.yaml +++ b/deploy/k8s/secrets.yaml @@ -12,6 +12,12 @@ stringData: # Redis password (if required) REDIS_PASSWORD: "" + # AES-256 keys — each must be exactly 32 bytes. Keep WEBHOOK_ENCRYPTION_KEY + # DISTINCT from ENCRYPTION_KEY (auth vs webhook-secret encryption); it falls + # back to ENCRYPTION_KEY only when unset. + ENCRYPTION_KEY: "CHANGE_ME_TO_A_32_BYTE_SECRET___" + WEBHOOK_ENCRYPTION_KEY: "CHANGE_ME_TO_A_DIFF_32_BYTE_KEY_" + # GCS Service Account JSON (base64 encode your actual JSON file) # Create with: kubectl create secret generic mpiper-secrets --from-file=gcs-credentials=./service-account.json -n mpiper --- diff --git a/docker-compose.loadtest.yml b/docker-compose.loadtest.yml new file mode 100644 index 0000000..499e1db --- /dev/null +++ b/docker-compose.loadtest.yml @@ -0,0 +1,68 @@ +# ============================================================================ +# MPiper — Load-test overlay (Track 3, Phase 0) +# +# docker compose \ +# -f docker-compose.yml \ +# -f docker-compose.observability.yml \ +# -f docker-compose.loadtest.yml \ +# up -d --build +# +# Why this exists +# --------------- +# Local load-test results are only interpretable when resources are PINNED, so +# the bottleneck is a stable, observable fact rather than something that moves +# run to run with spare laptop cores. This overlay caps CPU/memory on `api` and +# `worker` and forces TRACE_SAMPLING_RATE=1.0 (every asset traced). +# +# Tunable knobs (env, with baseline defaults) — flip these for an A/B on the +# SAME binary without editing this file or adding new overlays: +# +# WORKER_CPUS worker CPU limit (default 1.0) +# WORKER_MEM worker memory limit (default 1024M) +# MAX_CONCURRENT_JOBS worker pool size (default 1 → serial baseline) +# WEBHOOK_CONCURRENCY webhook delivery fan-out (default 1 → serial baseline) +# JOB_POLL_INTERVAL worker idle poll (s) (default 1) +# +# Example A/B (concurrent worker + webhooks), same image, fixed core budget: +# BEFORE: WORKER_CPUS=4 MAX_CONCURRENT_JOBS=1 WEBHOOK_CONCURRENCY=1 docker compose … up -d --force-recreate worker api +# AFTER: WORKER_CPUS=4 MAX_CONCURRENT_JOBS=8 WEBHOOK_CONCURRENCY=10 docker compose … up -d --force-recreate worker api +# +# Defaults reproduce the single-threaded Track-1 baseline. Record whatever knob +# values you used alongside every experiment — they change the ceiling. +# ============================================================================ + +name: mpiper + +services: + api: + environment: + # Full sampling locally — never lose a trace to the 0.1 code default. + TRACE_SAMPLING_RATE: "1.0" + # Webhook delivery fan-out. Default 1 = serial baseline; raise for the A/B. + WEBHOOK_CONCURRENCY: "${WEBHOOK_CONCURRENCY:-1}" + deploy: + resources: + limits: + cpus: "1.0" + memory: 512M + reservations: + cpus: "0.25" + memory: 128M + + worker: + environment: + # Full sampling locally; matches the API so the whole trace is captured. + TRACE_SAMPLING_RATE: "1.0" + # Worker pool size. Default 1 = single-threaded baseline; raise for the A/B. + MAX_CONCURRENT_JOBS: "${MAX_CONCURRENT_JOBS:-1}" + JOB_POLL_INTERVAL: "${JOB_POLL_INTERVAL:-1}" + deploy: + resources: + limits: + # Default 1 CPU surfaces the single-threaded baseline; set WORKER_CPUS + # higher (e.g. 4) to give the pool real cores for the concurrency A/B. + cpus: "${WORKER_CPUS:-1.0}" + memory: "${WORKER_MEM:-1024M}" + reservations: + cpus: "0.5" + memory: 256M diff --git a/docker-compose.observability.yml b/docker-compose.observability.yml index 65d4760..7ff8300 100644 --- a/docker-compose.observability.yml +++ b/docker-compose.observability.yml @@ -21,7 +21,7 @@ services: # Grafana Tempo - Distributed Tracing Backend # ========================================================================== tempo: - image: grafana/tempo:latest + image: grafana/tempo:2.6.1 container_name: mpiper-tempo command: ["-config.file=/etc/tempo.yaml"] volumes: @@ -102,8 +102,14 @@ services: - '--web.console.libraries=/usr/share/prometheus/console_libraries' - '--web.console.templates=/usr/share/prometheus/consoles' - '--web.enable-lifecycle' + # Accept k6's Prometheus remote-write output (Track 3, Phase 4) so client- + # side load metrics land in the same Prometheus as the server-side ones. + - '--web.enable-remote-write-receiver' + # Exemplar storage powers the histogram-bucket -> Tempo trace links. + - '--enable-feature=exemplar-storage' volumes: - ./observability/prometheus.yml:/etc/prometheus/prometheus.yml + - ./observability/prometheus.rules.yml:/etc/prometheus/prometheus.rules.yml - prometheus-data:/prometheus ports: - "9090:9090" diff --git a/docker-compose.webhooks.yml b/docker-compose.webhooks.yml index e9b5200..5b7d6eb 100644 --- a/docker-compose.webhooks.yml +++ b/docker-compose.webhooks.yml @@ -9,7 +9,9 @@ services: image: mendhak/http-https-echo:latest container_name: mpiper-webhook-receiver ports: - - "8888:8080" + # Host 8899 (not 8888 — that collides with the otel-collector). Internal + # delivery uses the docker-network name http://webhook-receiver:8080. + - "8899:8080" environment: HTTP_PORT: 8080 networks: diff --git a/docs/arch/abr-transcoding-pipeline.md b/docs/arch/abr-transcoding-pipeline.md new file mode 100644 index 0000000..80fb1d2 --- /dev/null +++ b/docs/arch/abr-transcoding-pipeline.md @@ -0,0 +1,94 @@ +# ADR 0003: Adaptive Bitrate Transcoding Pipeline + +## Status +Accepted + +## Context +Current pipeline produces single 720p H.264 output (`worker/processing/videos.py:transcode_720p`). Need multi-bitrate HLS/DASH for adaptive streaming across devices and network conditions. + +## Decision + +### ABR Ladder (H.264 baseline → high profiles) +| Variant | Resolution | Bitrate | Audio | Profile | Use Case | +|---------|------------|---------|-------|---------|----------| +| 1080p | 1920×1080 | 5000k | 192k | high | Desktop, TV | +| 720p | 1280×720 | 2800k | 128k | high | Tablet, Mobile WiFi | +| 480p | 854×480 | 1400k | 128k | main | Mobile 3G/4G | +| 360p | 640×360 | 800k | 96k | baseline| Mobile 2G, fallback | +| 240p | 426×240 | 400k | 64k | baseline| Audio-only fallback | + +### Encoding Strategy +- **Single-pass filter_complex** — one ffmpeg invocation encodes all rungs simultaneously via `split` + parallel `scale` + `encode` filtergraph +- **Segment duration**: 6s (VOD), 2s (Live/LL-HLS) +- **Segment format**: fMP4 (CMAF) for HLS + DASH shared segments +- **Manifests**: HLS v7 (`#EXT-X-VERSION:7`) + DASH MPD (ISO BMFF) +- **Deduplication**: Content-hash based (already implemented in `processor.py:check_for_duplicate`) + +### Hardware Acceleration +| Platform | Encoder | Detection | +|----------|---------|-----------| +| NVIDIA GPU | h264_nvenc | `nvidia-smi` / `ffmpeg -encoders` | +| Intel QSV | h264_qsv | `vainfo` / `ffmpeg -encoders` | +| AMD VCN | h264_amf | `rocminfo` / `ffmpeg -encoders` | +| Apple VT | h264_videotoolbox | `system_profiler` / `ffmpeg -encoders` | +| Fallback | libx264 | Always available | + +Selection logic: prefer HW encoder → fallback to libx264 `preset=fast`/`crf=22`. + +### Output Structure (Object Storage) +``` +media/{owner_id}/processed/{asset_id}/ +├── hls/ +│ ├── master.m3u8 +│ ├── 1080p/index.m3u8 + seg_*.m4s +│ ├── 720p/index.m3u8 + seg_*.m4s +│ ├── 480p/index.m3u8 + seg_*.m4s +│ ├── 360p/index.m3u8 + seg_*.m4s +│ └── 240p/index.m3u8 + seg_*.m4s +└── dash/ + ├── manifest.mpd + └── (shared fMP4 segments via BaseURL) +``` + +### Database Schema Extensions +```sql +-- Add to variants.video +ALTER TABLE variants.video ADD COLUMN IF NOT EXISTS manifest_url TEXT; +ALTER TABLE variants.video ADD COLUMN IF NOT EXISTS abr_ladder JSONB; -- stores ladder config used +ALTER TABLE variants.video ADD COLUMN IF NOT EXISTS segment_duration INT DEFAULT 6; +ALTER TABLE variants.video ADD COLUMN IF NOT EXISTS codec_profile TEXT; -- baseline/main/high +``` + +## Consequences + +### Positive +- 3-5x faster than sequential encodes (single decode, single filtergraph) +- True ABR playback on all HLS/DASH clients +- CMAF single-storage for both HLS and DASH +- Content-hash deduplication works across ABR variants + +### Negative +- Requires GPU instances for cost-effective HD+ encoding +- Manifest generation adds eventual consistency (segments appear before manifest updates) +- Higher storage: ~2.5× single 720p + +### Risks +- ffmpeg filter_complex syntax is brittle; need integration tests per codec/hwaccel combo +- HW encoder availability varies by cloud provider/instance type + +## Implementation Plan + +| Phase | Task | Files | +|-------|------|-------| +| P0 | Single-pass ABR filtergraph generator | `worker/processing/abr.py` | +| P0 | HLS/DASH manifest writer (fMP4 segments) | `worker/processing/manifests.py` | +| P0 | HW acceleration detection + fallback | `worker/processing/hwaccel.py` | +| P0 | Integrate into `process_video_file` | `worker/processing/videos.py` | +| P1 | Per-title encoding (convex hull analysis) | `worker/processing/per_title.py` | +| P1 | Cost tracking per asset (CPU-sec, GB-egress) | `internal/metrics/cost.go` | +| P2 | DRM integration (Widevine/FairPlay/PlayReady) | `worker/processing/drm.py` | +| P2 | Thumbnail sprites + WebVTT for scrubbing | `worker/processing/sprites.py` | + +## Related ADRs +- ADR 0001: Ingress Outbox & Idempotent Consumer (`ingress-outbox-and-idempotent-consumer.md`) +- ADR 0002: Reliability & Correctness (`reliability-and-correctness.md`) \ No newline at end of file diff --git a/docs/arch/image-pipeline-advancements.md b/docs/arch/image-pipeline-advancements.md new file mode 100644 index 0000000..cb16a54 --- /dev/null +++ b/docs/arch/image-pipeline-advancements.md @@ -0,0 +1,178 @@ +# ADR 0004: Image Pipeline Advancements + +## Status +Proposed + +## Context +Current pipeline (`worker/processing/images.py`) generates 3 fixed WebP variants (thumbnail 256², display_small 512w, display_large 1280w). This wastes bandwidth on simple images, upscales small images, and lacks modern formats (AVIF, JPEG XL). + +## Decision + +### Phase P0 — Immediate Wins (2-3 days) + +#### 1. Modern Format Stack +Encode each variant as AVIF → JPEG XL → WebP → JPEG (progressive), serve via `Accept` header negotiation. + +| Format | MIME | Pillow/lib | Quality/Speed | Browser Support | +|--------|------|------------|---------------|-----------------| +| AVIF | image/avif | pillow-avif-plugin / libavif | q=50, speed=6 | Chrome 85+, Firefox 93+, Safari 16.4+ | +| JPEG XL| image/jxl | libjxl / pillow-jxl | q=50, effort=7 | Chrome 110+ (flag), Safari 17+ (flag) | +| WebP | image/webp | Pillow (built-in) | q=75, method=6 | Universal | +| JPEG | image/jpeg | Pillow (built-in) | q=82, progressive, optimize | Universal | + +#### 2. Adaptive Variant Ladder (Per-Image) +Replace fixed widths with content-aware targets: + +```python +def compute_adaptive_variants(src_width: int, src_height: int, mime: str) -> list[dict]: + aspect = src_width / src_height + mpx = (src_width * src_height) / 1_000_000 + + # Target megapixels per role (no upscale > 1.2x) + targets = [0.07, 0.3, 1.0, 3.0] # thumb, small, medium, large + variants = [] + + for i, tgt in enumerate(targets): + if tgt > mpx * 1.2: + break + w = int((tgt * 1_000_000 * aspect) ** 0.5) + h = int(w / aspect) + variants.append({ + "role": ["thumbnail", "small", "medium", "large"][i], + "width": w, "height": h, + "format": "avif", # primary; others generated as fallbacks + "quality": 55 + i * 5, + }) + return variants +``` + +#### 3. Parallel Variant Generation +```python +from concurrent.futures import ThreadPoolExecutor + +def process_image_file_parallel(asset_id, owner_id, path, content_hash, pg_pool, storage, cfg): + with Image.open(path) as img: + variants = compute_adaptive_variants(img.width, img.height, mime) + + with ThreadPoolExecutor(max_workers=cfg.image_workers) as pool: + futures = { + pool.submit(encode_and_upload, img, v, asset_id, owner_id, storage, pg_pool): v + for v in variants + } + for fut in as_completed(futures): + fut.result() # propagate exceptions +``` + +### Phase P1 — Smart Operations (1-2 weeks) + +#### 4. Smart Crop (Saliency / Focal Point) +- **Option A**: OpenCV spectral residual saliency (fast, no model) +- **Option B**: Client-provided focal point `{x: 0.5, y: 0.3}` in upload metadata +- Apply to `thumbnail` and `small` variants (center-crop → content-aware crop) + +#### 5. Animated Image Support +- Detect GIF/WebP/APNG animation via `Image.is_animated` +- Re-encode as animated WebP (method=6, lossless=false) → 70-80% size reduction vs GIF +- Extract first frame as static poster (WebP/AVIF) +- Store frame count, loop count, duration in `variants.image` metadata + +#### 6. Color Space Normalization +- `ImageOps.exif_transpose()` for orientation +- Convert to sRGB ICC profile; preserve Display P3 if source has it +- Optional: soft-proof for sRGB gamut mapping + +### Phase P2 — Frontend Integration & Quality Assurance (1 week) + +#### 7. Responsive HTML Generator +```python +def generate_picture_html(variants: list[dict], base_url: str, alt: str = "") -> str: + """Generate with format fallbacks + srcset.""" + by_role = {} + for v in variants: + by_role.setdefault(v["role"], []).append(v) + + sources = [] + for role, fmts in by_role.items(): + for fmt in fmts: + sources.append( + f'' + ) + sources.append(f'{alt}') + return "\n" + "\n".join(sources) + "\n" +``` + +#### 8. Perceptual Quality Metrics +- Integrate SSIMULACRA2 (Google) or Butteraugli for automated QA +- Fail build if variant quality drops below threshold +- Store metric scores in `variants.image.quality_score` + +## Database Schema Extensions + +```sql +ALTER TABLE variants.image ADD COLUMN IF NOT EXISTS format_set TEXT[]; -- ['avif','webp','jpeg'] +ALTER TABLE variants.image ADD COLUMN IF NOT EXISTS quality_score REAL; -- SSIMULACRA2 score +ALTER TABLE variants.image ADD COLUMN IF NOT EXISTS is_animated BOOLEAN DEFAULT FALSE; +ALTER TABLE variants.image ADD COLUMN IF NOT EXISTS frame_count INT; +ALTER TABLE variants.image ADD COLUMN IF NOT EXISTS focal_point_x REAL; +ALTER TABLE variants.image ADD COLUMN IF NOT EXISTS focal_point_y REAL; +``` + +## Storage Key Structure (Backward Compatible) + +``` +media/{owner_id}/processed/{asset_id}/ +├── thumbnail.avif +├── thumbnail.webp +├── thumbnail.jpg +├── small.avif +├── small.webp +├── small.jpg +├── medium.avif +├── medium.webp +├── medium.jpg +├── large.avif +├── large.webp +├── large.jpg +├── poster.avif -- for animated sources +├── poster.webp +└── animation.webp -- animated WebP (replaces GIF) +``` + +## Consequences + +### Positive +- 30-50% bandwidth reduction (AVIF vs WebP) +- No wasted upscales (adaptive ladder) +- 3-4x faster processing (parallel Pillow) +- Animated GIF → WebP: 80% size reduction +- Drop-in `` HTML for frontend + +### Negative +- AVIF encoding slower (~3x WebP); mitigate with `speed=6` + parallel workers +- JPEG XL browser support still behind flags +- More storage variants (4 formats × 4 roles = 16 files vs 3) +- OpenCV dependency for saliency (optional) + +### Risks +- Pillow AVIF plugin requires `libavif` system package +- ThreadPoolExecutor GIL contention on CPU-heavy Pillow ops (mitigated: C extensions release GIL) +- Accept header parsing complexity at CDN edge + +## Implementation Plan + +| Phase | Task | Files | Effort | +|-------|------|-------|--------| +| P0 | AVIF/WebP/JXL encoder + format negotiation | `worker/processing/formats.py` | 1 day | +| P0 | Adaptive variant ladder | `worker/processing/adaptive_images.py` | 1 day | +| P0 | Parallel processing + integrate | `worker/processing/images.py` | 0.5 day | +| P1 | Smart crop (saliency/focal) | `worker/processing/smart_crop.py` | 2 days | +| P1 | Animated image support | `worker/processing/animated.py` | 1 day | +| P1 | Color space normalization | `worker/processing/color.py` | 1 day | +| P2 | `` HTML generator | `worker/processing/responsive.py` | 0.5 day | +| P2 | SSIMULACRA2 quality gate | `worker/processing/quality.py` | 2 days | + +## Related ADRs +- ADR 0003: ABR Transcoding Pipeline (`abr-transcoding-pipeline.md`) +- ADR 0001: Ingress Outbox & Idempotent Consumer (`ingress-outbox-and-idempotent-consumer.md`) \ No newline at end of file diff --git a/docs/enhancements/README.md b/docs/enhancements/README.md new file mode 100644 index 0000000..015a1c8 --- /dev/null +++ b/docs/enhancements/README.md @@ -0,0 +1,219 @@ +# MPiper Enhancements — Roadmap + +This directory tracks the work that takes MPiper from a well-built side project +to a production-grade media platform. Each **track** is chosen to teach a +distinct, transferable systems-engineering concept *and* to add real product +value — not feature-padding. + +The philosophy: **write a design doc per track before coding** (problem, options, +decision, tradeoffs, how success is measured), and pair every track with a +load test or chaos experiment so each claim ("now it scales", "now it's +exactly-once") is *demonstrated*, not assumed. + +> **Progress:** Track 3 (observability + load testing) ✅, **Track 1 (concurrent +> worker + XAUTOCLAIM recovery + DLQ) ✅**, and **Track 1b (webhook delivery +> throughput) ✅** are done. Track 3 shipped the foundation that makes everything +> measurable — tracing, SLOs, dashboards, a k6 harness. Track 1 then turned the #1 +> bottleneck into a **measured 2.37× worker throughput win** (0.73 → 1.73 jobs/s, +> 1 → ~3.2 cores; see [`experiments/0002`](../../experiments/0002-concurrent-worker.md)), +> and Track 1b wired the webhook delivery metrics + concurrent fan-out (see +> [`experiments/0003`](../../experiments/0003-webhook-throughput.md)). **Next: Track 2 +> (autoscaling)** — now unblocked, since there is finally a concurrent worker to scale. + +## Where we are today + +A clean, correct, **single-tenant, best-effort, single-node-throughput** pipeline +with good bones — now fully observable: + +- Transactional enqueue via an **outbox relay** (Postgres → Redis Streams). +- An **idempotent-ish consumer** with content-hash dedup. +- **Presigned uploads** with a split internal/public storage endpoint. +- **Webhooks** with HMAC signing + exponential backoff. +- **End-to-end distributed tracing** (API → outbox → Redis → worker → ffmpeg, one + waterfall per asset), **OTel metrics** on API and worker, **SLO recording rules**, + provisioned **Grafana dashboards**, and a host-run **k6 load harness** — on the + bundled Grafana/Tempo/Loki/Prometheus stack. + +Known seams where "side project" becomes "system" (verified in code, and now +several of them **measured** under load): + +- **~~Single-threaded worker~~ ✅ RESOLVED (Track 1)** — now a bounded + `ThreadPoolExecutor` honouring `MAX_CONCURRENT_JOBS`. *Measured:* μ rose from + ~1.1 → **1.73 jobs/s (2.37×)** and worker CPU from 1 → ~3.2 cores at `mcj=4`. + Lesson banked: tune `MAX_CONCURRENT_JOBS ≈ cores` — `mcj=8` on 4 cores + *oversubscribed* and was slower. +- **~~Webhook dispatcher can't keep up~~ ✅ RESOLVED (Track 1b)** — concurrent + `errgroup` fan-out (`WEBHOOK_CONCURRENCY`) + tuned HTTP transport, and the + previously-unrecorded `webhook_delivery_*` metrics are now wired. *Note:* at + local scale (fast receiver, CPU-pinned API) the dispatcher kept up even + serially, so the win here is observability + headroom; see + [`experiments/0003`](../../experiments/0003-webhook-throughput.md). +- **~~Homegrown recovery~~ ✅ RESOLVED (Track 1)** — replaced the DB-scan + re-`XADD` + with `XAUTOCLAIM` consumer-group recovery, and added a **dead-letter stream** + (`media:jobs:dlq`) with failure metadata + a depth gauge for poison/over-retried + messages (previously dropped/unacked-forever). +- **No raw-upload lifecycle** — objects in `media/raw/` are never deleted. + *Measured:* ~**50%** of presigned uploads are never completed → orphaned objects + accumulate. +- **Homegrown auth** — an AES-GCM token with no expiry/rotation, and the same + `ENCRYPTION_KEY` signs both auth tokens and webhook secrets. +- **Polled high-churn tables** (`jobs`, `event_outbox`, `webhook_deliveries`), grown + unbounded with cleanup-by-retention only. *Measured:* `event_outbox` kept up with + **0 backlog** and the DB had headroom (**18 ms** mean query, **5/25** connections); + only `webhook_deliveries` actually strained. + +## What the first load test proved (exp 0001) + +Track 3 gave us the instrumentation to stop guessing. The first saturating run +(`open --rate 10/s`, CPU-pinned worker) turned the seams above into a **measured, +ranked** list — and every track below now has a baseline to beat by re-running the +*same* k6 profile and comparing the dashboards. + +| Finding (measured) | What it means | Owner | +|---|---|---| +| Worker μ ≈ **1.1 jobs/s**, CPU 98%, queue → **2,544** | Single-threaded worker is the throughput ceiling | **Track 1 (P0)** | +| `webhook_pending` → **5,901**, never drains | Dispatcher delivery rate ≪ insertion rate | **Track 1b (P1, new)** | +| `event_outbox` **0 backlog**; DB **18 ms** mean, **5/25** conns | Outbox + DB have large headroom *today* | Track 7 → **defer** | +| `webhook_deliveries` is the one polled table straining | The real, current trigger for data-layer work | Track 7 → **rescope to this** | +| **~50%** of presigns never completed → orphaned `media/raw/` | Storage grows with abandoned uploads | Track 5 (small) | +| `/complete` p99 **358 ms** (synchronous MinIO HEAD) | Minor hot-path tail | Track 5 (small) | + +Net effect: **Track 1 is confirmed P0**, a **webhook-throughput bottleneck was +surfaced that no track owned** (now Track 1b), and **Track 7's table-partitioning is +premature** — the DB isn't the problem yet; the webhook *delivery loop* is. + +## Tracks + +| # | Track | Core systems lesson | Status | +|---|-------|---------------------|--------| +| 1 | [Concurrent worker + proper stream recovery + DLQ](track-01-concurrent-worker.md) | Concurrency models, at-least-once recovery, poison-message handling, head-of-line blocking | **done ✅ (2.37× — exp 0002)** | +| 1b | Webhook delivery throughput *(surfaced by exp 0001)* | Concurrent I/O-bound delivery, backpressure on a side-channel, decoupling fan-out from job completion | **done ✅ (exp 0003)** | +| 2 | [Queue-depth autoscaling](track-02-handoff.md) | Backpressure, control loops, Little's Law, SLO-driven capacity | **ready — needs k8s (deferred until a cluster is available)** | +| 3 | [End-to-end tracing, SLOs & local load testing](track-03-observability-and-load.md) | Context propagation across async boundaries, the three pillars, SLO/SLI/error budgets, load-test methodology | **done ✅** | +| 4 | [Multi-tenancy, auth & quotas](track-04-handoff.md) | AuthN vs AuthZ, key rotation, the idempotency pattern, tenant isolation | planned | +| 5 | [Production ingestion pipeline](track-05-ingestion.md) | Resumable/multipart uploads, pipeline stages, defense-in-depth, trust boundaries | planned | +| 6 | [Adaptive streaming + CDN](track-06-adaptive-streaming.md) | ABR streaming, CDN cache/invalidation, edge auth, encoding cost/quality tradeoffs | planned | +| 7 | [Data layer at scale](track-07-data-layer.md) | Table partitioning, CDC vs polling, index design under write load | **deferred — rescope to `webhook_deliveries`** | +| 8 | [Resilience & correctness verification](track-08-resilience.md) | Failure-mode analysis, exactly-once in practice, replay attacks, chaos engineering | planned | + +> Track 3 is the only track with a full design doc checked in, because it was built +> first. Now that it's done, every track below is **measurable**: implement, re-run +> the same k6 profile, compare dashboards, and record an `experiments/NNNN-*.md` +> writeup. "It scales" is a claim we can prove, not assert. + +## Recommended sequence (re-prioritized from exp 0001 data) + +1. **~~Track 1 — concurrent worker + DLQ + stream recovery.~~ ✅ DONE.** + Was the P0 throughput ceiling (μ ≈ 1.1 jobs/s). Now a bounded pool: **measured + 2.37×** (0.73 → 1.73 jobs/s), multi-core, 100% success, live DLQ. See + `experiments/0002`. Lesson: set `MAX_CONCURRENT_JOBS ≈ cores`. +2. **~~Track 1b — webhook delivery throughput.~~ ✅ DONE.** Concurrent fan-out + + wired `webhook_delivery_*` metrics + transport tuning. Not the bottleneck at + local scale (kept up serially), so the win is observability + headroom; see + `experiments/0003`. To prove it under stress, re-run with a latency-bearing + receiver. +3. **Track 2 — autoscaling. ← NEXT.** Now unblocked: there is finally a concurrent + worker to scale. Drive worker replica count off the queue-lag signal we already + expose (KEDA; k8s manifests exist). *Verify:* a backlog → scale-up → drain cycle. + Carry the Track 1 lesson forward — each replica runs its own pool, so set + `MAX_CONCURRENT_JOBS ≈ cores-per-pod` and scale *pods*, not threads. See + [`track-02-handoff.md`](track-02-handoff.md). +4. **Track 4 — multi-tenancy + idempotency + auth.** The leap to "real users". +5. **Track 6 — adaptive streaming + CDN.** The headline product feature. +6. **Track 5 — ingestion.** Includes the small wins exp 0001 surfaced: abandoned-upload + lifecycle (~50% orphaned `media/raw/`) and the `/complete` MinIO-HEAD tail. +7. **Track 7 — data layer.** **Deferred and rescoped.** DB/outbox have headroom today; + revisit when volume justifies, scoped first to `webhook_deliveries` churn (the one + polled table that actually strained) rather than blanket partitioning. +8. **Track 8 — resilience & correctness.** Depth once the throughput tracks land. + +> **~~Track 3 follow-ups~~ ✅ DONE** (folded into Track 1b): `webhook_delivery_*` +> metrics wired, `db.query.duration` fine-bucket view added, `storage_operation_*` +> confirmed already recorded. Histogram-bucket standardization remains a watch-item +> when reading p95 across a deploy window. + +--- + +## Track catalog (summaries) + +### Track 1 — Concurrent worker + proper stream recovery + DLQ +**Gap:** one job at a time; a 3s video blocks a 200ms thumbnail. Recovery scans +Postgres and re-`XADD`s instead of using consumer-group delivery state. +**Move:** bounded worker pool (process pool for CPU-bound ffmpeg/Pillow vs async +for I/O — *choosing which is the lesson*); honour `MAX_CONCURRENT_JOBS` as a +semaphore; `XAUTOCLAIM`/`XPENDING` to reclaim dead-consumer messages; a +**dead-letter stream** for messages past the attempt cap; priority lanes so small +jobs don't queue behind large transcodes. +**Teaches:** thread vs process vs async, the GIL, CPU vs I/O bound, at-least-once +recovery, poison-message handling, head-of-line blocking. + +### Track 1b — Webhook delivery throughput *(surfaced by exp 0001)* +**Gap:** the dispatcher polls every 2s, batch 50, and delivers webhooks with +*synchronous* HTTP + retries on a single loop. Each job emits 3 events +(`job.starting/started/done`), so insertion rate ≫ delivery rate — the load test +drove `webhook_pending` to ~5,900 with no recovery. Delivery is also under- +instrumented: `webhook_delivery_total/duration/failures` are defined but never +recorded, so only the `pending` gauge revealed the backlog. +**Move:** a bounded pool of concurrent delivery workers (I/O-bound → async/threads +fits); decouple fan-out from job completion; wire the delivery metrics + a +delivery-latency SLI. Optionally move webhook rows onto their own stream consumer +rather than a DB poll. +**Teaches:** concurrency for I/O-bound work, backpressure on a side-channel, +decoupling producers from slow consumers, instrumenting before optimizing. + +### Track 2 — Queue-depth autoscaling +**Gap:** static worker count; bursts grow latency unbounded, idle wastes capacity. +**Move:** expose stream lag + oldest-message-age (extend the existing relay-lag +metric); drive **KEDA** (k8s manifests already exist) to scale workers on lag; +load-test the backlog → scale → drain cycle. +**Teaches:** backpressure, control loops, latency- vs queue-depth-based scaling, +Little's Law (L = λW), capacity planning. + +### Track 4 — Multi-tenancy, auth & quotas +**Gap:** homegrown AES token (no expiry/rotation, shared key with webhook secrets); +single bucket, path-prefixed; no idempotency keys (retried presign = duplicate asset). +**Move:** OIDC/JWT (asymmetric keys, expiry, JWKS rotation) or scoped API keys; +separate webhook-signing secret; org→project→asset model with repository-layer +row scoping and per-tenant storage prefixes/credentials; **idempotency keys** on +`presign`/`complete`; per-tenant **quotas + rate limits** with usage accounting. +**Teaches:** authN vs authZ, key management/rotation, the idempotency pattern, +tenant isolation, security blast-radius. + +### Track 5 — Production ingestion pipeline +**Gap:** single presigned `PUT`, 500MB cap, no resumability, MIME-only validation, +no scanning. Plus (from exp 0001) **no lifecycle for abandoned uploads** — ~50% of +presigns never complete, orphaning `media/raw/` objects. +**Move:** S3 **multipart/resumable** uploads with part-level retry; a validation +stage with real content sniffing (`python-magic` is already a dep); optional +**ClamAV** malware scan as a stage; dedup *before* full download via verified +client-supplied hash; a TTL/lifecycle sweep for un-completed raw uploads. +**Teaches:** large-file transfer, pipeline/stage design, defense-in-depth, trust +boundaries (never trust client content-type). + +### Track 6 — Adaptive streaming + CDN +**Gap:** one fixed 720p MP4 at hardcoded 2500kbps, served straight from MinIO. +**Move:** generate an **HLS/DASH adaptive ladder** (multiple renditions + manifest — +`variants.video.manifest_url` already exists in the schema); serve via **CDN** with +signed URLs + cache-control; content-aware/per-title encoding decisions. +**Teaches:** adaptive bitrate streaming, CDN cache strategy + invalidation, edge +signed-URL access control, encoding cost/quality tradeoffs. + +### Track 7 — Data layer at scale *(deferred — see exp 0001)* +**Gap:** `jobs`, `event_outbox`, `webhook_deliveries` polled and growing. The load +test showed the DB and outbox have **headroom today** (18 ms mean query, 5/25 +connections, 0 outbox backlog), so blanket partitioning is premature — but +`webhook_deliveries` is the one table that genuinely strained. +**Move:** start narrow — partition/clean `webhook_deliveries` and replace its poll +with `LISTEN/NOTIFY` or a stream consumer (overlaps Track 1b). Broaden to the other +tables (monthly partitions; drop instead of DELETE; read replicas; CDC) only when +volume justifies it. +**Teaches:** partitioning, CDC vs polling, write-heavy index design, pool sizing. + +### Track 8 — Resilience & correctness verification +**Gap:** unit + integration tests exist, but no proof of survival under failure/load. +**Move:** **fault injection / chaos** (kill the worker mid-transcode, pause Redis, +fill the disk — verify processed-once holds); **load tests** with latency budgets in +CI; **webhook contract tests** + replay protection (sign a timestamp, reject stale +deliveries — today a captured payload replays forever). +**Teaches:** failure-mode analysis, exactly-once vs at-least-once in practice, +replay attacks, reliability as a tested property. diff --git a/docs/enhancements/track-01-handoff.md b/docs/enhancements/track-01-handoff.md new file mode 100644 index 0000000..7b51d4e --- /dev/null +++ b/docs/enhancements/track-01-handoff.md @@ -0,0 +1,330 @@ +# Track 1 + 1b — Session Handoff (start here) + +**Purpose:** everything a fresh conversation needs to begin **Track 1 (concurrent +worker + stream recovery + DLQ)** and **Track 1b (webhook delivery throughput)** +without prior context. Read this top to bottom, then open +`track-01-concurrent-worker.md` for the full design. This doc is the *operational* +companion: where things are, how to run them, what the baseline is, and the +landmines already discovered. It assumes **Track 3 is done** — tracing, SLOs, +dashboards, and the k6 harness all exist, so every change here is measurable. + +--- + +## 1. What MPiper is (60-second orientation) + +A media-processing pipeline: a **Go API** (`cmd/server`, `internal/`) accepts +uploads and a **Python worker** (`worker/`) processes them. They communicate over +**Redis Streams** (`media:jobs`, group `worker-group`). **Postgres** is the +durable source of truth; **MinIO** (S3-compatible) stores objects. Webhooks notify +clients of job lifecycle events. + +**Asset flow:** +`POST /api/v1/storage/presign` → client `PUT`s file to MinIO → +`GET /api/v1/assets/{id}/complete` (writes asset `uploaded` + job + outbox row + +`job.starting` webhook rows in one tx) → **outbox relay** (1s poll) publishes to +Redis → **worker** consumes → image (3 webp variants) or video (poster + 720p + +preview) → variants written to MinIO + Postgres, asset `ready` → worker inserts +`job.started`/`job.done` webhook rows → **webhook dispatcher** (2s poll) delivers +signed POSTs. + +--- + +## 2. The goals in one sentence each + +- **Track 1:** make the worker process **N jobs concurrently** (it does 1 today), + recover dead-consumer messages with **Redis Streams' own `XPENDING`/`XAUTOCLAIM`** + instead of a DB scan, and route poison messages to a **dead-letter stream** — so + the worker's service rate scales with cores and a single bad/large job can't stall + the pipeline. +- **Track 1b:** make the **webhook dispatcher deliver concurrently** (it delivers + serially today) and **wire its delivery metrics**, so `webhook_pending` drains + instead of growing unboundedly. + +Both are throughput fixes for the two bottlenecks the Track 3 load test proved. + +--- + +## 3. The baseline to beat (exp 0001, verified) + +From `experiments/0001-worker-saturation.md` (open model, `--rate 10/s`, worker +pinned to 1 CPU). Re-run the **same** profile after each track and compare. + +| Signal | Baseline | Target after the track | +|---|---|---| +| Worker service rate μ | **~1.1 jobs/s** | scales ~N× with the pool (until CPU-bound) | +| Worker CPU | **98%** (1 core, pegged) | utilizes all allotted cores | +| Peak queue depth | **2,544 and growing** | stabilizes (drains at λ ≤ μ) | +| Asset proc p50 / mean | **0.86 s / 1.76 s** | unchanged per-job; throughput is the win | +| `webhook_pending` peak | **~5,901, never drains** | drains to ~0 (Track 1b) | +| Job success rate | 100% | stays 100% (no double-processing) | +| API presign p95 / complete p99 | 48 ms / 358 ms | unaffected (API isn't the bottleneck) | +| DB | 18 ms mean, 5/25 conns, 0 waits | watch pool as worker concurrency rises | + +> **Watch the DB pool** as you add worker concurrency: N concurrent jobs × the +> per-job DB calls will raise in-use connections. The new `mpiper_db_connections_*` +> gauges (added during Track 3 follow-up) will show it. + +--- + +## 4. Exact engineering targets — Track 1 (worker) + +Verify each before editing. + +**The single-threaded loop:** +- `worker/consumer/main.py` `main()` — the loop is `while not shutdown: processed = + consumer.consume(stream); if not processed: sleep(job_poll_interval)`. One message + at a time, inline. +- `worker/consumer/consumer.py` `consume()` — `xreadgroup(..., count=1, block=5000)`, + then dispatches inline via `_handle_job` / `_handle_asset_message`. +- `worker/consumer/config.py` — `max_concurrent_jobs` (`MAX_CONCURRENT_JOBS`, default + 5) **exists but is never used**. This is the semaphore size to honour. + +**Concurrency model (this choice *is* the lesson):** +- Work is **CPU-bound**: Pillow (`images.py`) and ffmpeg (`videos.py`, via + `subprocess`). ffmpeg runs in a separate process (true parallelism regardless of + the GIL); Pillow releases the GIL for most ops. So a **thread pool** may suffice, + but a **process pool** gives guaranteed parallelism for the Python-side work. + Decide and document the tradeoff (GIL, memory, startup cost, picklability). +- Read `count=N` (or keep `count=1` and dispatch to a bounded pool); cap in-flight at + `MAX_CONCURRENT_JOBS`. + +**Invariants that MUST survive concurrency:** +- **Per-message ack.** Today `consume()` acks after the single job. With a pool, + track `msg_id` per task and `XACK` only that message on its own success; leave + failed ones unacked (they stay in the PEL for recovery). +- **Idempotent claim.** `_handle_job` claims a job with `SELECT ... FOR UPDATE` and + checks `status == 'done'`. Concurrent consumers must each claim distinct rows; + don't weaken the row lock. Content-hash dedup (`check_for_duplicate`) also guards + double work. +- **Asset state ownership.** `_handle_job` (not the processor) owns the + `failed`/`ready` transition — preserve that (see DEV-34 comment). +- **Tracing.** The `worker.consume` span + pipeline spans must be started **inside + each task**, carrying that message's extracted `traceparent` (see + `_consume_span`). Don't share one span across concurrent jobs or the Tempo + waterfalls will merge. +- **Per-job metrics.** `wm.record_job` / `wm.record_asset` are already called; keep + them per-task (asset_type label only — never asset_id on a metric). + +**Recovery — replace the homegrown scan:** +- `consumer.py` `_recover_stuck_pending()` does a DB scan + (`status IN ('pending','in_progress') AND updated_at < now() - interval '2 minutes'`) + and re-`XADD`s. Replace with **`XAUTOCLAIM`** (or `XPENDING` + `XCLAIM`) on + `media:jobs` / `worker-group` to reclaim messages idle past a threshold from dead + consumers — using the stream's own delivery state. Keep it time-gated like + `_maybe_recover()`. + +**Dead-letter queue:** +- Today poison messages are marked `failed` and the Redis message is dropped (acked + or abandoned). Add a **dead-letter stream** (e.g. `media:jobs:dlq`): when a job + exceeds `cfg.redis.max_retries`, `XADD` the message (with failure metadata) to the + DLQ and `XACK` the original, instead of silently dropping. Lets you inspect/replay. + +**Head-of-line blocking (optional, in the design):** +- A 60s video blocks short thumbnails behind it. Consider **priority lanes** (e.g. + separate streams or a priority field) so small jobs don't queue behind large + transcodes. + +--- + +## 5. Exact engineering targets — Track 1b (webhook dispatcher) + +**The serial delivery loop:** +- `internal/webhook/dispatcher.go` `tick()` fetches a batch with + `... FOR UPDATE OF wd SKIP LOCKED LIMIT $BatchSize`, then **delivers them one at a + time** in `for _, row := range rows { d.deliver(ctx, row) }`. Each `deliver()` is a + synchronous HTTP POST with `d.client.Timeout`. **This serial loop is the + bottleneck.** +- Config: `WEBHOOK_POLL_INTERVAL` (2s), `WEBHOOK_BATCH_SIZE` (50), `WEBHOOK_TIMEOUT` + (10s), `WEBHOOK_MAX_ATTEMPTS` (5) — in `internal/config/env.go`. + +**The move:** +- Deliver the batch **concurrently** with a bounded pool (e.g. `errgroup` + + semaphore, or a worker-pool of size `WEBHOOK_CONCURRENCY`). HMAC signing, backoff + (`next_attempt_at`), and retry logic in `handleFailure`/`backoff` stay as-is. +- **Wire the metrics.** `WebhookDeliveryTotal`, `WebhookDeliveryDuration`, + `WebhookDeliveryFailures` are **defined in `internal/metrics/metrics.go` but never + recorded** in the dispatcher. `NewDispatcher(db, logger, cfg)` doesn't take + `*metrics.Metrics` — extend it to accept `m`, record per delivery (labels: + `event`, `status` — **never** asset_id), and pass `m` from `cmd/server/main.go` + (where the dispatcher is constructed). +- The SLI rule `sli:webhook_delivery_latency_seconds:p95` already exists; it just + needs the histogram to be recorded. + +**Concurrency-safety note:** `tick()` runs `SELECT ... FOR UPDATE SKIP LOCKED` +**outside an explicit transaction** (`d.db.SelectContext`), so the row locks are +released as soon as the SELECT returns — fine for one dispatcher with internal +goroutines, but it does **not** prevent two *separate* dispatcher processes from +grabbing the same row. If you ever run >1 dispatcher, wrap the claim in a tx or add a +`claimed_at`/`locked_by` column. Document whichever you choose. + +--- + +## 5b. Track 3 follow-ups to fold in (do these first; ~30 min) + +These were flagged in `experiments/0001` and the roadmap; doing them first means the +0002/0003 experiments have clean, artifact-free numbers: + +- **Wire `webhook_delivery_*` metrics** (part of Track 1b above). +- **Wire `storage_operation_*` metrics** (the `pkg/utils/storagex` layer doesn't + record them; the `/complete` MinIO-HEAD cost is currently invisible). +- **Add a fine-bucket view for `db.query.duration`** in `internal/metrics/metrics.go` + (it uses default coarse buckets, so its p95 reads ~4.75 s — an artifact; true mean + is 18 ms). Mirror the existing `http`/`queue.processing.lag` views. +- **Standardize histogram buckets** across worker/API so p95s aren't distorted when + old + new bucket boundaries mix in one query window (this bit the image-ready and + enqueue-lag SLIs). + +--- + +## 6. Environment & topology (host = macOS) + +**Host ports → containers:** +| Service | Host | Notes | +|---|---|---| +| API | 5010 | `/healthz`, `/api/v1/...` | +| Postgres | 5433 | user `mpiper`, db `mpiper`, pw `changeme` | +| Redis | 6380 | stream `media:jobs`, group `worker-group` | +| MinIO API / console | 9000 / 9001 | bucket `mpiper`, minioadmin/minioadmin | +| Grafana | 3000 | anon admin; folder **MPiper** | +| Prometheus | 9090 | remote-write receiver enabled (for k6) | +| Tempo | 3200 | pinned `grafana/tempo:2.6.1` | +| OTel Collector | 8888/8889 | metrics; bridges `mpiper_net` ↔ `mpiper_obs_net` | + +**Container names:** `mpiper-api`, `mpiper-worker`, `mpiper-postgres`, +`mpiper-redis`, `mpiper-minio`, `mpiper-otel-collector`, `mpiper-tempo`, +`mpiper-prometheus`, `mpiper-grafana`, `mpiper-loki`, `mpiper-promtail`. + +**Compose overlays:** `docker-compose.yml` (core) + `docker-compose.observability.yml` +(Tempo/Prom/Loki/Grafana/collector) + `docker-compose.loadtest.yml` (CPU/mem pins + +`TRACE_SAMPLING_RATE=1.0`). `ENCRYPTION_KEY=0123456789abcdef0123456789abcdef`. + +**Metric naming (important):** the collector's Prometheus exporter uses +`namespace: mpiper`. Go API instruments → `mpiper_http_server_request_duration_seconds_*`; +**worker instruments already carry a `mpiper.` prefix → double prefix** +`mpiper_mpiper_job_processing_success_total`, etc. k6 client metrics land under +`k6_*` (custom ones as `k6_mpiper_*`). + +--- + +## 7. Runbook / command cheat sheet + +```bash +# Bring up core + observability + loadtest pins (everything, rebuild): +docker compose -f docker-compose.yml -f docker-compose.observability.yml \ + -f docker-compose.loadtest.yml up -d --build + +# Rebuild just api/worker after code changes: +docker compose -f docker-compose.yml -f docker-compose.observability.yml \ + -f docker-compose.loadtest.yml up -d --build api worker + +# Worker unit tests — the image entrypoint boots the worker, so OVERRIDE it: +docker run --rm --entrypoint python -v "$PWD":/app -w /app mpiper-worker \ + -m unittest discover -s worker/tests -p 'test_*.py' -v + +# Go: build / vet / test (tests/performance_suite_test.go fails w/o PERF_TEST_URL — ignore) +go build ./... && go vet ./internal/... && go test ./internal/... ./pkg/... + +# Load test (baseline profile to compare against exp 0001): +./loadtest/run.sh open --rate 10/s --duration 90s # arrival > service +./loadtest/run.sh closed --vus 10 --duration 2m # find max throughput + +# Query Prometheus history (data persists across `down` WITHOUT -v): +# Tempo retains traces 48h, Prometheus 30d. Instant queries only see the last +# 5 min, so for past runs wrap in last_over_time(metric[12h]) / max_over_time. + +# Inspect Redis stream + consumer group / pending: +docker exec mpiper-redis redis-cli XINFO GROUPS media:jobs +docker exec mpiper-redis redis-cli XPENDING media:jobs worker-group + +# Inspect webhook backlog: +docker exec mpiper-postgres psql -U mpiper -d mpiper -c \ + "SELECT status, count(*) FROM webhook_deliveries GROUP BY status;" + +# UIs: Grafana http://localhost:3000 (Experiment Overview) · Prometheus :9090 · Tempo via Grafana Explore +``` + +--- + +## 8. Landmines (already bit, or will) + +- **Worker tests:** the `mpiper-worker` image has an entrypoint that runs the worker; + you MUST `--entrypoint python` to run unittest, else it tries to boot + migrate and + hits the DB. The local `.venv` lacks deps — always test in the container. +- **Tracing under concurrency:** start the `worker.consume` span (and pipeline spans) + *inside each task* with that message's context. Sharing context across goroutines/ + tasks will corrupt the per-asset waterfalls. Verify in Tempo after. +- **Ack discipline:** only `XACK` a message after *its* job succeeds. With a pool, + don't ack by position — ack by `msg_id`. +- **Mixed histogram buckets:** changing bucket boundaries makes `histogram_quantile` + over a window that spans the change produce garbage p95s. After re-instrumenting, + either reset Prometheus data or wait for the old series to age out before reading. +- **DB pool pressure:** more concurrent jobs → more in-use connections. Pool max is + 25 (`mpiper_db_connections_max_open`). Watch `..._active` and `..._wait_count`. +- **Webhook `SKIP LOCKED` without a tx:** safe for single-dispatcher internal + concurrency, NOT for multiple dispatcher processes (see §5). +- **Operational flakiness seen this session:** an aborted `compose up` (a stray + `mpiper-webhook-receiver` on host :8888 collided with the collector) left + containers with stale port publishing (`docker port` empty) and detached the + collector from `mpiper_obs_net`. Fix = `up -d --force-recreate `. If telemetry + "disappears," check the collector is on both networks and Prometheus targets are up. +- **k6:** no `TextEncoder` in its runtime (use charCodes); client metrics are prefixed + `k6_`; remote-write target is `http://localhost:9090/api/v1/write`. +- **Dedup hides work:** the harness fans out unique bytes per iteration; keep that or + repeat runs do near-zero work. +- **Don't put `asset_id` on a metric label** (high cardinality) — span attribute only. + +--- + +## 9. Acceptance / how we'll know it worked + +- **Track 1:** re-run `open --rate 10/s` → μ rises ~N× (pool size, until CPU-bound), + queue depth **stabilizes/drains** instead of growing, job success stays 100% (no + double-processing — verify via DB job counts and dedup). A killed-mid-job consumer's + message is reclaimed by `XAUTOCLAIM`; a poison message lands in `media:jobs:dlq`. + Write `experiments/0002-concurrent-worker.md` (before/after table + a trace). +- **Track 1b:** under the same load, `webhook_pending` **drains to ~0**; the new + webhook delivery-rate and p95 panels populate; `sli:webhook_delivery_latency_seconds:p95` + renders. Write `experiments/0003-webhook-throughput.md`. + +Each writeup follows the `0001` template: setup (with resource pins) → method → before +numbers → the trace/dashboard evidence → conclusion. Local results are **relative** — +trust deltas and bottleneck location, not absolute throughput. + +--- + +## 10. Repo / git state at handoff + +- **Branch:** `feat/track-03-observability` (cut from `staging`), **10 commits**, + Track 3 work committed (tracing, worker instrumentation, log correlation, metric + fixes + DB pool gauges, observability infra, Grafana provisioning fix, dashboards, + k6 harness, `experiments/0001`). +- **Uncommitted at handoff:** the roadmap README rewrite (`docs/enhancements/README.md`) + and this handoff doc — commit them at the start of the Track 1 session + (`docs(roadmap): mark Track 3 done, re-prioritize from exp 0001`). +- **Not pushed yet.** Decide whether to push `feat/track-03-observability` + open a PR + against `staging` before branching for Track 1, or continue on the same branch. +- **Key reads:** `experiments/0001-worker-saturation.md` (the baseline), + `docs/enhancements/README.md` (re-prioritized roadmap), + `track-01-concurrent-worker.md` (full design — write it out before coding, per the + per-track design-doc philosophy), and `track-03-handoff.md` (the doc that started + the Track 3 session, for format). + +--- + +## 11. Suggested first-session scope + +Do the **§5b follow-ups + Track 1b first** (small, high-value, makes the next +experiment clean), then **Track 1**: + +1. **Warm-up (§5b):** wire `webhook_delivery_*` + `storage_operation_*` metrics, add + the `db.query.duration` view. *Demo:* those panels populate. +2. **Track 1b:** concurrent webhook delivery + pass `m` into the dispatcher. *Demo:* + `webhook_pending` drains under load → `experiments/0003`. +3. **Track 1:** bounded worker pool honouring `MAX_CONCURRENT_JOBS` (pick process vs + thread, document why), preserving ack/idempotency/tracing invariants. *Demo:* μ + scales, queue stabilizes → `experiments/0002`. +4. **Then** `XAUTOCLAIM` recovery + DLQ stream, and (optional) priority lanes. + +That order banks two quick, demoable wins (clean metrics + webhooks draining) before +the larger concurrency change, and every step is provable by re-running the existing +k6 profile against the Track 3 dashboards. diff --git a/docs/enhancements/track-02-handoff.md b/docs/enhancements/track-02-handoff.md new file mode 100644 index 0000000..4645124 --- /dev/null +++ b/docs/enhancements/track-02-handoff.md @@ -0,0 +1,138 @@ +# Track 2 — Queue-depth autoscaling — Session Handoff (start here) + +**Purpose:** everything a fresh conversation needs to begin **Track 2 (scale the +worker fleet on queue lag)** without prior context. Read this top to bottom. It is +the *operational* companion; pair it with a short design doc +(`track-02-autoscaling.md`) written before coding, per the per-track design-doc +philosophy. Assumes **Tracks 3, 1, and 1b are done** — tracing/SLOs/dashboards/k6 +exist, the worker is a bounded concurrent pool, and webhook delivery is concurrent. + +--- + +## 1. What MPiper is (60-second orientation) + +Go **API** (`cmd/server`, `internal/`) accepts uploads; a Python **worker** +(`worker/`) processes them. They talk over **Redis Streams** (`media:jobs`, group +`worker-group`). **Postgres** is the source of truth; **MinIO** stores objects. +Full orientation + topology + runbook live in +[`track-01-handoff.md`](track-01-handoff.md) §1, §6, §7 — reuse them. + +**What Track 1 changed (your starting point):** the worker now runs a bounded +`ThreadPoolExecutor` sized by `MAX_CONCURRENT_JOBS` (honour `mcj ≈ cores-per-pod`), +recovers dead-consumer messages with `XAUTOCLAIM`, and dead-letters poison +messages to `media:jobs:dlq`. Measured **2.37× throughput** at `mcj=4` on 4 cores +(`experiments/0002`). Crucially: **per-pod throughput now scales with cores, so the +next lever is more pods.** + +--- + +## 2. The goal in one sentence + +Make the **number of worker pods** track the **Redis Streams backlog** (queue lag), +so the pipeline absorbs bursts (scale up) and stops wasting capacity when idle +(scale down) — a closed control loop driven by a real saturation signal, not CPU. + +> Why queue-lag, not CPU: CPU-based HPA reacts to *symptom* not *cause*, and lags +> bursty I/O+transcode work. Queue depth / oldest-message-age is the direct +> backpressure signal (Little's Law: `L = λW` — a growing `L` at fixed `W` means +> `λ > μ`, i.e. add workers). + +--- + +## 3. Prerequisites & gotchas verified in code (do these FIRST) + +- **The scaling signal that exists today is `queue.depth = XLEN`, which is NOT a + true backlog.** `RegisterQueueDepthFunc` *is* wired — in + `internal/queue/queue.go` (~L79), not `main.go` — and reports `XLEN media:jobs`. + But `XLEN` counts **all** stream entries, including acked-but-untrimmed ones + (`MaxStreamLength: 10_000` in `queue.NewRedisQueue`), so it stays high even when + the backlog is drained. **Don't autoscale on `queue.depth`.** `queue.processing.lag` + (a histogram, recorded in `queue.go` ~L177) measures per-message wait, not a + scalable gauge either. +- **⚠️ Task 0: expose a true backlog signal.** Add a gauge for the consumer-group + **`lag`** (undelivered entries) and/or the **oldest-pending age** — e.g. + `XINFO GROUPS media:jobs` → `lag`, or `XPENDING` for the idle time of the oldest + pending entry. This is the signal a lag-driven scaler reads; `XLEN` will mislead it. + Decide and document which (lag vs age) drives scaling. +- **k8s manifests already exist** but scale on the wrong signal: `deploy/k8s/worker-deployment.yaml` + has `replicas: 2` and a **`HorizontalPodAutoscaler` (min 2 / max 10) on CPU 75% / + mem 85%** — and **no `terminationGracePeriodSeconds`** (so it defaults to 30s). + Track 2 replaces/augments the HPA with a **lag-driven** scaler. +- **`mcj ≈ cores-per-pod` (Track 1 lesson).** Autoscaling adds *pods*; each pod runs + its own thread pool. Don't crank `MAX_CONCURRENT_JOBS` — set it to the pod's CPU + limit and scale pod count. Oversubscription was measured to *reduce* throughput. +- **Recovery interplay.** New pods join `worker-group` and read `>` (new messages); + a scaled-*down* pod's in-flight work is abandoned and reclaimed by `XAUTOCLAIM` + after `RECOVERY_MIN_IDLE_MS` (default 120s). For responsive scale-down, set the + pod's `terminationGracePeriodSeconds` ≥ the worker's `SHUTDOWN_DRAIN_TIMEOUT` + (default 30s, in `worker/consumer/main.py`) so in-flight jobs drain cleanly + instead of being abandoned and waiting out the 120s reclaim. Both default to 30s + today — tight; widen the grace period if jobs run longer. +- **DB pool pressure.** N pods × `mcj` connections. Each pod sizes its pool to + `mcj + 2` (`worker/consumer/db.py`). At max replicas this can exceed Postgres' + `max_connections` — compute `maxReplicas × (mcj+2) + API pool` and cap accordingly + (watch `mpiper_db_connections_*`). + +--- + +## 4. Engineering targets + +1. **Wire the scaling signal** (Task 0 above): record consumer-group lag (and/or + oldest-pending age) as a gauge, expose it where the scaler can read it. +2. **Choose the scaler.** Options to weigh in the design doc: + - **KEDA `redis-streams` scaler** — purpose-built; scales a Deployment on + `pendingEntriesCount`/lag of a stream+group. Cleanest fit; needs KEDA installed. + - **Prometheus-adapter + HPA on a custom metric** (the lag gauge) — reuses the + existing Prometheus, no new operator, but more wiring. + - **A custom controller** — most work, most lesson; probably overkill. + Recommend KEDA `redis-streams`; document the tradeoff. +3. **Tune the control loop.** Target lag per pod, `pollingInterval`, + `cooldownPeriod`/stabilization, min/max replicas. Avoid flapping (hysteresis). +4. **Graceful scale-down.** Confirm SIGTERM → bounded drain → no lost work (relies on + Track 1's `shutdown(timeout)` + `XAUTOCLAIM` safety net). + +--- + +## 5. Acceptance / how we'll know it worked + +Re-use the k6 harness and the consolidated overlay (`docker-compose.loadtest.yml` +env knobs; `./loadtest/run.sh`). For k8s, run on the cluster the manifests target +(or kind/minikube + KEDA). + +- **Backlog → scale-up → drain cycle:** drive `open --rate` above one pod's μ; the + scaler adds pods; aggregate μ rises ~linearly with pods (until CPU/DB-bound); + **lag rises then drains to ~0**; then load stops → pods **scale back down** after + cooldown. +- **No flapping** under steady load; **no lost/double-processed jobs** across + scale events (verify via DB job counts + dedup; a scaled-down pod's job is + reclaimed, not dropped). +- **DB pool stays under `max_connections`** at `maxReplicas`. +- Write `experiments/0004-autoscaling.md` (0001 template: setup w/ pod & resource + limits → method → backlog/replica/lag timeseries → conclusion). Capture the + replica-count and lag panels. + +--- + +## 6. Suggested first-session scope + +1. **Task 0:** add a **consumer-group lag** gauge (and/or oldest-pending age) — a + *new* observable gauge alongside the existing (misleading) `queue.depth=XLEN`; + wire it like `queue.go` wires `RegisterQueueDepthFunc`, plus a Grafana panel. + Prove it tracks a manually-`XADD`'d backlog and falls to ~0 on drain. *Demo:* + panel moves with `XADD`/drain (and, unlike `queue.depth`, returns to 0). +2. **Design doc** `track-02-autoscaling.md`: signal choice (lag vs depth vs age), + scaler choice (KEDA vs prom-adapter), control-loop params, scale-down safety. +3. **KEDA `ScaledObject`** on the worker Deployment driven by the lag signal; + replace the CPU HPA. Set `mcj` = pod CPU limit; min/max replicas; cooldown. +4. **Load test the cycle** + `experiments/0004`. + +Banks a clean, demoable win (lag-driven scale-up/drain) on top of the now-concurrent +worker, and is provable by re-running the existing k6 profile against the dashboards. + +--- + +## 7. Key reads + +- [`track-01-handoff.md`](track-01-handoff.md) — topology, runbook, env, landmines (reuse). +- [`experiments/0002-concurrent-worker.md`](../../experiments/0002-concurrent-worker.md) — the per-pod baseline μ to multiply by replica count. +- `internal/queue/queue.go` (~L79 `RegisterQueueDepthFunc`→`XLen`, ~L177 `QueueProcessingLag`) — model the new lag gauge on this wiring; `internal/queue/redis.go` (`XLen`, add an `XInfoGroups`/`XPending` helper); `internal/metrics/metrics.go` (instrument defs); `deploy/k8s/worker-deployment.yaml` (current CPU HPA + missing `terminationGracePeriodSeconds`). diff --git a/docs/enhancements/track-03-handoff.md b/docs/enhancements/track-03-handoff.md new file mode 100644 index 0000000..089c030 --- /dev/null +++ b/docs/enhancements/track-03-handoff.md @@ -0,0 +1,222 @@ +# Track 3 — Session Handoff (start here) + +**Purpose:** everything a fresh conversation needs to begin **Track 3 +(end-to-end tracing, SLOs & local load testing)** without prior context. Read +this top to bottom, then open `track-03-observability-and-load.md` for the full +design and phased plan. This doc is the *operational* companion: where things +are, how to run them, and the landmines already discovered. + +--- + +## 1. What MPiper is (60-second orientation) + +A media-processing pipeline: a **Go API** (`cmd/server`, `internal/`) accepts +uploads and a **Python worker** (`worker/`) processes them. They communicate over +**Redis Streams** (`media:jobs`). **Postgres** is the durable source of truth; +**MinIO** (S3-compatible) stores objects. Webhooks notify clients of job +lifecycle events. + +**Asset flow:** +`POST /api/v1/storage/presign` → client `PUT`s file to MinIO → +`GET /api/v1/assets/{id}/complete` (writes asset `uploaded` + job + outbox row + +`job.starting` webhook rows in one tx) → **outbox relay** (1s poll) publishes to +Redis → **worker** consumes → image (3 webp variants) or video (poster + 720p + +preview) → variants written to MinIO + Postgres, asset `ready` → worker inserts +`job.started`/`job.done` webhook rows → **dispatcher** (2s poll) delivers signed POSTs. + +--- + +## 2. The Track 3 goal in one sentence + +Make one **trace per asset** that spans API → Redis → worker → ffmpeg (so queue +wait and per-stage time are visible), define a small set of **SLOs**, and build a +**local k6 load harness** + Grafana dashboards so we can saturate the system on a +laptop and *see* where it bends. Full plan: `track-03-observability-and-load.md`. + +--- + +## 3. Current telemetry state (verified in code) + +- **Go API:** OTel **traces + metrics**, exported OTLP to `otel-collector:4317`. + Tracer init in `internal/metrics/otel.go`; metric instruments in + `internal/metrics/metrics.go`. +- **Python worker:** OTel **metrics only** (`worker/utils/metrics.py`, OTLP to + `otel-collector:4317`). **No tracer, no span creation, no context extraction.** +- **The gap:** the Go side traces the HTTP request and `Enqueue`, but **never + injects a `traceparent`** into the Redis message or the outbox row. The worker + therefore starts fresh with no parent. The trace dies at the queue boundary. +- **Observability stack** (`docker-compose.observability.yml`, configs in + `observability/`): OTel Collector (bridges `mpiper_net` ↔ `mpiper_obs_net`), + **Tempo** (traces), **Prometheus** (metrics), **Loki + Promtail** (logs), + **Grafana** (dashboards, anonymous admin). Collector pipeline: OTLP receiver → + Tempo (traces) + Prometheus exporter `:8889` (metrics). + +> Note: `CLAUDE.md` historically said the worker is "prometheus_client (not OTel)" +> — that's **stale/wrong**; the worker uses OTel metrics. Don't trust that line. + +--- + +## 4. Exact engineering targets for Phase 1 (close the trace gap) + +These are the precise seams to touch. Verify each before editing. + +**Inject context (Go):** +- `internal/queue/queue.go` — `RedisQueue.Enqueue` builds the stream message + (a `map`); the worker reads its fields. Inject `traceparent` (and `tracestate`) + here using the OTel propagator, as top-level message field(s). +- `internal/outbox/relay.go` — `tick()` unmarshals the outbox row payload and calls + `queue.Enqueue(ctx, payload)`. Because enqueue is **store-and-forward**, the + trace context must survive in the **outbox row** too: capture it when the row is + written in `internal/service/asset.go` (`MarkAssetUploaded`, the + `outboxRepo.InsertTx` call), persist it (extend `internal/models/outbox.go` + + `internal/repository/outbox_repo.go` + a migration), and re-inject on relay. +- **Verify the global propagator is set** in `internal/metrics/otel.go` + (`otel.SetTextMapPropagator(propagation.TraceContext{})`). If missing, add it — + without it, injection is a no-op. + +**Extract + continue (Python):** +- Add `worker/utils/tracing.py` mirroring `worker/utils/metrics.py` (tracer init, + OTLP exporter to the same endpoint). Find where `init_metrics(...)` is called and + init the tracer alongside it (same lifecycle). +- `worker/consumer/consumer.py` — in `consume()`, after the message payload is + normalized (note: a `body` field, if present, is JSON-decoded and merged), read + `traceparent` and start the consumer span. Use a **child span with a link** to + the producer context (link is the correct primitive for queue fan-in; child span + keeps the Tempo waterfall readable). + +**Span the stages (Phase 2):** +- `worker/processing/processor.py` — `process_asset_dispatch` (download, dedup check). +- `worker/processing/images.py` — per-variant encode/upload. +- `worker/processing/videos.py` — `run()` wraps each ffmpeg call (poster / transcode_720p / preview). +- Stamp `trace_id`/`span_id` into worker + API structured logs for Loki↔Tempo linking. + +**Message format reminder:** the consumer accepts either `job_id` (canonical) or +`asset_id`. The outbox payload (built in `asset.go`) currently carries `job_id`, +`asset_id`, `event`, `timestamp`. Add trace context as additional field(s); don't +break the existing keys. + +--- + +## 5. Environment & topology facts (host = macOS) + +**Host ports → containers:** +| Service | Host | Container | Notes | +|---|---|---|---| +| API | 5010 | 5010 | `/healthz`, `/api/v1/...` | +| Postgres | 5433 | 5432 | user `mpiper`, db `mpiper`, pw `changeme` | +| Redis | 6380 | 6379 | stream `media:jobs`, group `worker-group` | +| MinIO API | 9000 | 9000 | bucket `mpiper` (anon download on) | +| MinIO console | 9001 | 9001 | minioadmin / minioadmin | +| Grafana | 3000 | 3000 | anon admin | +| Prometheus | 9090 | 9090 | | +| Tempo | 3200 | 3200 | OTLP in on 4317/4318 (obs net) | +| webhook-receiver | 8888 | 8080 | overlay only | + +**Container names:** `mpiper-api`, `mpiper-worker`, `mpiper-postgres`, +`mpiper-redis`, `mpiper-minio`, `mpiper-webhook-receiver`, `mpiper-otel-collector`, +`mpiper-tempo`, `mpiper-prometheus`, `mpiper-grafana`, `mpiper-loki`. + +**Storage split endpoint (implemented):** `S3_ENDPOINT_URL=http://minio:9000` +(internal I/O) vs `S3_PUBLIC_ENDPOINT_URL=http://localhost:9000` (presigned + +public URLs). Don't undo this — host-run load tests depend on it. + +**Telemetry env (`.env.local`):** `OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317`, +`OTEL_TLS_INSECURE=true`, `TRACE_SAMPLING_RATE` (default 0.1 in code — **set to +1.0 locally** so every asset traces). `ENCRYPTION_KEY=0123456789abcdef0123456789abcdef` +(32 bytes; used for auth tokens AND webhook secrets). + +--- + +## 6. Runbook / command cheat sheet + +```bash +# Bring up core + observability (+ webhooks if you want webhook traces too) +docker compose -f docker-compose.yml -f docker-compose.observability.yml up -d --build +# add: -f docker-compose.webhooks.yml (for webhook receiver) + +# End-to-end smoke (host-run; image + video + webhooks; 23 checks) +./scripts/demo-e2e.sh + +# Go: build / vet / tests (tests/performance_suite_test.go FAILS unless PERF_TEST_URL set — ignore) +go build ./... && go vet ./... && go test ./... + +# Worker tests: the local .venv (py3.14) LACKS psycopg_pool/pytest/cryptography. +# Run them INSIDE the worker container instead: +docker exec -w /app mpiper-worker python -m unittest discover -s worker/tests -p 'test_*.py' -v + +# Mint an auth token from the host (system python3 has `cryptography`; venv does not): +TOKEN=$(python3 - <<'PY' +import base64, os +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +key=b"0123456789abcdef0123456789abcdef"; nonce=os.urandom(12) +print(base64.urlsafe_b64encode(nonce+AESGCM(key).encrypt(nonce,b"demo-user",None)).rstrip(b"=").decode()) +PY +) + +# Inspect DB +docker exec mpiper-postgres psql -U mpiper -d mpiper -c "SELECT asset_id,status,type FROM assets ORDER BY created_at DESC LIMIT 5;" + +# Reset all state (assets/variants/objects accumulate across runs) +docker compose -f docker-compose.yml -f docker-compose.observability.yml down -v + +# UIs: Grafana http://localhost:3000 · Prometheus http://localhost:9090 · Tempo via Grafana Explore +``` + +--- + +## 7. Landmines (things that already bit, or will) + +- **Worker is single-threaded.** `MAX_CONCURRENT_JOBS` is in `worker/consumer/ + config.py` but **never used**; `consume()` does `count=1` and processes inline. + This is the expected bottleneck Phase 5 should prove — don't "fix" it in Track 3. +- **Recovery is homegrown.** A 2-min DB scan re-`XADD`s stale jobs; no + `XPENDING`/`XAUTOCLAIM`; poison messages are marked `failed` and dropped (no DLQ). + That's Track 1, not Track 3. +- **Global propagator may be unset** in Go — injection silently no-ops without it. Check first. +- **Sampling.** Code default `TRACE_SAMPLING_RATE=0.1`. Set 1.0 locally or you'll + lose most traces and think propagation is broken. +- **Dedup hides work.** Identical fixtures dedup after the first asset → near-zero + work on repeats. The load harness must **fan out unique bytes** to measure real + per-job cost. +- **Cardinality.** asset_id is fine as a *trace/span attribute*; **never** put it on + a *metric* label. +- **Health check.** `cmd/server --health-check` is now a real `/healthz` probe + (was previously booting a second server and failing to bind 5010 → api unhealthy + → worker wouldn't start). If you change startup, keep that path lightweight. +- **Rebuild after code changes.** api/worker run from built images: + `docker compose ... build api worker && docker compose ... up -d`. +- **Local ≠ prod.** Trust bottleneck *location* and before/after deltas, not + absolute throughput numbers. + +--- + +## 8. Suggested first-session scope + +Do **Phase 0 + Phase 1** together (highest value, gets a real cross-boundary trace fast): + +1. **Phase 0:** add `deploy.resources.limits` (cpu/mem) to `api` + `worker` in a + compose overlay; set `TRACE_SAMPLING_RATE=1.0`; bring up with the observability + overlay; capture a baseline `demo-e2e.sh` run and confirm spans land in Tempo. +2. **Phase 1:** Go `traceparent` injection (enqueue + outbox row + migration) → + worker tracer + extraction in `consume()`. + +**Acceptance:** open one asset in Grafana/Tempo and see a single trace from +`POST /storage/presign` through `enqueue` → (visible queue-wait gap) → worker +`consume` span. That alone is a satisfying, demoable win. + +Then continue with Phases 2–5 from the design doc (pipeline spans + log +correlation → SLO recording rules + dashboards → k6 harness → first experiment +writeup that names the worker bottleneck, feeding Track 1). + +--- + +## 9. Repo / git state at handoff + +- Branch: `feat/webhook-notifications`; open **PR #18**. +- Demo-readiness + split-endpoint work is committed (`9404c7a`) and pushed. +- `docs/enhancements/` (this file + `README.md` + `track-03-observability-and-load.md`) + may be **uncommitted** — commit them at the start of the Track 3 session. +- Key docs to read: `docs/enhancements/README.md` (catalog), + `track-03-observability-and-load.md` (plan), `docs/arch/*` (existing outbox/ + reliability design notes), `CLAUDE.md` (repo conventions; note the stale worker- + telemetry line). diff --git a/docs/enhancements/track-03-observability-and-load.md b/docs/enhancements/track-03-observability-and-load.md new file mode 100644 index 0000000..43ff9c9 --- /dev/null +++ b/docs/enhancements/track-03-observability-and-load.md @@ -0,0 +1,214 @@ +# Track 3 — End-to-end tracing, SLOs & local load testing + +**Status:** planning · **Prereq:** none · **Unlocks:** makes every other track measurable + +## 1. Problem + +We can't improve what we can't see. Right now: + +- The **distributed trace breaks at the Redis boundary.** The Go API traces the + HTTP request and the `Enqueue` call, but it never injects a `traceparent` into + the stream message. The worker has OTel **metrics** but **no tracer** and does + no context extraction. So we cannot answer "for *this* asset, where did the 40 + seconds go?" as a single trace spanning API → outbox → Redis → worker → ffmpeg → + variant write. +- We have metrics but **no SLOs** — no agreed definition of "good", so no way to + say whether a change helped. +- We have **no way to generate controlled load**, so we've never seen the system + bend. The single-threaded worker (Track 1) is an invisible bottleneck until + something pushes on it. + +The user's real question: *this is a local project — how do we test under load +and actually understand what's working, failing, and needs optimization?* + +That question is answered in §3. + +## 2. Goals / Non-goals + +**Goals** +- One trace per asset, end to end, across the queue boundary, viewable in Tempo. +- A small, explicit set of **SLIs and SLOs** for the pipeline. +- A repeatable **local load harness** that can saturate the system on a laptop. +- Grafana dashboards (RED for the API, USE for the worker/host, a pipeline-latency + funnel, queue lag) wired so a metric spike links to an example trace (exemplars). +- A written **bottleneck-analysis loop**: load → observe → locate → optimize → re-run → compare. + +**Non-goals** +- Production-scale absolute numbers. Local results are **relative** — they reveal + bottlenecks and validate *direction*, not real-world capacity (see §7). +- Alerting/paging infrastructure (note SLO burn-rate alerts as a follow-up). +- Replacing the existing stack — we extend the bundled Tempo/Prometheus/Loki/Grafana. + +## 3. Can you load-test meaningfully on a laptop? Yes — here's the methodology + +The misconception is that load testing requires cloud scale. It doesn't. Load +testing is about **saturating the system relative to its own capacity** and +watching where it bends. A single-threaded worker on a laptop saturates at a +handful of concurrent jobs — you can absolutely push it past that locally. + +The thing that makes local results *interpretable* is **pinning resources** so +runs are reproducible and the bottleneck isn't hidden by spare laptop cores. We +add CPU/memory limits to the `api` and `worker` containers (compose `deploy. +resources.limits`) so "the worker is the bottleneck" is a stable, observable fact +rather than something that moves run to run. + +**The loop we're building:** + +``` + ┌─────────────────────────────────────────────┐ + │ 1. Define SLIs/SLOs (what "good" means) │ + └───────────────┬─────────────────────────────┘ + ▼ + ┌─────────────────────────────────────────────┐ + │ 2. Instrument end-to-end (close the trace gap)│ + └───────────────┬─────────────────────────────┘ + ▼ + ┌────────────┐ generate ┌─────────────────────────┐ + │ k6 (host) │ ────────────▶ │ MPiper (CPU-pinned) │ + │ load model │ presign→PUT │ API + worker │ + └────────────┘ →complete └───────────┬─────────────┘ + │ client-side metrics │ app OTel traces+metrics + ▼ ▼ + ┌─────────────────────────────────────────────────────┐ + │ 3. Observe in Grafana: RED, USE, pipeline funnel, │ + │ queue lag — metric spike → exemplar trace in Tempo │ + └───────────────┬─────────────────────────────────────┘ + ▼ + ┌─────────────────────────────────────────────────────┐ + │ 4. Locate bottleneck (trace waterfall + USE) → │ + │ optimize → re-run same profile → compare │ + └─────────────────────────────────────────────────────┘ +``` + +### Load model (this is the subtle part) + +- **Closed model (fixed VUs):** N virtual users each loop presign→upload→complete + as fast as they can. Good for finding max throughput and saturation point. +- **Open model (fixed arrival rate):** X new uploads/sec regardless of how fast the + system responds. Good for finding the **latency knee** and watching queue lag + grow when arrival rate > service rate (a live demonstration of Little's Law: + `L = λW`). + +We use **k6** run from the **host** (like `scripts/demo-e2e.sh`): the host can +reach both the API (`localhost:5010`) and MinIO (`localhost:9000`), so k6 performs +the *real* client flow — presign, `PUT` the file to the public endpoint, then +`complete`. k6 uploads real fixtures (the existing image + `tests/test_assets/ +sample.mp4`), optionally fanning out copies with unique bytes to defeat content-hash +dedup when we want true per-job work. + +Two views of the same run: +- **Client view** (k6's own metrics): request rate, error rate, client-side + latency percentiles → remote-written to the bundled Prometheus. +- **Server view** (MPiper's OTel): the pipeline's internal spans and metrics — + this is the point of the track, and what we'll mostly read. + +## 4. Design + +### 4.1 Close the trace gap (the core engineering work) + +1. **Inject context on enqueue (Go).** When the outbox relay (or `RedisQueue. + Enqueue`) publishes, inject the active span context as a `traceparent` field in + the stream message using the OTel propagator. The outbox row should carry the + trace context too (so the trace survives the store-and-forward hop). +2. **Extract + continue on consume (Python).** Add an OTel **tracer** to the worker + (mirroring `worker/utils/metrics.py`). In `consume()`, extract `traceparent` + from the message and start the consumer span as a **child** (a span link is the + correct primitive for queue fan-in; we'll use a child span with a link to keep + the waterfall readable). +3. **Span the pipeline stages.** Wrap `process_asset_dispatch`, download, + dedup-check, each image variant, and each ffmpeg invocation (poster / transcode / + preview) in spans with attributes (asset_id, type, bytes, role, ffmpeg rc). +4. **Correlate logs.** Stamp `trace_id`/`span_id` into worker + API structured logs + so Loki ↔ Tempo cross-linking works in Grafana. + +End result: open an asset in Tempo and see `HTTP POST /presign … → enqueue → +(time in queue) → worker consume → download → transcode_720p → write variant`, +with the **queue wait time** visible as the gap between enqueue and consume. + +### 4.2 SLIs / SLOs (initial, deliberately small) + +| SLI | Definition | Initial SLO (local) | +|-----|------------|---------------------| +| Presign latency | p95 of `POST /storage/presign` | < 150 ms | +| Image ready latency | p95 (complete → asset `ready`) for images | < 5 s | +| Video ready latency | p95 (complete → asset `ready`) for videos | < 60 s | +| Queue wait | p95 (enqueue → consume start) | < 2 s | +| Job success rate | done / (done + failed) | > 99% | +| Webhook delivery latency | p95 (event row created → delivered) | < 10 s | + +These come straight from spans/metrics we'll have. The numbers are starting +guesses; the *point* is to make them explicit, then move them based on data. + +### 4.3 Dashboards (Grafana, provisioned in `observability/grafana/dashboards`) + +- **API — RED:** request **R**ate, **E**rror rate, **D**uration (p50/p95/p99) per route. +- **Worker/host — USE:** CPU/mem **U**tilization, **S**aturation (queue depth, + in-flight jobs), **E**rrors. (cAdvisor/node metrics or the collector's own.) +- **Pipeline funnel:** uploaded → processing → ready/failed counts + the + per-stage latency breakdown (from spans). +- **Queue health:** stream length, oldest-pending age, outbox relay lag (metric + already exists), webhook pending gauge (already exists). +- **Exemplars:** histogram panels link a bucket spike to a concrete Tempo trace. + +### 4.4 Bottleneck-analysis loop (documented runbook) + +For each experiment: fix a load profile, run it, then read in order — (1) is the +SLO breached? (2) USE: is the worker CPU-saturated or queue-saturated? (3) open an +exemplar trace: which span dominates? (4) form a hypothesis, change one thing, +re-run the **same** profile, compare. Record results in an `experiments/` log so +"the transcode span dropped from 38s→6s after X" is captured. + +## 5. Phased implementation plan + +Each phase is independently demoable. + +- **Phase 0 — Resource pinning & baseline.** Add `deploy.resources.limits` to api/ + worker; bring up the observability overlay; capture a one-shot baseline with the + existing `demo-e2e.sh`. *Demo:* Grafana shows the run; numbers are reproducible. +- **Phase 1 — Trace propagation.** Inject `traceparent` on enqueue (Go) + outbox + row; extract + continue in the worker; add the worker tracer. *Demo:* a single + Tempo trace spans API→worker for one asset, with visible queue wait. +- **Phase 2 — Pipeline spans + log correlation.** Span dispatch/download/dedup/ + each variant/each ffmpeg call; add trace IDs to logs. *Demo:* trace waterfall + shows per-stage timing; click a log line → its trace. +- **Phase 3 — SLO recording rules + dashboards.** Prometheus recording rules for + the SLIs in §4.2; provision the four dashboards. *Demo:* a dashboard shows each + SLI vs its SLO target. +- **Phase 4 — k6 load harness.** `loadtest/` with closed- and open-model scripts, + a host-run wrapper, fixture fan-out, and k6→Prometheus remote write. *Demo:* + `./loadtest/run.sh open --rate 5/s --duration 3m` drives the system; Grafana + shows queue lag climbing and the latency knee. +- **Phase 5 — First experiment writeup.** Run a saturating profile, capture the + bottleneck (expected: the single-threaded worker), and write it up as the + motivating evidence for **Track 1**. *Demo:* `experiments/0001-worker-saturation.md` + with before numbers + the trace proving where time goes. + +## 6. How we'll know it works (acceptance) + +- A Tempo trace for any asset includes both API and worker spans, with queue wait + time visible. +- Every SLI in §4.2 renders on a dashboard against its target. +- `loadtest/run.sh` reproducibly drives the system into SLO breach, and the + responsible stage is identifiable from a trace within ~2 minutes of looking. +- Phase 5 writeup names the bottleneck with evidence — the input to Track 1. + +## 7. Risks & honest caveats + +- **Local ≠ production.** Absolute numbers are not portable (laptop CPU, no network + latency, single-node Redis/PG). Treat results as **relative**: bottleneck + location and before/after deltas are trustworthy; "we do N uploads/sec" is not. +- **Noisy neighbor.** k6, the app, and the observability stack share the laptop. + Pin app resources and keep k6 modest; consider running k6 with `--throw` budgets. +- **Container CPU limits change behavior** (e.g. ffmpeg threads). That's fine — it's + what makes runs comparable — but document the limits with each experiment. +- **Trace cardinality / sampling.** Asset-ID attributes are high-cardinality on + *traces* (OK) but must never become metric labels. Keep `TRACE_SAMPLING_RATE` + in mind; sample at 100% locally, lower in prod. +- **Dedup hides work.** Identical fixtures dedup after first processing; the load + harness must fan out unique bytes when measuring real per-job cost. + +## 8. Follow-ups (out of scope here) + +- SLO **burn-rate alerting** (multi-window) once SLOs stabilize. +- Continuous profiling (Pyroscope) to attribute CPU *inside* a span. +- CI smoke load test with a latency budget (feeds Track 8). diff --git a/docs/enhancements/track-04-handoff.md b/docs/enhancements/track-04-handoff.md new file mode 100644 index 0000000..74d7391 --- /dev/null +++ b/docs/enhancements/track-04-handoff.md @@ -0,0 +1,128 @@ +# Track 4 — Multi-tenancy, auth & quotas — Session Handoff (start here) + +**Purpose:** everything a fresh conversation needs to begin **Track 4** without +prior context. Read top to bottom, then write a short design doc +(`track-04-multitenancy-auth.md`) before coding, per the per-track philosophy. +**Fully local — no k8s required** (this is why it was picked ahead of Track 2, +which needs a cluster). Assumes Tracks 3, 1, 1b done. + +--- + +## 1. What MPiper is (60-second orientation) + +Go **API** (`cmd/server`, `internal/`) + Python **worker** (`worker/`) over +**Redis Streams**; **Postgres** is source of truth; **MinIO** stores objects. +Full orientation, topology, runbook, env, landmines: reuse +[`track-01-handoff.md`](track-01-handoff.md) §1, §6, §7. + +--- + +## 2. The goal in one sentence + +Turn the single-user, best-effort API into one that safely serves **multiple +tenants** — real authN/authZ (expiring, rotatable credentials), **tenant +isolation** on every asset read/write, **idempotency keys** so client retries +don't duplicate work, and **per-tenant quotas/rate limits**. + +--- + +## 3. Current state & gotchas — verified in code (do these FIRST) + +- **Auth is a homegrown AES-256-GCM token with no expiry or rotation.** + `pkg/utils/crypt.go` `GenerateToken/DecryptToken` encrypts *just the userID*; + `internal/middleware/authorization.go` decrypts it with `config.MustGet().EncryptionKey`. + Problems: **no `exp`/issued-at**, no key rotation, opaque (no claims), and the + middleware comment says "Invalid or expired token" but **nothing ever expires**. +- **⚠️ One key signs everything.** The same `ENCRYPTION_KEY` (exactly 32 bytes) + encrypts **auth tokens AND webhook secrets** (webhook secrets are stored + encrypted with it; see `internal/service/webhook.go` + the dispatcher's + `DecryptToken`). Leaking it compromises both. **Separate the webhook-signing key + from the auth-signing key** early — it touches stored data, so plan a migration. +- **Tenant tagging exists but is shallow.** `assets.owner_id` was added + (`migrations/000004_assets_owner_id.up.sql`) and `internal/service/asset.go` + (~L128) sets it from `middleware.GetUserID(ctx)` on create. The webhook→asset + join already scopes by it (`JOIN assets a ON a.owner_id = wr.user_id`). +- **⚠️ But reads/writes are NOT consistently owner-scoped — likely IDOR.** Verify + every asset path (`GET /assets/{id}/complete`, any asset fetch, variant lookups) + filters by `owner_id` = caller. The repo queries (`internal/repository/asset_repo.go`) + fetch by `asset_id` alone in places. **Task: enforce tenant scoping at the + repository layer** so a caller can never touch another tenant's asset by ID. +- **No idempotency keys.** A retried `POST /storage/presign` creates a **duplicate + asset** every time (confirmed gap — `docs/arch/reliability-and-correctness.md` + §"Idempotency today", gap #7). There is no `Idempotency-Key` handling and no + store for replaying prior responses. +- **Flat tenancy + single bucket.** No org→project hierarchy; one MinIO bucket with + path prefixes (`media/raw/`). No per-tenant prefix/credentials. +- **No quotas or per-tenant rate limits.** Any token can submit unbounded work. + +--- + +## 4. Engineering targets (suggested order — highest value / lowest risk first) + +1. **Idempotency keys (Stripe-style).** Accept an `Idempotency-Key` header on + `presign` (and `complete`); store `(tenant, key) → asset_id/response` with a TTL; + on replay within TTL return the **same** asset + response instead of creating a + new one. Decide: key storage table + TTL, response replay vs just dedup, scope + (per-tenant). *Teaches: the idempotency pattern, retry-safety.* +2. **Tenant isolation at the repository layer.** Thread tenant id through context + → every asset query gets a `WHERE owner_id = $tenant` (or `tenant_id`) guard; + add tests that a cross-tenant fetch 404s. Close the IDOR. Add per-tenant storage + **prefixes** (`media//raw/...`). +3. **Real auth.** Either **JWT** (asymmetric keys, `exp`, JWKS rotation) or scoped + **API keys** (hashed at rest, revocable). Add expiry + rotation; **split the + webhook-signing secret from the auth key** (migration for existing encrypted + webhook secrets). Keep the middleware contract (`GetUserID` → now `GetTenant`/claims). +4. **Quotas + rate limits.** Per-tenant request rate limit (middleware) and usage + accounting (e.g. assets/storage per tenant) with enforcement on `presign`. + *Teaches: backpressure at the edge, usage metering.* + +> Pick a tenancy model up front and document it: minimal is keep `owner_id` = +> tenant; fuller is `org → project → asset` with row scoping. Don't over-build — +> the IDOR fix + idempotency are the high-value core. + +## 5. Acceptance / how we'll know it worked + +- **Idempotency:** same `Idempotency-Key` replayed → one asset, identical response; + different key → new asset. Test under concurrent duplicate requests (no race dupes). +- **Isolation:** tenant A cannot read/complete/lookup tenant B's asset by ID + (returns 404/403); storage objects land under the tenant prefix. Add repo-level + + HTTP-level tests. +- **Auth:** expired token rejected; rotated signing key still validates + unexpired tokens (JWKS/keyset); webhook secrets decrypt with their *own* key + post-migration. +- **Quotas:** a tenant over its limit is throttled/429'd; usage metric per tenant. +- No load test needed, but add a security-focused test suite. Optionally write + `experiments/0004-tenancy.md` documenting the IDOR-before/after. + +## 6. Landmines + +- **Key-split migration:** existing `webhook_registrations.secret` rows are + encrypted with `ENCRYPTION_KEY`. Splitting keys means re-encrypting them — plan a + one-time migration or dual-read window. Don't strand existing registrations. +- **Don't break the local token-minting path:** `scripts/demo-e2e.sh`, + `loadtest/run.sh`, and the README all mint the current AES token inline with a + Python snippet. If you change the token format, update all three or provide a + compatibility shim, or the demo + load harness break. +- **Context plumbing:** `middleware.GetUserID(ctx)` is the single chokepoint — + extend it to carry tenant/claims rather than scattering token parsing. +- **Worker side:** the worker also reads `owner_id` (webhook join in + `worker/webhooks.py`); a tenancy-column rename ripples into the worker SQL + its + tests. Grep both services. +- **`ENCRYPTION_KEY` is required at boot** (config panics without it, exactly 32 + bytes) — keep that contract or update config validation + all envs. + +## 7. First-session scope + +1. **Design doc** `track-04-multitenancy-auth.md`: tenancy model (flat owner vs + org/project), auth choice (JWT vs API keys), idempotency-key storage + TTL, + key-split migration plan. +2. **Idempotency keys** on `presign`/`complete` (highest value, self-contained). + *Demo:* replayed key → one asset. +3. **Repository-layer tenant scoping** + the IDOR test. *Demo:* cross-tenant fetch 404s. +4. **Auth hardening** (expiry + rotation + key split) and **quotas** as follow-ups. + +## 8. Key reads + +- [`track-01-handoff.md`](track-01-handoff.md) — topology/runbook/env (reuse). +- `docs/arch/reliability-and-correctness.md` — §"Idempotency today" + the gap table (gap #7 client idempotency keys; replay protection). +- `pkg/utils/crypt.go` (token gen/decrypt), `internal/middleware/authorization.go` (`GetUserID` chokepoint), `internal/service/asset.go` (~L128 owner_id set; presign/complete flow), `internal/repository/asset_repo.go` (asset queries to scope), `internal/service/webhook.go` + `internal/webhook/dispatcher.go` (shared-key webhook secrets), `migrations/000004_assets_owner_id.*`. diff --git a/docs/enhancements/track-04-multitenancy-auth.md b/docs/enhancements/track-04-multitenancy-auth.md new file mode 100644 index 0000000..19d6148 --- /dev/null +++ b/docs/enhancements/track-04-multitenancy-auth.md @@ -0,0 +1,161 @@ +# Track 4 — Multi-tenancy, Auth & Quotas — Design + +**Status:** accepted · **Scope:** fully local (no k8s) · **Predecessor:** see +[`track-04-handoff.md`](track-04-handoff.md). + +This document records the confirmed design decisions for Track 4 before +implementation. It turns the single-user, best-effort API into one that safely +serves multiple tenants: real authN/authZ, tenant isolation on every asset +operation, idempotency keys, and per-tenant quotas/rate limits. + +--- + +## 1. Tenancy model — flat `owner_id` + +**Decision:** flat tenancy. `assets.owner_id` (TEXT) **is** the tenant +identifier. No `org → project → asset` hierarchy. + +- The column **keeps its name** (`owner_id`). Renaming it to `tenant_id` would + ripple into the worker (`worker/webhooks.py` and its test assert the exact SQL + `JOIN assets a ON a.owner_id = wr.user_id`). Keeping the name means **zero + worker churn**. "tenant" is only the in-process *concept*; on disk it stays + `owner_id` / `webhook_registrations.user_id`. +- The tenant identifier is a free-form TEXT string sourced from the API key (see + §2). There is no separate `users`/`tenants` table — the `api_keys` table is the + identity source of record. + +**Rejected:** `org → project` hierarchy — more schema + bigger worker/webhook +ripple for no near-term value. The high-value core is the IDOR fix + +idempotency, not a richer tenancy graph. + +## 2. Auth — scoped API keys (drop the AES token) + +**Decision:** replace the homegrown AES-256-GCM token with **scoped API keys**. + +- **Wire format:** `mp__` where `prefix` is a short public + identifier (used to narrow lookups and shown in listings) and `secret` is + high-entropy random. +- **At rest:** only the **SHA-256 hash** of the full key is stored + (`api_keys.key_hash`, UNIQUE). API keys are high-entropy, so a fast hash with + an indexed equality lookup is appropriate (bcrypt is for low-entropy + passwords and cannot be indexed). The plaintext key is shown **once** at mint + time and never persisted. +- **Lifecycle:** keys carry optional `expires_at` and `revoked_at`. The auth + middleware rejects missing/unknown/expired/revoked keys with `401`. +- **Scopes:** `scopes JSONB` is carried for future authorization granularity; + initially keys are minted with a broad scope. +- **Context contract:** the single chokepoint `middleware.GetUserID` is renamed + to `middleware.GetTenant`, returning the tenant id (and scopes) from the + validated key. All call sites move to `GetTenant`. +- **No HTTP admin surface.** Keys are minted out-of-band via a CLI + (`cmd/mint-api-key`, wrapped by `scripts/mint-api-key.sh`) that inserts a row + and prints the plaintext once. + +The old AES auth path (`utils.GenerateToken`/`DecryptToken` for auth) is +**removed**. The demo/loadtest/test-webhooks scripts and the README are cut over +to mint and use an API key. + +### `api_keys` schema + +| column | type | notes | +|--------------|---------------|----------------------------------------| +| `id` | UUID PK | `gen_random_uuid()` / `uuid_generate_v4()` | +| `tenant_id` | TEXT NOT NULL | the tenant identifier (== `owner_id`) | +| `key_hash` | TEXT UNIQUE NOT NULL | SHA-256 hex of full `mp_..._...` key | +| `prefix` | TEXT NOT NULL | public, indexed; narrows lookup | +| `scopes` | JSONB NOT NULL DEFAULT `'[]'` | reserved for authz | +| `expires_at` | TIMESTAMPTZ | NULL = never expires | +| `revoked_at` | TIMESTAMPTZ | NULL = active | +| `created_at` | TIMESTAMPTZ NOT NULL DEFAULT now() | | + +Indexes on `prefix` and `key_hash`. + +## 3. Key split — separate webhook-signing key from auth key + +**Decision:** introduce `WEBHOOK_ENCRYPTION_KEY` (32 bytes). Webhook secrets are +encrypted/decrypted with it (`service/webhook.go` + `webhook/dispatcher.go`) +instead of the shared `ENCRYPTION_KEY`. After the API-key cutover, `ENCRYPTION_KEY` +is no longer used for auth; the webhook key fully owns webhook-secret encryption, +so a leak of one no longer compromises the other. + +**Migration:** because the project is **pre-launch and local**, existing +`webhook_registrations` rows are **truncated** rather than re-encrypted — no +dual-read window or one-time re-encrypt pass is needed. + +## 4. Tenant isolation — repository-layer scoping (close the IDOR) + +**Decision:** enforce `WHERE owner_id = $tenant` at the repository layer so a +caller can never touch another tenant's asset by ID. + +- The verified IDOR surface today is the **`complete` write path**: + `MarkAssetUploadedTx` updates by `asset_id` alone. The owner guard is added: + `WHERE asset_id = $1 AND owner_id = $tenant AND status = 'uploading'`. +- The service maps "0 rows / not owned" to a **404** (indistinguishable from a + non-existent asset — no cross-tenant existence leak). +- The tenant id is threaded from context (`GetTenant`) into the asset + service/repo calls. + +**Migration:** delete existing `assets` (and dependent rows) — pre-launch local +data — then `ALTER COLUMN owner_id SET NOT NULL` and add index `idx_assets_owner`. + +### Deferred: per-tenant storage prefix + +Per-tenant object prefixes (`media//raw/...`) are **out of scope** for +this track. The worker reconstructs `media/raw/` +(`worker/processing/processor.py`) and its processed-output keys from `asset_id` +without selecting `owner_id`; prefixing would break the worker's download/upload +paths and require threading tenant through the worker SQL + key construction + +tests — exactly the worker churn the flat-tenancy decision avoids. Asset IDs are +UUIDs, access is gated by presigned URLs, and the IDOR is closed at the DB layer, +so per-tenant prefixing is defense-in-depth, deferrable to a later track. + +## 5. Idempotency keys — full response replay, 24h TTL + +**Decision:** Stripe-style idempotency on `presign` (and `complete` when the +header is present), scoped per-tenant, with **full response replay**. + +- **Header:** `Idempotency-Key`. Absent → behave exactly as today (no-op). +- **Storage:** `idempotency_keys` table, PK `(tenant_id, key)`: + + | column | type | notes | + |-----------------------|--------------|--------------------------------------| + | `tenant_id` | TEXT | part of PK | + | `key` | TEXT | part of PK | + | `request_fingerprint` | TEXT | hash of method+path+body | + | `status` | TEXT | `pending` / `done` | + | `response_status` | INT | replayed HTTP status | + | `response_body` | JSONB | replayed body | + | `asset_id` | UUID | created asset (nullable) | + | `created_at` | TIMESTAMPTZ | | + | `expires_at` | TIMESTAMPTZ | `created_at + 24h` | + +- **Concurrency:** the first request inserts a `pending` row; the PK unique + constraint is the lock. Concurrent duplicates that collide on the `pending` + row get `409` (in-flight). Once `done`, replays within TTL return the stored + response **verbatim**. +- **Reuse with a different payload:** same `(tenant, key)` but a different + `request_fingerprint` → `422` (key reused for a different request). +- **TTL:** 24h. A background sweep deletes expired rows. + +## 6. Quotas + rate limits — per-tenant + +**Decision:** per-tenant token-bucket rate limiting (keyed by `tenant_id`, +replacing/extending the existing per-IP `presignRateLimiter`) returning `429` +with `Retry-After`, plus usage accounting (assets and/or bytes per tenant) +checked on `presign` with over-quota rejection. Limits are config-driven with +sane defaults, and a per-tenant usage metric is exposed. + +## 7. Sequencing + +Auth (Tasks 1–3) lands first because idempotency and tenant scoping both depend +on a real tenant in context, and no identity table existed before. Then tenant +scoping (Task 4) closes the IDOR, and idempotency (Task 5) + quotas (Task 6) +layer on top. Each task is independently demoable. + +## 8. Compatibility / landmines + +- `ENCRYPTION_KEY` remains a required 32-byte boot config (webhook key split + adds `WEBHOOK_ENCRYPTION_KEY` alongside it). +- Three scripts + README mint the old AES token inline; all are cut over to API + keys in Task 2/3. +- `owner_id` column name is preserved → worker untouched. diff --git a/experiments/0001-worker-saturation.md b/experiments/0001-worker-saturation.md new file mode 100644 index 0000000..8214ee6 --- /dev/null +++ b/experiments/0001-worker-saturation.md @@ -0,0 +1,117 @@ +# Experiment 0001 — Worker saturation under load + +**Date:** 2026-06-30 · **Track:** 3 (observability & load) · **Feeds:** Track 1 (concurrent worker) +**Status:** complete + +## Hypothesis + +The Python worker is single-threaded (`consume()` reads one message, `count=1`, +and processes it inline; `MAX_CONCURRENT_JOBS` exists in config but is unused). +Under sustained upload load the worker — not the API — should be the bottleneck, +with the Redis stream growing without bound once arrival rate exceeds the +worker's service rate (Little's Law, `L = λW`). + +## Setup (record this with every run) + +- **Resource pinning** (`docker-compose.loadtest.yml`): `api` = 1.0 CPU / 512 MB, + `worker` = **1.0 CPU** / 1 GB. The single-CPU pin makes the bottleneck a stable, + observable fact rather than something that moves with spare laptop cores. +- **Sampling:** `TRACE_SAMPLING_RATE=1.0` (every asset traced). +- **Stack:** core + observability + loadtest overlays, all up. +- **Workload:** images only, unique bytes per iteration (dedup defeated — see + `loadtest/lib.js`). Fixture `worker/tests/test_assets/image.jpg`, 3 webp + variants per asset. +- **Profile:** open model, `./loadtest/run.sh open --rate 10/s --duration 60s` + (fixed arrival rate; λ = 10 uploads/s). + +> Local results are **relative**. Trust the bottleneck location and the +> before/after deltas, not the absolute throughput — laptop CPU, no network +> latency, single-node Redis/Postgres. + +## Method (the loop) + +1. Is an SLO breached? 2. USE — is the worker CPU- or queue-saturated? +3. Open an exemplar trace — which span dominates? 4. Form a hypothesis, change +one thing, re-run the **same** profile, compare. + +## Results (before — no optimisation yet) + +| Signal | Value | Source | +|--------|-------|--------| +| Arrival rate (λ) | 10.0 uploads/s | k6 `mpiper_assets_submitted` | +| Worker service rate (μ) | **1.13 jobs/s** | `rate(mpiper_mpiper_job_processing_success_total[2m])` | +| Mean asset processing time | 0.81 s | `…_duration_seconds_sum / …_count` | +| Queue depth before → after | 3985 → 4370 (↑) | `sli:queue_depth:current` | +| Worker CPU | **98.5 %** (pinned at 1 CPU) | `docker stats` | +| API CPU | 0.4 % | `docker stats` | +| Presign p95 (API) | 48 ms (SLO < 150 ms ✅) | `sli:presign_latency_seconds:p95` | +| Job success rate | 1.0 (SLO > 99 % ✅) | `sli:job_success_ratio:ratio_rate5m` | + +**Reading:** λ (10/s) ≫ μ (1.13/s). The queue grows monotonically; the system is +**unstable for any arrival rate above ≈ 1.1 uploads/s**. The API is essentially +idle (0.4 % CPU, presign well inside SLO) while the worker is pinned at 98.5 %. +The bottleneck is unambiguously the worker, and specifically its +**single-threaded, one-job-at-a-time** processing loop — not CPU work that is +inherently slow (a single image is ~0.8 s), but the complete absence of +concurrency. + +## Trace evidence (where the time goes) + +With the trace gap now closed (Track 3, Phase 1), one asset is a single trace +from the API through the queue into the worker — example, 19 spans: + +``` +/api/v1/assets/{id}/complete (API HTTP request) +└ AssetHandler.MarkAssetUploaded + └ AssetService.MarkAssetUploaded + ├ StorageClient.GetObjectAttrs → S3.GetObjectAttrs + └ Database.Transaction + ├ AssetRepo.MarkAssetUploadedTx + ├ AssetRepo.InsertProcessAssetJobTx + └ OutboxRepo.InsertTx + └ outbox.publish (relay re-activates stored context) + └ RedisQueue.Enqueue (injects traceparent into the message) + ├ RedisQueue.doXAddWithRetry + └ worker.consume (── crosses the Redis boundary ──) + └ process.dispatch + ├ process.download + ├ process.dedup_check + └ image.variant × 3 +``` + +The **gap between `RedisQueue.Enqueue` and `worker.consume`** is the queue wait — +the time an asset spends backed up behind the single worker. Under this profile +that gap dominates end-to-end latency, and it grows for every asset because the +backlog only ever increases. The in-worker stages (download, dedup, 3 variants) +are individually fast; the cost is waiting for a free worker, not the work itself. + +## Conclusion + +The single-threaded worker is the bottleneck, with a service rate of ~1.1 +jobs/s. The pipeline cannot keep up with anything beyond a trickle of uploads, +and the deficit manifests as an unbounded Redis backlog and ever-growing +queue-wait latency — while the API and host CPU sit idle. This is the motivating +evidence for **Track 1 (concurrent worker + stream recovery + DLQ)**: honour +`MAX_CONCURRENT_JOBS` as a real concurrency limit (process pool for the +CPU-bound Pillow/ffmpeg work) so μ scales with available cores instead of being +fixed at one. + +## Reproduce + +```bash +docker compose -f docker-compose.yml -f docker-compose.observability.yml \ + -f docker-compose.loadtest.yml up -d --build +./loadtest/run.sh open --rate 10/s --duration 60s +# Grafana http://localhost:3000 → MPiper folder: +# - "Worker / App Saturation (USE)": queue depth climbing, in-flight pinned +# - "Pipeline Funnel": ready/s flat at ~1.1 while uploaded/s tracks arrival +# Tempo (Explore): TraceQL `{ name="worker.consume" }` → open one → see the +# enqueue→consume queue-wait gap and the per-stage breakdown. +``` + +## Next experiment + +After Track 1 lands a bounded worker pool, re-run this **exact** profile and +compare: μ should rise roughly with the pool size (until CPU-bound), queue depth +should stabilise instead of growing, and the enqueue→consume gap should shrink. +Record results as `0002-concurrent-worker.md`. diff --git a/experiments/0002-concurrent-worker.md b/experiments/0002-concurrent-worker.md new file mode 100644 index 0000000..2ecceda --- /dev/null +++ b/experiments/0002-concurrent-worker.md @@ -0,0 +1,200 @@ +# Experiment 0002 — Concurrent worker + +**Date:** 2026-06-30 · **Track:** 1 (concurrent worker + recovery + DLQ) · **Follows:** 0001 +**Status:** implementation complete; **after-load numbers pending a live run** (see *Results (after)*). + +## Hypothesis + +0001 proved the worker is the bottleneck: a single-threaded loop with a service +rate μ ≈ 1.1 jobs/s while the API sits idle and the Redis backlog grows without +bound. `MAX_CONCURRENT_JOBS` existed in config but was never used. + +Honouring `MAX_CONCURRENT_JOBS` as a real concurrency limit (a bounded worker +pool) should raise μ roughly **N×** (until the worker becomes CPU-bound on its +allotted cores), at which point the queue **stabilises/drains** for any arrival +rate λ ≤ μ instead of growing. Per-job latency is unchanged — the win is +throughput. Job success must stay 100 % (no double-processing). + +## What changed (the implementation under test) + +- **Bounded thread pool.** The consumer now owns a + `ThreadPoolExecutor(max_workers=MAX_CONCURRENT_JOBS)`. `consume()` reads only up + to the current **free capacity** (`MAX_CONCURRENT_JOBS − in-flight`) and submits + each message to the pool; at capacity it returns immediately (no blocking read + held open while full). (`worker/consumer/consumer.py`, `main.py`) + - **Why threads, not processes:** per-job work is I/O + subprocess heavy — + object-store download/upload (releases GIL), ffmpeg via `subprocess` (true + parallelism), Pillow (releases GIL for most ops), psycopg (I/O). Threads give + real concurrency here while sharing one thread-safe `psycopg_pool` and one set + of (thread-safe) OTel instruments. A process pool would force per-process + DB/Redis pools, pickling the storage client, and per-process OTel init. + **GIL escalation path** (documented in the module): if profiling later shows + GIL-bound Python sections dominate, move only the transform stage to a + `ProcessPoolExecutor` (hybrid), not the whole consumer. +- **Invariants preserved.** Per-`msg_id` ack (each task acks only its own message + on success; failures stay in the PEL); the `SELECT … FOR UPDATE` job claim and + `status == 'done'` short-circuit are untouched; `_handle_job` still owns the + asset `failed`/`ready` transition (DEV-34); each task starts its **own** + `worker.consume` span with that message's extracted `traceparent` (no shared + spans); per-task metrics (`record_consume`/`record_job`/`record_asset`), no + `asset_id` on any metric label. +- **Bounded shutdown drain.** On SIGTERM the loop stops reading and + `consumer.shutdown(timeout=SHUTDOWN_DRAIN_TIMEOUT, default 30 s)` waits for + in-flight jobs, then stops. Anything still running is abandoned and safely + reclaimed by recovery (below). Keep the timeout ≤ the container + `stop_grace_period`. +- **DB pool scales with concurrency.** `PgPool` is now sized + `MAX_CONCURRENT_JOBS + 2`; each in-flight job holds at most one connection, so + the pool no longer silently caps concurrency. (`worker/consumer/db.py`, `main.py`) +- **XAUTOCLAIM recovery.** The old DB-scan + `XADD` requeue is replaced by + `XAUTOCLAIM` on `media:jobs` / `worker-group`: messages idle past + `RECOVERY_MIN_IDLE_MS` (default 120 000) are reclaimed from dead consumers and + re-dispatched through the same bounded pool, capped at free capacity. +- **Dead-letter stream.** Permanent failures (non-retryable, or attempts ≥ + `max_retries`) `XADD` to `media:jobs:dlq` with failure metadata and `XACK` the + original (previously left unacked and reclaimed forever). A message reclaimed + more times than `max_retries` is also dead-lettered. DLQ depth is exposed as the + `mpiper.dlq.depth` observable gauge with a panel on **Queue Health**. + +## Setup (record this with every run) + +- **A/B via env knobs on `docker-compose.loadtest.yml`** (no new overlays, same + binary): `WORKER_CPUS=4` on **both** sides (give the pool real cores), vary + `MAX_CONCURRENT_JOBS` (1 = serial baseline → 4/8 = concurrent). + `TRACE_SAMPLING_RATE=1.0`. API = 1.0 CPU / 512 MB. +- **Stack:** core + observability + loadtest + webhooks overlays. +- **Workload:** images, unique bytes per iteration (dedup defeated). 3 webp + variants per asset. +- **Measurement:** `./loadtest/run.sh closed --vus 20 --duration 2m` to apply a + saturating load, then μ = Δ`jobs.status='done'` over 30 s while draining (clean + steady-state, free of restart-ramp and API contention). `./loadtest/run.sh + capture "