Skip to content

Drop egress_id label from handler counters/histograms; merge in service#1269

Merged
biglittlebigben merged 14 commits into
mainfrom
benjamin/handler_metrics
Jun 23, 2026
Merged

Drop egress_id label from handler counters/histograms; merge in service#1269
biglittlebigben merged 14 commits into
mainfrom
benjamin/handler_metrics

Conversation

@biglittlebigben

@biglittlebigben biglittlebigben commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Motivation

Egress runs many nodes and many concurrent egresses per node. Every
counter and histogram emitted by a handler subprocess carried an
egress_id const label, so the instantaneous count of distinct
(metric, egress_id) series across the fleet was
#metrics × #buckets × #concurrent egresses cluster-wide — even with
every series promptly cleaned up on handler exit, the live snapshot was
already too large for the Prometheus server to index comfortably, and
every node-level rate(...) aggregation had to fan out across all
currently-running egresses to produce one number. The label was also
useless to the consumer at that aggregation level.

This PR drops egress_id from handler-side counters and histograms so
each (type, status) is one series per node, regardless of how many
egresses are running on it. The two channel-size gauges keep egress_id
because their per-egress value is the whole point — they let you spot
one stuck egress.

Because two handler subprocesses now emit the same counter/histogram
series name with identical labels, the service can no longer rely on
prometheus.Gatherers.Gather() to combine them (it rejects duplicates)
— hence the explicit accumulator + one-shot pending buffer described
below. The pending buffer is not the cardinality fix; it's the
mechanism that preserves per-egress gauge visibility once across the
start/end handover. The cardinality fix is dropping the label.

Summary

  • Handler counters/histograms lose the egress_id const label;
    channel-size gauges keep it.
  • Service-side MetricsService now owns an explicit merge over three
    pools: live handlers, a persistent endedAccumulator of running
    totals, and a one-shot pendingMetrics buffer for the per-egress
    final snapshot.
  • A finished handler's counter/histogram tally is folded into
    endedAccumulator via MergeInAccumulator, which is idempotent
    (guarded by an atomic finalize flag) and runs from both the
    HandlerFinished path and process teardown. This means a handler that
    dies without sending HandlerFinished (SIGKILL/OOM/panic) still
    contributes its last-scraped tally, so the shared counter never drops
    and Prometheus doesn't see a phantom counter reset.
  • The whole scrape (gatherHandlerMetrics) runs under one mutex,
    including the live IPC gather, so a handler can't be moved from the
    live pool into the accumulator in between the two reads of a single
    scrape (the double-counting race).
  • mergeFamilies returns ErrCannotMergeGauges instead of silently
    summing colliding gauges; the error is logged on the gather path so a
    regression in future label conventions becomes loud rather than wrong.
  • Handler subprocess also deregisters NewProcessCollector (it was
    already deregistering NewGoCollector), so prometheus.Gatherers.Gather()
    stops rejecting the scrape with "collected before with the same name and
    label values" when one or more handlers are live.

Design

Components

┌───────────── handler subprocess (one per egress) ─────────────┐
│ HandlerMonitor                                                │
│   * pipeline_uploads{type,status}                  ← counter  │
│   * pipeline_upload_response_time_ms{type,status}  ← histogr. │
│   * backup_storage_writes{output_type}             ← counter  │
│   * segments_uploads_channel_size{egress_id,…}     ← gauge    │
│   * playlist_uploads_channel_size{egress_id,…}     ← gauge    │
└───────────────────────┬──────────────────────────────┬────────┘
            scrape via  │            ipc HandlerFinished
            ipc GetMetrics                             │ (final snapshot)
                        ▼                              ▼
┌───────────────────── service process ────────────────────────┐
│ MetricsService                                                │
│   endedAccumulator []*MetricFamily   (persistent totals)      │
│   pendingMetrics   []*MetricFamily   (one-shot, drained on    │
│                                       next scrape)            │
│                                                               │
│ ProcessManager                                                │
│   activeHandlers map[egressID]*Process                        │
│     Process.lastAccumulatableMetrics  (cache of last scrape)  │
│     Process.metricsFinalized          (atomic, one-way)       │
└───────────────────────────────────────────────────────────────┘

