Skip to content

feat: concurrent worker + webhook throughput + observability#20

Merged
RndmCodeGuy20 merged 21 commits into
stagingfrom
feat/track-01-concurrent-worker
Jul 1, 2026
Merged

feat: concurrent worker + webhook throughput + observability#20
RndmCodeGuy20 merged 21 commits into
stagingfrom
feat/track-01-concurrent-worker

Conversation

@RndmCodeGuy20

Copy link
Copy Markdown
Owner

Summary

Bundles three roadmap tracks that were developed on this branch (Track 3 was never
merged to staging, so it rides along):

  • Track 3 — Observability & load testing: end-to-end tracing (API → outbox →
    Redis → worker → ffmpeg), OTel metrics on API + worker, SLO recording rules,
    provisioned Grafana dashboards, and a host-run k6 harness.
  • Track 1 — Concurrent worker + recovery + DLQ: bounded ThreadPoolExecutor
    honouring MAX_CONCURRENT_JOBS, XAUTOCLAIM dead-consumer recovery, and a
    media:jobs:dlq dead-letter stream with a depth gauge.
  • Track 1b — Webhook throughput: concurrent errgroup delivery fan-out
    (WEBHOOK_CONCURRENCY), tuned HTTP transport, and the previously-unrecorded
    webhook_delivery_* metrics wired.

Measured result (live A/B, same binary)

μ (jobs/s) Worker CPU
Serial (mcj=1) 0.73 ~1 core
Concurrent (mcj=4) 1.73 (2.37×) ~3.2 cores

100% job success, 0 DB pool waits, live DLQ verified. Lesson banked: set
MAX_CONCURRENT_JOBS ≈ cores (mcj=8 oversubscribed and was slower). See
experiments/0002 (worker) and experiments/0003 (webhooks).

Tested

  • Go: go build/vet, go test ./internal/... ./pkg/..., webhook integration compiles (-tags integration).
  • Worker: 32 unit tests pass in-container; new test_consumer_pool.py, test_db_pool.py + XAUTOCLAIM/DLQ coverage.
  • Live load test via the k6 harness (docker-compose.loadtest.yml env knobs).

Notes for reviewers

  • db.py crash fix beyond the original plan: psycopg_pool defaults min_size=4, which crash-looped the worker at small pools (mcj=1); now clamps min_size ≤ max_size.
  • Honest webhook finding: at local scale (fast receiver, CPU-pinned API) the dispatcher kept up even serially, so Track 1b is observability + headroom here, not a measured throughput win — documented in experiments/0003.
  • Next track prepared: docs/enhancements/track-02-handoff.md (autoscaling) flags that the queue-depth gauge (RegisterQueueDepthFunc) is defined but never wired — prerequisite for scaling.

Persist the producer span context in the event_outbox.traceparent column,
re-activate it in the relay (so enqueue rejoins the request trace), and inject
traceparent into the Redis stream message so the worker can continue the trace.
Add an OTel tracer (worker/utils/tracing.py), extract the producer context in
consume() and start worker.consume as a child-with-link, and span the pipeline
stages (dispatch, download, dedup, image variants, ffmpeg). Also wire the
previously-uncalled init_metrics and record job/asset/queue metrics so the
worker SLIs have data.
Stamp trace_id/span_id onto API request logs (TracingMiddleware now runs before
the logger) and worker logs, and broaden the Loki derived-field regex to link
log lines to their Tempo trace across formats.
Read the chi route pattern after routing (http_route was always 'unknown'),
cut the metric export interval to 15s, add finer histogram buckets for the
queue-lag SLI, and export DB connection-pool gauges via db.Stats().
Add the loadtest overlay (CPU/mem pinning + full sampling), Prometheus SLO
recording rules, remote-write receiver and exemplar storage, and pin Tempo to
2.6.1 (latest had an incompatible config schema).
Move the dashboard provider config into the dashboards provisioning dir (it was
misplaced under datasources/, so no dashboards ever loaded) and repair the
legacy metrics dashboard's datasource binding and stale metric names.
API RED, worker/app saturation (USE), pipeline funnel, queue health, and a
consolidated experiment overview combining k6 client load with server-side
pipeline, worker, queue and DB metrics.
Closed- and open-model scripts running the real presign→upload→complete client
flow with per-iteration unique bytes (dedup defeat), SLO-mapped thresholds, a
host-run wrapper, and Prometheus remote-write of client metrics.
First bottleneck-analysis writeup: single-threaded worker saturates at ~1.1
jobs/s while the API stays idle, motivating Track 1.
…s helper

