Drop egress_id label from handler counters/histograms; merge in service#1269
Conversation
The HandlerMonitor counters and histograms no longer carry egress_id, so each (type, status) combination is one time series across the whole node instead of one per egress. The service-side gatherer aggregates live handler metrics together with a persistent accumulator of finished-handler totals and any per-egress staged metrics. Counter monotonicity across the start-end handover is preserved by folding staged values into the accumulator atomically inside ProcessFinished (via a hook held under pm.mu), and by a metricsFinalized flag on Process that suppresses live IPC values once the handler has reported its final tally. Channel-size gauges keep egress_id since they represent per-egress instantaneous level. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drops the endedPending staging map and the SetOnProcessFinished hook. StoreProcessEndedMetrics now merges parsed families straight into endedAccumulator after MarkMetricsFinalized, which is sufficient to keep live and finished contributions from overlapping. HandlerFinished is invoked exactly once per handler, so the staging layer wasn't buying idempotency in practice. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Design notes moved into the PR description. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| egressLabelPair := &dto.LabelPair{ | ||
| Name: &egressIDLabel, | ||
| Value: &egressID, | ||
| // ErrCannotMergeGauges is returned when mergeFamilies encounters two gauge |
There was a problem hiding this comment.
some of these comments are pretty excessive, I've been asking it to only keep comments on exported functions, large helpers, or when something needs to be explained or has side effects
|
This is a potential approach to reduce the metrics cardinality. It has pros and cons, the main con being the error prone manual merging or metrics in the service. This is currently extensively unit tested but is making assumptions about metrics merging semantics. A possible alternative suggested previously was to remove some of the prometheus metrics from the handler and send corresponding events/gauges using a separate RPC in a different format, with the service creating a prometheus metrics from these events. |
|
AV-sync stats summary: view in run #28001266831 |
|
What happens when a handler exits without sending Since these are counters, wouldn't Prometheus treat that drop as a counter reset and produce a phantom If that's right, is the intent to cache last-gathered values and promote them in |
|
|
||
| collected := make([]*dto.MetricFamily, 0) | ||
| for _, g := range live { | ||
| f, err := g.Gather() |
There was a problem hiding this comment.
this somehow turns on my race condition radar wrt concurrency with finalize goroutine - there might be cases where we double count or undercount - let me try to pin that down...
There was a problem hiding this comment.
STATE: X.finalized=false, accumulator=0, collected=[]
S1 scrape: live = [X]
S2 scrape: X.Gather() → G1 flag is false → G2 IPC → returns 75
collected = [75] ← scrape has now "locked in" X's live value
(X.finalized still false)
── handler X finishes now; finalize starts ──
F3 finalize: MarkMetricsFinalized(X) X.finalized = true ← too late: scrape already read 75 at S2
F4 finalize: lock s.mu
F6 finalize: merge(0, 75)
F7 finalize: accumulator = 75 accumulator = 75
F8 finalize: unlock
── finalize done ──
S3 scrape: lock s.mu
S4 scrape: append accumulator (75) collected = [75, 75] ← live 75 AND accumulator 75
S6 scrape: unlock
S7 scrape: merge → both are pipeline_uploads with identical labels (egress_id is gone!) → 75 + 75
S8 scrape: returns 150 to Prometheus EXPOSED = 150 ✗
There was a problem hiding this comment.
This should be addressed, at the cost of holding a lock during the metric gathering IPC. A timeout has been added to avoid the worse effects of a deadlock.
| // dst has bucket 50 that src doesn't have; src has bucket 100 that dst | ||
| // doesn't have. Only the matched buckets are merged; the dst-only bucket | ||
| // keeps its original cumulative count, and the src-only bucket is dropped | ||
| // (consistent with assumption #1 in METRICS.md that handlers register |
There was a problem hiding this comment.
nit: this is probably the pre-impl plan you made - I don't see the md in the diff - did you want to add it? if not we can get rid of the reference.
That one would give correctness by construction - there could be no races, aggregation is just .Add() on a single authoritative counter in the main process. There's no cross-process merge, no live+accumulator reconciliation, no finalize/suppress dance. Also doesn't require understanding dto.MetricFamily proto internals, counter-reset semantics, histogram bucket structure, and the input-immutability rules which is less of maintenance burden IMO. Nice thing about this PR is that it doesn't change the existing mechanism and is easy to keep adding new metrics in the same way as we did it for now. So functionally the change set (with feedbacks addressed) puts us in the better place than what we have now. 👍 |
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This should be addressed |
Just trying to unlock some other work and avoid making worse the existing performance issues. We can revisit this later. The other proposed solution is a deeper refactoring IMHO. It also requires splitting the way we send metrics (summarizes counters vs per egress gauges) and quite a bit of plumbing needs to be added. |
Motivation
Egress runs many nodes and many concurrent egresses per node. Every
counter and histogram emitted by a handler subprocess carried an
egress_idconst label, so the instantaneous count of distinct(metric, egress_id)series across the fleet was#metrics × #buckets × #concurrent egresses cluster-wide— even withevery series promptly cleaned up on handler exit, the live snapshot was
already too large for the Prometheus server to index comfortably, and
every node-level
rate(...)aggregation had to fan out across allcurrently-running egresses to produce one number. The label was also
useless to the consumer at that aggregation level.
This PR drops
egress_idfrom handler-side counters and histograms soeach
(type, status)is one series per node, regardless of how manyegresses are running on it. The two channel-size gauges keep
egress_idbecause their per-egress value is the whole point — they let you spot
one stuck egress.
Because two handler subprocesses now emit the same counter/histogram
series name with identical labels, the service can no longer rely on
prometheus.Gatherers.Gather()to combine them (it rejects duplicates)— hence the explicit accumulator + one-shot pending buffer described
below. The pending buffer is not the cardinality fix; it's the
mechanism that preserves per-egress gauge visibility once across the
start/end handover. The cardinality fix is dropping the label.
Summary
egress_idconst label;channel-size gauges keep it.
MetricsServicenow owns an explicit merge over threepools: live handlers, a persistent
endedAccumulatorof runningtotals, and a one-shot
pendingMetricsbuffer for the per-egressfinal snapshot.
endedAccumulatorviaMergeInAccumulator, which is idempotent(guarded by an atomic finalize flag) and runs from both the
HandlerFinishedpath and process teardown. This means a handler thatdies without sending
HandlerFinished(SIGKILL/OOM/panic) stillcontributes its last-scraped tally, so the shared counter never drops
and Prometheus doesn't see a phantom counter reset.
gatherHandlerMetrics) runs under one mutex,including the live IPC gather, so a handler can't be moved from the
live pool into the accumulator in between the two reads of a single
scrape (the double-counting race).
mergeFamiliesreturnsErrCannotMergeGaugesinstead of silentlysumming colliding gauges; the error is logged on the gather path so a
regression in future label conventions becomes loud rather than wrong.
NewProcessCollector(it wasalready deregistering
NewGoCollector), soprometheus.Gatherers.Gather()stops rejecting the scrape with "collected before with the same name and
label values" when one or more handlers are live.
Design
Components
The handler-side
HandlerMonitoronly setsegress_idas a const label onthe two channel-size gauges. Counters and histograms have no
egress_id,so two handler subprocesses emit the same series name with identical
labels — they cannot be combined by
prometheus.Gatherers.Gather()(itrejects duplicates), which is why the service performs an explicit merge.
Service-side gather
MetricsService.gatherHandlerMetricsholdss.mufor the whole scrape andcollects three pools of metrics, feeding them into
mergeFamilies:Process.Gather()returns whatever its IPC peercurrently reports, or an empty slice if the handler has been finalized
(see below) or if its IPC call fails. The IPC call is bounded by
metricsGatherTimeout(2s) so a wedged handler can't stall the scrape,which now holds
s.mufor the duration. On each successful gather theaccumulator-eligible portion of the result is split out and cached on
the
Process(lastAccumulatableMetrics); this is the value promotedif the handler later dies without reporting (see "Lifecycle"). Per-egress
gauges (
segments_uploads_channel_size,playlist_uploads_channel_size)appear here while the handler is alive.
endedAccumulator) — running totals ofcounter/histogram values from all handlers that have ended. Bounded in
size by label cardinality of accumulator-eligible families (no
egress_id), which is why we partition at the fold boundary — see"Accumulator partition" below.
pendingMetrics) — the final snapshot ofany metric that is not accumulator-eligible (gauges, anything with
egress_id). Appended onStoreProcessEndedMetrics, drained andnil'd by
gatherHandlerMetricsunder the same mutex, so each entry isexposed on the first scrape after the handler exits and then gone.
mergeFamiliesgroups by family name, then within each family groupsmetrics by their full label set, then aggregates:
ErrCannotMergeGauges(first-write-wins so the output is still usable). Live-pool gauges use distinctegress_ids so they pass through unmerged in practice.sample_count,sample_sum, per-bucketcumulative_countmatched byupper_boundsample_countandsample_sum; drop quantilesmergeFamiliestreats inputs as read-only and returns freshly-allocatedfamilies/metrics. This is what lets us safely re-merge the persistent
accumulator on every gather without aliasing the accumulator's state.
Accumulator partition
Before folding a finished handler's parsed families,
StoreProcessEndedMetricsruns them throughsplitForAccumulator, whichroutes each metric to one of two destinations:
pendingMetrics:GAUGE(instantaneous level — has no meaningonce its handler is gone, and
mergeFamiliesrefuses to mergecolliding gauges; routing them to pending sidesteps any future
collision entirely);
egress_id(uniqueper handler — would grow
endedAccumulatorlinearly with handlerhistory without ever merging).
egress_idlabel) is folded into
endedAccumulator.A family with a mix of
egress_idand non-egress_idmetrics is split:the non-
egress_idmetrics go to the accumulator, theegress_idonesgo to pending. The two routing rules above are independent so the
partition catches both gauges and any future per-egress
counter/histogram.
The pending buffer keeps the visibility property of the previous design —
when a handler exits, its final per-egress values (e.g. depth of the
upload queue at shutdown) show up on the next
/metricsscrape and thendisappear, matching the gauge semantics that no queue exists for a dead
egress.
The default Prometheus registry (the service's own
go_*,process_*,and
promhttp_*metrics) is folded in viaprometheus.Gatherers{...}.Gather()as a separate gatherer; it does notpass through the merge step. This is safe only because handler
subprocesses deregister
NewGoCollectorandNewProcessCollectorinNewHandler(see assumption 3 below).Lifecycle of an ending handler
The accumulator fold is centralised in
MergeInAccumulator, which isidempotent: it calls
pm.FinalizeMetrics(egressID), which atomicallytest-and-sets the handler's
metricsFinalizedflag (Swap(true)) andreturns the handler's cached accumulatable tally. The first caller to
finalize a handler folds its tally into
endedAccumulator; any latercaller gets
alreadyFinalized == trueand is a no-op. Once finalized,Process.Gather()returns empty.Graceful exit (
HandlerFinisheddelivered):Hard exit (no
HandlerFinished— SIGKILL/OOM/panic/failed IPC):Why this avoids double-counting and counter resets:
gatherHandlerMetricsholdss.muacross both the live gather and the accumulator read, and the
finalize-and-fold in
mergeInAccumulatorLockedis a single criticalsection under the same
s.mu. So a scrape observes a handler either aslive (not yet finalized →
Gather()returns its values, accumulatordoesn't yet include it) or as accumulated (finalized →
Gather()returns empty, accumulator includes it) — never both, never neither.
This closes the race where a scrape read a handler's live value an
instant before it was promoted and then also read the freshly-folded
accumulator.
StoreProcessEndedMetricsandthe teardown
MergeInAccumulatorroute through the same atomicfinalize, so the tally is folded exactly once regardless of ordering.
promotes the last-cached tally before the handler is removed from
activeHandlers, the handler's contribution never silently drops outof the shared series when it dies without reporting.
The merged emitted value is therefore non-decreasing across consecutive
scrapes (sum of monotonic series), so
rate()is meaningful.Restart behaviour
endedAccumulatoris in-memory. A service restart loses it, and livehandlers are children of the service so they restart from zero too.
Prometheus sees one counter-reset event per service restart, which is the
same behaviour as before this change and is handled correctly by
rate().Concurrency
Three locks, always taken in the order
s.mu → pm.mu → Process.metricsMu(no path takes them in the opposite order, so there's no cycle):
MetricsService.mu(s.mu) — guardsendedAccumulator/pendingMetricsand is held across the entire scrape and the entire finalize-and-fold.
processManager.mu(pm.mu) — guards theactiveHandlersmap only.StoreAccumulatableMetrics/FinalizeMetricstake it as a read lockbecause they only look the handler up; the per-handler field they touch
has its own lock.
Process.metricsMu— guardslastAccumulatableMetrics, written byProcess.Gather()(scrape) and read byFinalizeMetrics, so the cacheis self-protecting regardless of caller.
Assumptions
handler registers the same counter / histogram names with the same
bucket boundaries. Histogram merge matches buckets by
upper_bound; abucket present in one handler but absent in another would be carried
through unmerged and could produce skewed quantile estimates
downstream.
HandlerFinishedloses at most a tailof increments. The teardown path promotes the tally cached by the
last successful
Process.Gather(), so the handler's contribution up toits last scrape is preserved and the shared counter does not drop. Only
increments produced between the last scrape and the hard exit are lost
— acceptable, and far better than losing the whole tally (which would
register as a counter reset on the shared series).
client_golangauto-registersNewGoCollectorandNewProcessCollectoron the default registry;pkg/handler/handler.goexplicitly
Unregisters both duringNewHandler. This keepsgo_*/process_*flowing only from the service's ownprometheus.DefaultRegisterer, soprometheus.Gatherers.Gather()(which rejects duplicate
(name, label set)pairs across its childgatherers) does not see the same series from both the service and one
or more live handlers.
within a family. For counters and histograms this is correct. Gauges
are routed to
pendingMetricsinstead ofendedAccumulator(see"Accumulator partition"), so a new gauge does not need a per-handler
distinguishing label to stay safe across the start-end handover. On
the live side, two running handlers exposing the same gauge with the
same label set would no longer be silently summed —
mergeFamiliesreturns
ErrCannotMergeGauges(the error is logged bygatherHandlerMetrics, first-write-wins values are still returned).Any new gauge that represents a shared (rather than per-egress)
instantaneous level still needs its own distinguishing label or a
different aggregation rule.
mergeFamiliesdrops them since quantiles are not addable acrosshandlers. The egress pipeline does not currently expose summaries, so
this is a guard for future use.
endedAccumulatorcardinality is bounded. Combined with theaccumulator partition above, the only metrics that enter the
accumulator are non-gauge metrics with no
egress_idlabel, socardinality scales with the static label products (e.g.
len(type) × len(status)forpipeline_uploads) and not with handlerhistory. Total memory is negligible.
pendingMetricsis drained on every scrape. Memory there isbounded by the number of handler exits between two consecutive
scrapes. In normal operation that's a small number; the buffer is
nil'd under the mutex on each
gatherHandlerMetricscall.Per-egress gauges
segments_uploads_channel_sizeandplaylist_uploads_channel_sizekeepegress_idas a const label because they describe per-egressinstantaneous level (depth of an unbounded channel). Summing them across
egresses is meaningful (total pending across the node), but the
per-egress values are also useful for spotting one stuck egress, so we
keep both options open by leaving the cardinality on the handler side.
Visibility over a gauge's lifecycle:
egress_id, updated every scrape.
splitForAccumulatorroutes the finalsnapshot to
pendingMetrics. The next scrape after exit shows thatfinal value exactly once.
pendingMetricshas been drained; the gaugeseries for that egress is no longer reported. The accumulator never
carries it, so cardinality does not grow with handler history.
Test plan
go test ./pkg/service/(incl.-race) — coversmergeFamilies,accumulateInto,cloneMetric,splitForAccumulator,familyWithMetrics,hasLabel,labelSetKey, plus the ended-handlerpaths:
FinalizeMetrics(cache return + idempotent finalize),MergeInAccumulator(fold, idempotency, hard-exit-with-no-cache,sum across handlers),
StoreProcessEndedMetrics(immediate promote +pending routing), and a no-double-count-after-promote scrape.
/metricssucceeds with one or more live handlersafter the
NewProcessCollectorderegistration inpkg/handler/handler.go.while the handler is alive, appears once on the first scrape after
the handler exits, and disappears thereafter.
pipeline_uploads_totalcardinality — shouldbe one series per
(type, status), not peregress_id.pipeline_uploads_totaldoes not drop / produce a phantomrate()spike.
🤖 Generated with Claude Code