The handler-side HandlerMonitor only sets egress_id as a const label on
the two channel-size gauges. Counters and histograms have no egress_id,
so two handler subprocesses emit the same series name with identical
labels — they cannot be combined by prometheus.Gatherers.Gather() (it
rejects duplicates), which is why the service performs an explicit merge.

Service-side gather

MetricsService.gatherHandlerMetrics holds s.mu for the whole scrape and
collects three pools of metrics, feeding them into mergeFamilies:

  1. Live handlers — each Process.Gather() returns whatever its IPC peer
    currently reports, or an empty slice if the handler has been finalized
    (see below) or if its IPC call fails. The IPC call is bounded by
    metricsGatherTimeout (2s) so a wedged handler can't stall the scrape,
    which now holds s.mu for the duration. On each successful gather the
    accumulator-eligible portion of the result is split out and cached on
    the Process (lastAccumulatableMetrics); this is the value promoted
    if the handler later dies without reporting (see "Lifecycle"). Per-egress
    gauges (segments_uploads_channel_size, playlist_uploads_channel_size)
    appear here while the handler is alive.
  2. Persistent accumulator (endedAccumulator) — running totals of
    counter/histogram values from all handlers that have ended. Bounded in
    size by label cardinality of accumulator-eligible families (no
    egress_id), which is why we partition at the fold boundary — see
    "Accumulator partition" below.
  3. One-shot pending buffer (pendingMetrics) — the final snapshot of
    any metric that is not accumulator-eligible (gauges, anything with
    egress_id). Appended on StoreProcessEndedMetrics, drained and
    nil'd by gatherHandlerMetrics under the same mutex, so each entry is
    exposed on the first scrape after the handler exits and then gone.

mergeFamilies groups by family name, then within each family groups
metrics by their full label set, then aggregates:

Metric type Aggregation
Counter sum values
Gauge not addable — colliding label sets return ErrCannotMergeGauges (first-write-wins so the output is still usable). Live-pool gauges use distinct egress_ids so they pass through unmerged in practice.
Untyped sum values
Histogram sum sample_count, sample_sum, per-bucket cumulative_count matched by upper_bound
Summary sum sample_count and sample_sum; drop quantiles

mergeFamilies treats inputs as read-only and returns freshly-allocated
families/metrics. This is what lets us safely re-merge the persistent
accumulator on every gather without aliasing the accumulator's state.

Accumulator partition

Before folding a finished handler's parsed families,
StoreProcessEndedMetrics runs them through splitForAccumulator, which
routes each metric to one of two destinations:

  • Pending (one-shot) — exposed on the next scrape via pendingMetrics:
    • any family of type GAUGE (instantaneous level — has no meaning
      once its handler is gone, and mergeFamilies refuses to merge
      colliding gauges; routing them to pending sidesteps any future
      collision entirely);
    • any individual metric whose label set contains egress_id (unique
      per handler — would grow endedAccumulator linearly with handler
      history without ever merging).
  • Accumulator — everything else (non-gauge metrics with no egress_id
    label) is folded into endedAccumulator.

A family with a mix of egress_id and non-egress_id metrics is split:
the non-egress_id metrics go to the accumulator, the egress_id ones
go to pending. The two routing rules above are independent so the
partition catches both gauges and any future per-egress
counter/histogram.

The pending buffer keeps the visibility property of the previous design —
when a handler exits, its final per-egress values (e.g. depth of the
upload queue at shutdown) show up on the next /metrics scrape and then
disappear, matching the gauge semantics that no queue exists for a dead
egress.

The default Prometheus registry (the service's own go_*, process_*,
and promhttp_* metrics) is folded in via
prometheus.Gatherers{...}.Gather() as a separate gatherer; it does not
pass through the merge step. This is safe only because handler
subprocesses deregister NewGoCollector and NewProcessCollector in
NewHandler (see assumption 3 below).

Lifecycle of an ending handler