db.query.duration used default coarse buckets, inflating its p95 to ~4.75s (true mean ~18ms). Add a fine-bucket view mirroring the http/queue views. Add an exported NewTestMetrics() backed by a ManualReader so other packages' tests can assert recorded metrics without an OTLP exporter.
tick() now fans the claimed batch out across an errgroup bounded by WEBHOOK_CONCURRENCY (default 10) instead of delivering serially. The http.Client gets a tuned transport (MaxIdleConnsPerHost/MaxConnsPerHost = concurrency) so concurrent POSTs to one host reuse connections. NewDispatcher takes *metrics.Metrics and records delivery total/duration/failures per delivery (labels event,status). Documents the single-dispatcher SKIP-LOCKED-without-tx assumption.
…ack 1)

Worker now honours MAX_CONCURRENT_JOBS via a ThreadPoolExecutor: consume() reads only free capacity and dispatches per-task with per-msg_id ack, per-task spans, and a bounded SIGTERM drain. PgPool sizes to the pool (+2) and clamps min_size<=max_size (fixes a crash at small pools). Recovery replaced with XAUTOCLAIM; permanent/over-reclaimed failures route to media:jobs:dlq with metadata + ack (no more unacked-forever). DLQ depth exposed as mpiper.dlq.depth gauge with a Queue Health panel. Measured 2.37x steady-state throughput (0.73->1.73 jobs/s) at mcj=4 on 4 cores; mcj should track core count.
…ceiver port

docker-compose.loadtest.yml exposes WORKER_CPUS/WORKER_MEM/MAX_CONCURRENT_JOBS/WEBHOOK_CONCURRENCY env knobs (baseline defaults) so a concurrency A/B runs on the same binary with no new overlays. run.sh gains a 'capture' mode that snapshots headline Prometheus signals. webhook-receiver host port 8888->8899 (8888 collided with the otel-collector).
0002 records the measured A/B: 2.37x worker throughput, multi-core utilisation, the mcj-vs-cores tuning lesson, and a live DLQ demo. 0003 honestly reports webhooks were not the bottleneck at local scale (fast receiver, CPU-pinned API) and what would be needed to surface it. Also tracks the Track 1 handoff doc.
…doff

Update the enhancements roadmap to reflect the measured Track 1 win (2.37x) and Track 1b completion, mark the resolved seams, and re-point 'next' to Track 2. Add track-02-handoff.md grounded in current code — flags that RegisterQueueDepthFunc is defined but never wired in main.go (the scaling signal must be recorded first) and carries forward the mcj-per-pod lesson.
…ghput

Update Features, architecture flow, worker/webhook env knobs (incl. MAX_CONCURRENT_JOBS tuning note, RECOVERY_MIN_IDLE_MS, STREAM_DLQ_NAME, SHUTDOWN_DRAIN_TIMEOUT, WEBHOOK_CONCURRENCY), and the roadmap to match the shipped Track 1/1b/3 work.
RegisterQueueDepthFunc IS wired (in queue.go via XLen), not unwired. The real gap is that XLEN counts acked-but-untrimmed entries so it's not a true backlog; Task 0 is to add a consumer-group lag / oldest-pending-age gauge. Also note worker-deployment has no terminationGracePeriodSeconds (defaults to 30s = SHUTDOWN_DRAIN_TIMEOUT).
Track 2 (autoscaling) requires a k8s cluster which isn't available locally, so Track 4 (fully local: auth, tenant isolation, idempotency keys, quotas) is prepped to run meanwhile. Handoff grounded in code: AES token has no expiry/rotation and shares ENCRYPTION_KEY with webhook secrets; owner_id exists but reads aren't consistently scoped (likely IDOR); no idempotency keys (retried presign dupes assets).
…orage isolation

- Add API key repository with hashed secrets, expiry, scopes and revocation
- Add JWT/API key authorization middleware with tenant context injection
- Add per-tenant rate limiting and asset quota middleware
- Add Stripe-style idempotency key middleware for presign/complete
- Split webhook signing key from auth encryption key
- Enforce repository-layer tenant isolation on assets
- Use per-tenant storage prefixes in API and worker
- Gate destructive migrations 7/8 behind MIGRATION_ALLOW_DESTRUCTIVE
- Fix handler error responses to avoid leaking internal errors
- Add tenant slug validation and mint-api-key CLI
- Add integration tests for tenant scoping and idempotency
@RndmCodeGuy20 RndmCodeGuy20 merged commit a3bf592 into staging Jul 1, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant