feat: concurrent worker + webhook throughput + observability#20
Merged
Conversation
Persist the producer span context in the event_outbox.traceparent column, re-activate it in the relay (so enqueue rejoins the request trace), and inject traceparent into the Redis stream message so the worker can continue the trace.
Add an OTel tracer (worker/utils/tracing.py), extract the producer context in consume() and start worker.consume as a child-with-link, and span the pipeline stages (dispatch, download, dedup, image variants, ffmpeg). Also wire the previously-uncalled init_metrics and record job/asset/queue metrics so the worker SLIs have data.
Stamp trace_id/span_id onto API request logs (TracingMiddleware now runs before the logger) and worker logs, and broaden the Loki derived-field regex to link log lines to their Tempo trace across formats.
Read the chi route pattern after routing (http_route was always 'unknown'), cut the metric export interval to 15s, add finer histogram buckets for the queue-lag SLI, and export DB connection-pool gauges via db.Stats().
Add the loadtest overlay (CPU/mem pinning + full sampling), Prometheus SLO recording rules, remote-write receiver and exemplar storage, and pin Tempo to 2.6.1 (latest had an incompatible config schema).
Move the dashboard provider config into the dashboards provisioning dir (it was misplaced under datasources/, so no dashboards ever loaded) and repair the legacy metrics dashboard's datasource binding and stale metric names.
API RED, worker/app saturation (USE), pipeline funnel, queue health, and a consolidated experiment overview combining k6 client load with server-side pipeline, worker, queue and DB metrics.
Closed- and open-model scripts running the real presign→upload→complete client flow with per-iteration unique bytes (dedup defeat), SLO-mapped thresholds, a host-run wrapper, and Prometheus remote-write of client metrics.
First bottleneck-analysis writeup: single-threaded worker saturates at ~1.1 jobs/s while the API stays idle, motivating Track 1.
…s helper db.query.duration used default coarse buckets, inflating its p95 to ~4.75s (true mean ~18ms). Add a fine-bucket view mirroring the http/queue views. Add an exported NewTestMetrics() backed by a ManualReader so other packages' tests can assert recorded metrics without an OTLP exporter.
tick() now fans the claimed batch out across an errgroup bounded by WEBHOOK_CONCURRENCY (default 10) instead of delivering serially. The http.Client gets a tuned transport (MaxIdleConnsPerHost/MaxConnsPerHost = concurrency) so concurrent POSTs to one host reuse connections. NewDispatcher takes *metrics.Metrics and records delivery total/duration/failures per delivery (labels event,status). Documents the single-dispatcher SKIP-LOCKED-without-tx assumption.
…ack 1) Worker now honours MAX_CONCURRENT_JOBS via a ThreadPoolExecutor: consume() reads only free capacity and dispatches per-task with per-msg_id ack, per-task spans, and a bounded SIGTERM drain. PgPool sizes to the pool (+2) and clamps min_size<=max_size (fixes a crash at small pools). Recovery replaced with XAUTOCLAIM; permanent/over-reclaimed failures route to media:jobs:dlq with metadata + ack (no more unacked-forever). DLQ depth exposed as mpiper.dlq.depth gauge with a Queue Health panel. Measured 2.37x steady-state throughput (0.73->1.73 jobs/s) at mcj=4 on 4 cores; mcj should track core count.
…ceiver port docker-compose.loadtest.yml exposes WORKER_CPUS/WORKER_MEM/MAX_CONCURRENT_JOBS/WEBHOOK_CONCURRENCY env knobs (baseline defaults) so a concurrency A/B runs on the same binary with no new overlays. run.sh gains a 'capture' mode that snapshots headline Prometheus signals. webhook-receiver host port 8888->8899 (8888 collided with the otel-collector).
0002 records the measured A/B: 2.37x worker throughput, multi-core utilisation, the mcj-vs-cores tuning lesson, and a live DLQ demo. 0003 honestly reports webhooks were not the bottleneck at local scale (fast receiver, CPU-pinned API) and what would be needed to surface it. Also tracks the Track 1 handoff doc.
…doff Update the enhancements roadmap to reflect the measured Track 1 win (2.37x) and Track 1b completion, mark the resolved seams, and re-point 'next' to Track 2. Add track-02-handoff.md grounded in current code — flags that RegisterQueueDepthFunc is defined but never wired in main.go (the scaling signal must be recorded first) and carries forward the mcj-per-pod lesson.
…ghput Update Features, architecture flow, worker/webhook env knobs (incl. MAX_CONCURRENT_JOBS tuning note, RECOVERY_MIN_IDLE_MS, STREAM_DLQ_NAME, SHUTDOWN_DRAIN_TIMEOUT, WEBHOOK_CONCURRENCY), and the roadmap to match the shipped Track 1/1b/3 work.
RegisterQueueDepthFunc IS wired (in queue.go via XLen), not unwired. The real gap is that XLEN counts acked-but-untrimmed entries so it's not a true backlog; Task 0 is to add a consumer-group lag / oldest-pending-age gauge. Also note worker-deployment has no terminationGracePeriodSeconds (defaults to 30s = SHUTDOWN_DRAIN_TIMEOUT).
Track 2 (autoscaling) requires a k8s cluster which isn't available locally, so Track 4 (fully local: auth, tenant isolation, idempotency keys, quotas) is prepped to run meanwhile. Handoff grounded in code: AES token has no expiry/rotation and shares ENCRYPTION_KEY with webhook secrets; owner_id exists but reads aren't consistently scoped (likely IDOR); no idempotency keys (retried presign dupes assets).
…orage isolation - Add API key repository with hashed secrets, expiry, scopes and revocation - Add JWT/API key authorization middleware with tenant context injection - Add per-tenant rate limiting and asset quota middleware - Add Stripe-style idempotency key middleware for presign/complete - Split webhook signing key from auth encryption key - Enforce repository-layer tenant isolation on assets - Use per-tenant storage prefixes in API and worker - Gate destructive migrations 7/8 behind MIGRATION_ALLOW_DESTRUCTIVE - Fix handler error responses to avoid leaking internal errors - Add tenant slug validation and mint-api-key CLI - Add integration tests for tenant scoping and idempotency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Bundles three roadmap tracks that were developed on this branch (Track 3 was never
merged to
staging, so it rides along):Redis → worker → ffmpeg), OTel metrics on API + worker, SLO recording rules,
provisioned Grafana dashboards, and a host-run k6 harness.
ThreadPoolExecutorhonouring
MAX_CONCURRENT_JOBS,XAUTOCLAIMdead-consumer recovery, and amedia:jobs:dlqdead-letter stream with a depth gauge.errgroupdelivery fan-out(
WEBHOOK_CONCURRENCY), tuned HTTP transport, and the previously-unrecordedwebhook_delivery_*metrics wired.Measured result (live A/B, same binary)
mcj=1)mcj=4)100% job success, 0 DB pool waits, live DLQ verified. Lesson banked: set
MAX_CONCURRENT_JOBS ≈ cores(mcj=8oversubscribed and was slower). Seeexperiments/0002(worker) andexperiments/0003(webhooks).Tested
go build/vet,go test ./internal/... ./pkg/..., webhook integration compiles (-tags integration).test_consumer_pool.py,test_db_pool.py+ XAUTOCLAIM/DLQ coverage.docker-compose.loadtest.ymlenv knobs).Notes for reviewers
db.pycrash fix beyond the original plan:psycopg_pooldefaultsmin_size=4, which crash-looped the worker at small pools (mcj=1); now clampsmin_size ≤ max_size.experiments/0003.docs/enhancements/track-02-handoff.md(autoscaling) flags that the queue-depth gauge (RegisterQueueDepthFunc) is defined but never wired — prerequisite for scaling.