The accumulator fold is centralised in MergeInAccumulator, which is
idempotent: it calls pm.FinalizeMetrics(egressID), which atomically
test-and-sets the handler's metricsFinalized flag (Swap(true)) and
returns the handler's cached accumulatable tally. The first caller to
finalize a handler folds its tally into endedAccumulator; any later
caller gets alreadyFinalized == true and is a no-op. Once finalized,
Process.Gather() returns empty.

Graceful exit (HandlerFinished delivered):

1. handler subprocess sends HandlerFinished IPC (carrying its final metric
   text) to the service
   └─ service.StoreProcessEndedMetrics(egressID, text)   [under s.mu]
        ├─ parses the text
        ├─ splitForAccumulator: partition into accumulator-eligible vs pending
        ├─ pm.StoreAccumulatableMetrics → cache the final tally on the Process
        ├─ pendingMetrics = append(pendingMetrics, pending)
        └─ mergeInAccumulatorLocked(egressID)
             └─ FinalizeMetrics: flag flips false→true, returns cached tally
                endedAccumulator = merge(endedAccumulator, tally)

2. handler subprocess exits

3. service's cmd.Wait goroutine returns, calls processEnded
   ├─ s.MergeInAccumulator(egressID)   → FinalizeMetrics returns
   │                                      alreadyFinalized; no-op
   └─ pm.ProcessFinished(egressID)     → delete activeHandlers[egressID]

Hard exit (no HandlerFinished — SIGKILL/OOM/panic/failed IPC):

1. (no HandlerFinished arrives; flag stays false, no pending appended)
2. handler subprocess dies
3. service's cmd.Wait goroutine returns, calls processEnded
   ├─ s.MergeInAccumulator(egressID)   → FinalizeMetrics flips the flag and
   │                                      returns the tally cached by the last
   │                                      successful Process.Gather(); that
   │                                      tally is folded into endedAccumulator
   └─ pm.ProcessFinished(egressID)     → delete activeHandlers[egressID]

Why this avoids double-counting and counter resets:

  • No double count within a scrape. gatherHandlerMetrics holds s.mu
    across both the live gather and the accumulator read, and the
    finalize-and-fold in mergeInAccumulatorLocked is a single critical
    section under the same s.mu. So a scrape observes a handler either as
    live (not yet finalized → Gather() returns its values, accumulator
    doesn't yet include it) or as accumulated (finalized → Gather()
    returns empty, accumulator includes it) — never both, never neither.
    This closes the race where a scrape read a handler's live value an
    instant before it was promoted and then also read the freshly-folded
    accumulator.
  • No double fold across callers. Both StoreProcessEndedMetrics and
    the teardown MergeInAccumulator route through the same atomic
    finalize, so the tally is folded exactly once regardless of ordering.
  • No counter reset on hard exit. Because the teardown path always
    promotes the last-cached tally before the handler is removed from
    activeHandlers, the handler's contribution never silently drops out
    of the shared series when it dies without reporting.

The merged emitted value is therefore non-decreasing across consecutive
scrapes (sum of monotonic series), so rate() is meaningful.

Restart behaviour

endedAccumulator is in-memory. A service restart loses it, and live
handlers are children of the service so they restart from zero too.
Prometheus sees one counter-reset event per service restart, which is the
same behaviour as before this change and is handled correctly by
rate().

Concurrency

