Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9dcc1e3
add label truncation for held metrics
timggggggg Oct 7, 2025
696dcae
add label truncation for other metrics
timggggggg Oct 7, 2025
08d9b16
fix lint
timggggggg Oct 8, 2025
84aa460
fix tests
timggggggg Oct 8, 2025
87863a5
refactor + add metrics section in pipeline settings
timggggggg Oct 10, 2025
4bfcb29
fix tests
timggggggg Oct 10, 2025
3c0ba16
update doc
timggggggg Oct 10, 2025
4f9e5e4
update doc
timggggggg Oct 10, 2025
c9d2090
Merge branch 'master' of https://github.com/ozontech/file.d into trim…
timggggggg Oct 17, 2025
a02a95d
fix settings
timggggggg Oct 17, 2025
3ef8fa1
default metrics max label value length = 0
timggggggg Oct 22, 2025
38eed27
fix doc
timggggggg Oct 22, 2025
e0f8cb4
fix
timggggggg Oct 23, 2025
824c17b
mask plugin readme header fix
timggggggg Oct 23, 2025
0141574
remove metric_ prefix for new metric section in settings
timggggggg Oct 24, 2025
cabcf24
wrap CounterVec that uses WithLabelValues
timggggggg Oct 27, 2025
d874495
fix lint
timggggggg Oct 27, 2025
330c553
fix tests
timggggggg Oct 27, 2025
10874af
fix: don't apply SetDefaultValues on unexported fields
timggggggg Oct 27, 2025
5acfb52
Merge branch 'master' of https://github.com/ozontech/file.d into trim…
timggggggg Feb 3, 2026
bb6f01d
refactor after merging metric ctl with holder
timggggggg Feb 3, 2026
dfce128
truncate labels inside held metrics store
timggggggg Feb 3, 2026
6969cf4
fix
timggggggg Feb 3, 2026
4e0c4ac
refactor
timggggggg Feb 5, 2026
64639fd
fix
timggggggg Feb 5, 2026
4df95ab
remove p.params from s3 test
timggggggg Feb 5, 2026
f4cb010
add test
timggggggg Feb 5, 2026
490cd0d
Merge branch 'master' of https://github.com/ozontech/file.d into trim…
timggggggg Feb 9, 2026
f90373e
remove config.go changes
timggggggg Feb 9, 2026
631d2b2
handle metricMaxLabelValueLength negative values
timggggggg Feb 9, 2026
b19bb1a
fix lint
timggggggg Feb 9, 2026
652c6c3
Merge branch 'master' of https://github.com/ozontech/file.d into trim…
timggggggg Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (f *FileD) Start() {
}

func (f *FileD) initMetrics() {
f.metricCtl = metric.NewCtl("file_d", f.registry, 0)
f.metricCtl = metric.NewCtl("file_d", f.registry, 0, 0)
f.versionMetric = f.metricCtl.RegisterGaugeVec("version", "", "version")
f.versionMetric.WithLabelValues(buildinfo.Version).Inc()
}
Expand Down
25 changes: 20 additions & 5 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
decoderParams := make(map[string]any)
isStrict := pipeline.DefaultIsStrict
eventTimeout := pipeline.DefaultEventTimeout
metricHoldDuration := pipeline.DefaultMetricHoldDuration
metaCacheSize := pipeline.DefaultMetaCacheSize
pool := ""

metricHoldDuration := pipeline.DefaultMetricHoldDuration
metricMaxLabelValueLength := pipeline.DefaultMetricMaxLabelValueLength

if settings != nil {
val := settings.Get("capacity").MustInt()
if val != 0 {
Expand Down Expand Up @@ -104,7 +106,15 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
sourceNameMetaField = settings.Get("source_name_meta_field").MustString()
isStrict = settings.Get("is_strict").MustBool()

str = settings.Get("metric_hold_duration").MustString()
if str := settings.Get("pool").MustString(); str != "" {
pool = str
}

metrics := settings.Get("metrics")
str = metrics.Get("hold_duration").MustString()
if str == "" {
str = settings.Get("metric_hold_duration").MustString()
}
if str != "" {
i, err := time.ParseDuration(str)
if err != nil {
Expand All @@ -113,8 +123,10 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
metricHoldDuration = i
}

if str := settings.Get("pool").MustString(); str != "" {
pool = str
metricMaxLabelValueLength = metrics.Get("max_label_value_length").MustInt()
if metricMaxLabelValueLength < 0 {
logger.Warn("negative max_label_value_length value, metric label truncation is disabled")
metricMaxLabelValueLength = 0
}
}

Expand All @@ -134,8 +146,11 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
EventTimeout: eventTimeout,
StreamField: streamField,
IsStrict: isStrict,
MetricHoldDuration: metricHoldDuration,
Pool: pipeline.PoolType(pool),
Metric: &pipeline.MetricSettings{
HoldDuration: metricHoldDuration,
MaxLabelValueLength: metricMaxLabelValueLength,
},
}
}

Expand Down
22 changes: 12 additions & 10 deletions metric/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ type Ctl struct {
subsystem string
register *prometheus.Registry

holder *Holder
metrics map[string]prometheus.Collector
mu sync.RWMutex
holder *Holder
metrics map[string]prometheus.Collector
metricMaxLabelValueLength int
mu sync.RWMutex
}

func NewCtl(subsystem string, registry *prometheus.Registry, metricHoldDuration time.Duration) *Ctl {
func NewCtl(subsystem string, registry *prometheus.Registry, metricHoldDuration time.Duration, metricMaxLabelValueLength int) *Ctl {
ctl := &Ctl{
subsystem: subsystem,
register: registry,
metrics: make(map[string]prometheus.Collector),
subsystem: subsystem,
register: registry,
metrics: make(map[string]prometheus.Collector),
metricMaxLabelValueLength: metricMaxLabelValueLength,
}

if metricHoldDuration != 0 {
Expand Down Expand Up @@ -75,7 +77,7 @@ func (mc *Ctl) RegisterCounterVec(name, help string, labels ...string) *CounterV
Help: help,
}, labels)

return newCounterVec(mc.registerMetric(name, counterVec).(*prometheus.CounterVec))
return newCounterVec(mc.registerMetric(name, counterVec).(*prometheus.CounterVec), mc.metricMaxLabelValueLength)
}

func (mc *Ctl) RegisterGauge(name, help string) *Gauge {
Expand All @@ -97,7 +99,7 @@ func (mc *Ctl) RegisterGaugeVec(name, help string, labels ...string) *GaugeVec {
Help: help,
}, labels)

return newGaugeVec(mc.registerMetric(name, gaugeVec).(*prometheus.GaugeVec))
return newGaugeVec(mc.registerMetric(name, gaugeVec).(*prometheus.GaugeVec), mc.metricMaxLabelValueLength)
}

func (mc *Ctl) RegisterHistogram(name, help string, buckets []float64) *Histogram {
Expand All @@ -121,7 +123,7 @@ func (mc *Ctl) RegisterHistogramVec(name, help string, buckets []float64, labels
Buckets: buckets,
}, labels)

return newHistogramVec(mc.registerMetric(name, histogramVec).(*prometheus.HistogramVec))
return newHistogramVec(mc.registerMetric(name, histogramVec).(*prometheus.HistogramVec), mc.metricMaxLabelValueLength)
}

func (mc *Ctl) registerMetric(name string, newMetric prometheus.Collector) prometheus.Collector {
Expand Down
4 changes: 2 additions & 2 deletions metric/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type CounterVec struct {
vec *prometheus.CounterVec
}

func newCounterVec(cv *prometheus.CounterVec) *CounterVec {
func newCounterVec(cv *prometheus.CounterVec, maxLabelValueLength int) *CounterVec {
return &CounterVec{
vec: cv,
store: newHeldMetricsStore[prometheus.Counter](),
store: newHeldMetricsStore[prometheus.Counter](maxLabelValueLength),
}
}

Expand Down
4 changes: 2 additions & 2 deletions metric/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ type GaugeVec struct {
vec *prometheus.GaugeVec
}

func newGaugeVec(gv *prometheus.GaugeVec) *GaugeVec {
func newGaugeVec(gv *prometheus.GaugeVec, maxLabelValueLength int) *GaugeVec {
return &GaugeVec{
vec: gv,
store: newHeldMetricsStore[prometheus.Gauge](),
store: newHeldMetricsStore[prometheus.Gauge](maxLabelValueLength),
}
}

Expand Down
4 changes: 2 additions & 2 deletions metric/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ type HistogramVec struct {
vec *prometheus.HistogramVec
}

func newHistogramVec(hv *prometheus.HistogramVec) *HistogramVec {
func newHistogramVec(hv *prometheus.HistogramVec, maxLabelValueLength int) *HistogramVec {
return &HistogramVec{
vec: hv,
store: newHeldMetricsStore[prometheus.Histogram](),
store: newHeldMetricsStore[prometheus.Histogram](maxLabelValueLength),
}
}

Expand Down
26 changes: 21 additions & 5 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,21 @@ func (h *heldMetric[T]) updateUsage() {
}

type heldMetricsStore[T prometheus.Metric] struct {
mu sync.RWMutex
metricsByHash map[uint64][]*heldMetric[T]
mu sync.RWMutex
metricsByHash map[uint64][]*heldMetric[T]
metricMaxLabelValueLength int
}

func newHeldMetricsStore[T prometheus.Metric]() *heldMetricsStore[T] {
func newHeldMetricsStore[T prometheus.Metric](metricMaxLabelValueLength int) *heldMetricsStore[T] {
return &heldMetricsStore[T]{
mu: sync.RWMutex{},
metricsByHash: make(map[uint64][]*heldMetric[T]),
mu: sync.RWMutex{},
metricsByHash: make(map[uint64][]*heldMetric[T]),
metricMaxLabelValueLength: metricMaxLabelValueLength,
}
}

func (h *heldMetricsStore[T]) GetOrCreate(labels []string, newPromMetric func(...string) T) *heldMetric[T] {
h.truncateLabels(labels)
hash := computeStringsHash(labels)
// fast path - metric exists
h.mu.RLock()
Expand All @@ -66,6 +69,7 @@ func (h *heldMetricsStore[T]) GetOrCreate(labels []string, newPromMetric func(..
}

func (h *heldMetricsStore[T]) Delete(labels []string, deleter metricDeleter) bool {
h.truncateLabels(labels)
hash := computeStringsHash(labels)

h.mu.Lock()
Expand Down Expand Up @@ -159,6 +163,18 @@ func (h *heldMetricsStore[T]) DeleteOldMetrics(holdDuration time.Duration, delet
}
}

// truncateLabels shortens, in place, every value in lvs that is longer than
// h.metricMaxLabelValueLength bytes.
//
// A non-positive limit disables truncation; this also guards against a
// negative limit, which would otherwise make the slice expression panic.
//
// The cut position is moved back to the nearest UTF-8 character boundary so
// that a multi-byte character is never split in half. As a consequence the
// resulting value may be up to three bytes shorter than the limit (and may be
// empty when the limit is smaller than the first character).
//
// lvs is modified directly, without allocating a copy, so a caller that
// passes its own slice will observe the truncated values afterwards.
func (h *heldMetricsStore[T]) truncateLabels(lvs []string) {
	limit := h.metricMaxLabelValueLength
	if limit <= 0 {
		return
	}

	for i, label := range lvs {
		if len(label) <= limit {
			continue
		}

		// label[cut] is the first byte that gets dropped. If it is a UTF-8
		// continuation byte (bit pattern 10xxxxxx), cutting here would split
		// a character, so step back to that character's leading byte. A
		// character has at most three continuation bytes, hence the bound;
		// it also keeps the loss small when the input is not valid UTF-8.
		cut := limit
		for cut > 0 && limit-cut < 3 && label[cut]&0xC0 == 0x80 {
			cut--
		}

		lvs[i] = label[:cut]
	}
}

func findHeldMetricIndex[T prometheus.Metric](hMetrics []*heldMetric[T], labels []string) int {
idx := -1
for i := range hMetrics {
Expand Down
23 changes: 19 additions & 4 deletions metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestLabelExpiration(t *testing.T) {
r := require.New(t)

ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute)
ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)
c := ctl.RegisterCounterVec("errors", "", "level")

now := time.Now().UnixNano()
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestUnsafeStringInMetric(t *testing.T) {
bytes := []byte("hello world")
unsafeString := unsafe.String(unsafe.SliceData(bytes), len(bytes))

store := newHeldMetricsStore[prometheus.Counter]()
store := newHeldMetricsStore[prometheus.Counter](0)

labels := []string{unsafeString}
m := store.GetOrCreate([]string{unsafeString}, func(s ...string) prometheus.Counter {
Expand All @@ -66,6 +66,21 @@ func TestUnsafeStringInMetric(t *testing.T) {
r.Equal([]string{"hello world"}, m.labels)
}

func TestLabelTruncation(t *testing.T) {
r := require.New(t)

maxLabelValueLength := 10
label := "some_long_label_value"
ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, maxLabelValueLength)
c := ctl.RegisterCounterVec("errors", "", "level")

c.WithLabelValues(label).Inc()
r.Equal(float64(1), c.WithLabelValues("some_long_").ToFloat64())

c.DeleteLabelValues(label)
r.Equal(float64(0), c.WithLabelValues("some_long_").ToFloat64())
}

var holderBenchCases = []struct {
Labels []string
LabelValues [][]string
Expand Down Expand Up @@ -98,7 +113,7 @@ var holderBenchCases = []struct {

func BenchmarkMetricHolder(b *testing.B) {
for _, benchCase := range holderBenchCases {
ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute)
ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)

counter := ctl.RegisterCounterVec("test_name", "", benchCase.Labels...)
ctl.AddToHolder(counter)
Expand All @@ -117,7 +132,7 @@ func BenchmarkMetricHolder(b *testing.B) {

func BenchmarkPromVec(b *testing.B) {
for _, benchCase := range holderBenchCases {
ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute)
ctl := NewCtl("test", prometheus.NewRegistry(), time.Minute, 0)
counter := ctl.RegisterCounterVec("test_name", "", benchCase.Labels...)
name := strings.Join(benchCase.Labels, "_")

Expand Down
27 changes: 27 additions & 0 deletions pipeline/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Whether to fatal on decoding error.

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

> ⚠ DEPRECATED. Use `hold_duration` in the `metrics` section instead.

<br>

**`pool`** *`string`* *`options=std|low_memory`*
Expand All @@ -131,6 +133,31 @@ Default pool is `low_memory`.

<br>

## Metrics

Section for metrics in settings. Example:

```yaml
pipelines:
test:
settings:
metrics:
hold_duration: 1h
max_label_value_length: 100
```

**`hold_duration`** *`string`* *`default=30m`*

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

**`max_label_value_length`** *`int`* *`default=0`*

Maximum length (in bytes) of custom metric label values in action plugins. Longer values are truncated to this length. If zero, no limit is set; a negative value is treated as zero.

<br>

## Datetime parse formats

Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go [time.Parse](https://pkg.go.dev/time#Parse) (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases.
Expand Down
27 changes: 27 additions & 0 deletions pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Whether to fatal on decoding error.

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

> ⚠ DEPRECATED. Use `hold_duration` in the `metrics` section instead.

<br>

**`pool`** *`string`* *`options=std|low_memory`*
Expand All @@ -131,6 +133,31 @@ Default pool is `low_memory`.

<br>

## Metrics

Section for metrics in settings. Example:

```yaml
pipelines:
test:
settings:
metrics:
hold_duration: 1h
max_label_value_length: 100
```

**`hold_duration`** *`string`* *`default=30m`*

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

**`max_label_value_length`** *`int`* *`default=0`*

Maximum length (in bytes) of custom metric label values in action plugins. Longer values are truncated to this length. If zero, no limit is set; a negative value is treated as zero.

<br>

## Datetime parse formats

Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go [time.Parse](https://pkg.go.dev/time#Parse) (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases.
Expand Down
2 changes: 1 addition & 1 deletion pipeline/antispam/antispammer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func newAntispammer(threshold, unbanIterations int, maintenanceInterval time.Dur
Threshold: threshold,
UnbanIterations: unbanIterations,
Logger: logger.Instance.Named("antispam").Desugar(),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry(), time.Minute),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry(), time.Minute, 0),
})
}

Expand Down
6 changes: 3 additions & 3 deletions pipeline/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBackoff(t *testing.T) {

batcherBackoff := NewRetriableBatcher(
&BatcherOptions{
MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute),
MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0),
},
func(workerData *WorkerData, batch *Batch) error {
eventCount.Inc()
Expand Down Expand Up @@ -54,7 +54,7 @@ func TestBackoffWithError(t *testing.T) {

batcherBackoff := NewRetriableBatcher(
&BatcherOptions{
MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute),
MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0),
},
func(workerData *WorkerData, batch *Batch) error {
return errors.New("some error")
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestBackoffWithErrorWithDeadQueue(t *testing.T) {

batcherBackoff := NewRetriableBatcher(
&BatcherOptions{
MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute),
MetricCtl: metric.NewCtl("", prometheus.NewRegistry(), time.Minute, 0),
},
func(workerData *WorkerData, batch *Batch) error {
return errors.New("some error")
Expand Down
Loading