Three locks, always taken in the order s.mu → pm.mu → Process.metricsMu
(no path takes them in the opposite order, so there's no cycle):

  • MetricsService.mu (s.mu) — guards endedAccumulator / pendingMetrics
    and is held across the entire scrape and the entire finalize-and-fold.
  • processManager.mu (pm.mu) — guards the activeHandlers map only.
    StoreAccumulatableMetrics / FinalizeMetrics take it as a read lock
    because they only look the handler up; the per-handler field they touch
    has its own lock.
  • Process.metricsMu — guards lastAccumulatableMetrics, written by
    Process.Gather() (scrape) and read by FinalizeMetrics, so the cache
    is self-protecting regardless of caller.

Assumptions

  1. Identical metric registration across handler subprocesses. Every
    handler registers the same counter / histogram names with the same
    bucket boundaries. Histogram merge matches buckets by upper_bound; a
    bucket present in one handler but absent in another would be carried
    through unmerged and could produce skewed quantile estimates
    downstream.
  2. A handler that exits without HandlerFinished loses at most a tail
    of increments.
    The teardown path promotes the tally cached by the
    last successful Process.Gather(), so the handler's contribution up to
    its last scrape is preserved and the shared counter does not drop. Only
    increments produced between the last scrape and the hard exit are lost
    — acceptable, and far better than losing the whole tally (which would
    register as a counter reset on the shared series).
  3. Handler subprocesses deregister the Go and process collectors.
    client_golang auto-registers NewGoCollector and
    NewProcessCollector on the default registry; pkg/handler/handler.go
    explicitly Unregisters both during NewHandler. This keeps go_* /
    process_* flowing only from the service's own
    prometheus.DefaultRegisterer, so prometheus.Gatherers.Gather()
    (which rejects duplicate (name, label set) pairs across its child
    gatherers) does not see the same series from both the service and one
    or more live handlers.
  4. All handler-side counters are pure counters. The merge sums values
    within a family. For counters and histograms this is correct. Gauges
    are routed to pendingMetrics instead of endedAccumulator (see
    "Accumulator partition"), so a new gauge does not need a per-handler
    distinguishing label to stay safe across the start-end handover. On
    the live side, two running handlers exposing the same gauge with the
    same label set would no longer be silently summed — mergeFamilies
    returns ErrCannotMergeGauges (the error is logged by
    gatherHandlerMetrics, first-write-wins values are still returned).
    Any new gauge that represents a shared (rather than per-egress)
    instantaneous level still needs its own distinguishing label or a
    different aggregation rule.
  5. Quantiles on summaries are not consumed downstream.
    mergeFamilies drops them since quantiles are not addable across
    handlers. The egress pipeline does not currently expose summaries, so
    this is a guard for future use.
  6. endedAccumulator cardinality is bounded. Combined with the
    accumulator partition above, the only metrics that enter the
    accumulator are non-gauge metrics with no egress_id label, so
    cardinality scales with the static label products (e.g.
    len(type) × len(status) for pipeline_uploads) and not with handler
    history. Total memory is negligible.
  7. pendingMetrics is drained on every scrape. Memory there is
    bounded by the number of handler exits between two consecutive
    scrapes. In normal operation that's a small number; the buffer is
    nil'd under the mutex on each gatherHandlerMetrics call.

Per-egress gauges

segments_uploads_channel_size and playlist_uploads_channel_size keep
egress_id as a const label because they describe per-egress
instantaneous level (depth of an unbounded channel). Summing them across
egresses is meaningful (total pending across the node), but the
per-egress values are also useful for spotting one stuck egress, so we
keep both options open by leaving the cardinality on the handler side.

Visibility over a gauge's lifecycle:

  • Handler alive — values flow from the live pool, one series per
    egress_id, updated every scrape.
  • Handler just exitedsplitForAccumulator routes the final
    snapshot to pendingMetrics. The next scrape after exit shows that
    final value exactly once.
  • Subsequent scrapespendingMetrics has been drained; the gauge
    series for that egress is no longer reported. The accumulator never
    carries it, so cardinality does not grow with handler history.

Test plan

  • go test ./pkg/service/ (incl. -race) — covers mergeFamilies,
    accumulateInto, cloneMetric, splitForAccumulator,
    familyWithMetrics, hasLabel, labelSetKey, plus the ended-handler
    paths: FinalizeMetrics (cache return + idempotent finalize),
    MergeInAccumulator (fold, idempotency, hard-exit-with-no-cache,
    sum across handlers), StoreProcessEndedMetrics (immediate promote +
    pending routing), and a no-double-count-after-promote scrape.
  • Manual: confirm /metrics succeeds with one or more live handlers
    after the NewProcessCollector deregistration in
    pkg/handler/handler.go.
  • Manual: verify that a per-egress channel-size gauge is visible
    while the handler is alive, appears once on the first scrape after
    the handler exits, and disappears thereafter.
  • Manual: spot-check pipeline_uploads_total cardinality — should
    be one series per (type, status), not per egress_id.
  • Manual: kill a handler hard (SIGKILL) and confirm
    pipeline_uploads_total does not drop / produce a phantom rate()
    spike.

🤖 Generated with Claude Code

biglittlebigben and others added 8 commits June 17, 2026 15:19
The HandlerMonitor counters and histograms no longer carry egress_id, so
each (type, status) combination is one time series across the whole node
instead of one per egress.

The service-side gatherer aggregates live handler metrics together with a
persistent accumulator of finished-handler totals and any per-egress staged
metrics. Counter monotonicity across the start-end handover is preserved
by folding staged values into the accumulator atomically inside
ProcessFinished (via a hook held under pm.mu), and by a metricsFinalized
flag on Process that suppresses live IPC values once the handler has
reported its final tally.

Channel-size gauges keep egress_id since they represent per-egress
instantaneous level.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drops the endedPending staging map and the SetOnProcessFinished hook.
StoreProcessEndedMetrics now merges parsed families straight into
endedAccumulator after MarkMetricsFinalized, which is sufficient to
keep live and finished contributions from overlapping. HandlerFinished
is invoked exactly once per handler, so the staging layer wasn't
buying idempotency in practice.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Design notes moved into the PR description.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread pkg/service/metrics.go Outdated
egressLabelPair := &dto.LabelPair{
Name: &egressIDLabel,
Value: &egressID,
// ErrCannotMergeGauges is returned when mergeFamilies encounters two gauge

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of these comments are pretty excessive, I've been asking it to only keep comments on exported functions, large helpers, or when something needs to be explained or has side effects

@biglittlebigben

Copy link
Copy Markdown
Contributor Author

This is a potential approach to reduce the metrics cardinality. It has pros and cons, the main con being the error prone manual merging or metrics in the service. This is currently extensively unit tested but is making assumptions about metrics merging semantics. A possible alternative suggested previously was to remove some of the prometheus metrics from the handler and send corresponding events/gauges using a separate RPC in a different format, with the service creating a prometheus metrics from these events.

@github-actions

github-actions Bot commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

AV-sync stats summary: view in run #28001266831

@milos-lk

Copy link
Copy Markdown
Contributor

What happens when a handler exits without sending HandlerFinished — SIGKILL/OOM/panic, or a failed HandlerFinished IPC? As I read it, ProcessFinished still deletes it from activeHandlers, so its live contribution drops out of the sum, but its tally was never promoted into the accumulator. So the shared _total decreases on the next scrape.

Since these are counters, wouldn't Prometheus treat that drop as a counter reset and produce a phantom rate()/increase() spike — and because everything is now one shared series, wouldn't that pollute the node-level rate for every egress on the box, not just the one that died?

If that's right, is the intent to cache last-gathered values and promote them in ProcessFinished regardless of HandlerFinished, or is there a path I'm missing that covers the hard-exit case?

Comment thread pkg/service/metrics.go

collected := make([]*dto.MetricFamily, 0)
for _, g := range live {
f, err := g.Gather()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this somehow turns on my race condition radar wrt concurrency with finalize goroutine - there might be cases where we double count or undercount - let me try to pin that down...

@milos-lk milos-lk Jun 22, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


  STATE: X.finalized=false, accumulator=0, collected=[]

  S1  scrape: live = [X]
  S2  scrape: X.Gather() → G1 flag is false → G2 IPC → returns 75
                                            collected = [75]          ← scrape has now "locked in" X's live value
                                            (X.finalized still false)

      ── handler X finishes now; finalize starts ──
  F3  finalize: MarkMetricsFinalized(X)     X.finalized = true        ← too late: scrape already read 75 at S2
  F4  finalize: lock s.mu
  F6  finalize: merge(0, 75)
  F7  finalize: accumulator = 75            accumulator = 75
  F8  finalize: unlock
      ── finalize done ──
  
  S3  scrape: lock s.mu
  S4  scrape: append accumulator (75)       collected = [75, 75]      ← live 75 AND accumulator 75
  S6  scrape: unlock
  S7  scrape: merge → both are pipeline_uploads with identical labels (egress_id is gone!) → 75 + 75
  S8  scrape: returns 150 to Prometheus     EXPOSED = 150  ✗

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be addressed, at the cost of holding a lock during the metric gathering IPC. A timeout has been added to avoid the worse effects of a deadlock.

Comment thread pkg/service/metrics_test.go Outdated
// dst has bucket 50 that src doesn't have; src has bucket 100 that dst
// doesn't have. Only the matched buckets are merged; the dst-only bucket
// keeps its original cumulative count, and the src-only bucket is dropped
// (consistent with assumption #1 in METRICS.md that handlers register

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is probably the pre-impl plan you made - I don't see the md in the diff - did you want to add it? if not we can get rid of the reference.

@milos-lk

Copy link
Copy Markdown
Contributor

A possible alternative suggested previously was to remove some of the prometheus metrics from the handler and send corresponding events/gauges using a separate RPC in a different format, with the service creating a prometheus metrics from these events.

That one would give correctness by construction - there could be no races, aggregation is just .Add() on a single authoritative counter in the main process. There's no cross-process merge, no live+accumulator reconciliation, no finalize/suppress dance. Also doesn't require understanding dto.MetricFamily proto internals, counter-reset semantics, histogram bucket structure, and the input-immutability rules which is less of maintenance burden IMO.
It is indeed more verbose wrt proto defintion but that ensures no issues on updates.

Nice thing about this PR is that it doesn't change the existing mechanism and is easy to keep adding new metrics in the same way as we did it for now. So functionally the change set (with feedbacks addressed) puts us in the better place than what we have now. 👍

@milos-lk milos-lk self-assigned this Jun 22, 2026
@biglittlebigben

Copy link
Copy Markdown
Contributor Author

What happens when a handler exits without sending HandlerFinished — SIGKILL/OOM/panic, or a failed HandlerFinished IPC? As I read it, ProcessFinished still deletes it from activeHandlers, so its live contribution drops out of the sum, but its tally was never promoted into the accumulator. So the shared _total decreases on the next scrape.

Since these are counters, wouldn't Prometheus treat that drop as a counter reset and produce a phantom rate()/increase() spike — and because everything is now one shared series, wouldn't that pollute the node-level rate for every egress on the box, not just the one that died?

If that's right, is the intent to cache last-gathered values and promote them in ProcessFinished regardless of HandlerFinished, or is there a path I'm missing that covers the hard-exit case?

This should be addressed

@biglittlebigben

Copy link
Copy Markdown
Contributor Author

A possible alternative suggested previously was to remove some of the prometheus metrics from the handler and send corresponding events/gauges using a separate RPC in a different format, with the service creating a prometheus metrics from these events.

That one would give correctness by construction - there could be no races, aggregation is just .Add() on a single authoritative counter in the main process. There's no cross-process merge, no live+accumulator reconciliation, no finalize/suppress dance. Also doesn't require understanding dto.MetricFamily proto internals, counter-reset semantics, histogram bucket structure, and the input-immutability rules which is less of maintenance burden IMO. It is indeed more verbose wrt proto defintion but that ensures no issues on updates.

Nice thing about this PR is that it doesn't change the existing mechanism and is easy to keep adding new metrics in the same way as we did it for now. So functionally the change set (with feedbacks addressed) puts us in the better place than what we have now. 👍

Just trying to unlock some other work and avoid making worse the existing performance issues. We can revisit this later. The other proposed solution is a deeper refactoring IMHO. It also requires splitting the way we send metrics (summarizes counters vs per egress gauges) and quite a bit of plumbing needs to be added.

@biglittlebigben biglittlebigben merged commit 5619fff into main Jun 23, 2026
18 checks passed
@biglittlebigben biglittlebigben deleted the benjamin/handler_metrics branch June 23, 2026 16:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants