From ae0eed1f6516fa8bb82ee19820a7667af395e2be Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 17 Dec 2025 14:23:26 +0100 Subject: [PATCH 1/8] Introduced common base class to group all kind of data that can be used to compute a flow metric. --- .../instrument/metrics/DatapointCapture.java | 41 +++++++++++++++++++ .../metrics/ExtendedFlowMetric.java | 28 ++++++------- .../instrument/metrics/FlowCapture.java | 16 ++------ 3 files changed, 58 insertions(+), 27 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java new file mode 100644 index 00000000000..db45ad49e04 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +/** + * A {@link DatapointCapture} provides access to timing information + * captured at a specific point in time. + */ +abstract class DatapointCapture { + + private final long nanoTime; + + DatapointCapture(final long nanoTime) { + this.nanoTime = nanoTime; + } + + /** + * @return the nanoTime of this capture, as provided at time + * of capture by the {@link FlowMetric}. + */ + public long nanoTime() { + return nanoTime; + } +} diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java index e93655f3d1f..80fc58ce9f0 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java @@ -100,7 +100,7 @@ public Map getValue() { this.retentionWindows.get() .forEach(window -> window.baseline(currentCapture.nanoTime()) .or(() -> windowDefaultBaseline(window)) - .map((baseline) -> calculateRate(currentCapture, baseline)) + .map((baseline) -> calculateRate(currentCapture, (FlowCapture) baseline)) .orElseGet(OptionalDouble::empty) .ifPresent((rate) -> rates.put(window.policy.policyName(), rate))); @@ -135,7 +135,7 @@ private static void injectIntoRetentionWindows(final List reten /** * Internal tooling to select the younger of two captures */ - private static FlowCapture selectNewerCapture(final FlowCapture existing, final FlowCapture proposed) { + private static DatapointCapture selectNewerCapture(final DatapointCapture existing, final DatapointCapture proposed) { if (existing == null) { return proposed; } if (proposed == null) { return existing; } @@ -182,7 +182,7 @@ Duration estimateExcessRetained(final ToLongFunction } /** - * A {@link RetentionWindow} efficiently holds sufficient {@link FlowCapture}s to + * A {@link RetentionWindow} efficiently holds sufficient {@link DatapointCapture}s to * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture * that is older than the policy's allowed retention (if any). * The implementation is similar to a singly-linked list whose youngest captures are at @@ -191,12 +191,12 @@ Duration estimateExcessRetained(final ToLongFunction * Both reads and writes are non-blocking and concurrency-safe. */ private static class RetentionWindow { - private final AtomicReference stagedCapture = new AtomicReference<>(); + private final AtomicReference stagedCapture = new AtomicReference<>(); private final AtomicReference tail; private final AtomicReference head; private final FlowMetricRetentionPolicy policy; - RetentionWindow(final FlowMetricRetentionPolicy policy, final FlowCapture zeroCapture) { + RetentionWindow(final FlowMetricRetentionPolicy policy, final DatapointCapture zeroCapture) { this.policy = policy; final Node zeroNode = new Node(zeroCapture); this.head = new AtomicReference<>(zeroNode); @@ -204,20 +204,20 @@ private static class RetentionWindow { } /** - * Append the newest {@link FlowCapture} into this {@link RetentionWindow}, + * Append the newest {@link DatapointCapture} into this {@link RetentionWindow}, * while respecting our {@link FlowMetricRetentionPolicy}. - * We tolerate minor jitter in the provided {@link FlowCapture#nanoTime()}, but + * We tolerate minor jitter in the provided {@link DatapointCapture#nanoTime()}, but * expect callers of this method to minimize lag between instantiating the capture * and appending it. * * @param newestCapture the newest capture to stage */ - private void append(final FlowCapture newestCapture) { + private void append(final DatapointCapture newestCapture) { final Node casTail = this.tail.getAcquire(); // for CAS final long newestCaptureNanoTime = newestCapture.nanoTime(); // stage our newest capture unless it is older than the currently-staged capture - final FlowCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture); + final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture); // promote our previously-staged capture IFF our newest capture is too far // ahead of the current tail to support policy's resolution. @@ -256,11 +256,11 @@ public String toString() { /** * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. - * @return an {@link Optional} that contains the youngest {@link FlowCapture} that is older + * @return an {@link Optional} that contains the youngest {@link DatapointCapture} that is older * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one * exists, and is otherwise empty. */ - public Optional baseline(final long nanoTime) { + public Optional baseline(final long nanoTime) { final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); final Node head = compactHead(barrier); if (head.captureNanoTime() <= barrier) { @@ -309,7 +309,7 @@ private long excessRetained(final long currentNanoTime, final ToLongFunction policy.forceCompactionNanos()) { - final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos())); - if (LOGGER.isDebugEnabled()) { - final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime()); - LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos)); - } - } - } - } - } - - @Override - public String toString() { - return "RetentionWindow{" + - "policy=" + policy.policyName() + - " id=" + System.identityHashCode(this) + - '}'; - } - - /** - * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. - * @return an {@link Optional} that contains the youngest {@link DatapointCapture} that is older - * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one - * exists, and is otherwise empty. - */ - public Optional baseline(final long nanoTime) { - final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); - final Node head = compactHead(barrier); - if (head.captureNanoTime() <= barrier) { - return Optional.of(head.capture); - } else { - return Optional.empty(); - } - } - - /** - * @return a computationally-expensive estimate of the number of captures in this window, - * using plain memory access. This should NOT be run in unguarded production code. - */ - private static int estimateSize(final Node headNode) { - int i = 1; // assume we have one additional staged - // NOTE: we chase the provided headNode's tail with plain-gets, - // which tolerates missed appends from other threads. - for (Node current = headNode; current != null; current = current.getNextPlain()) { i++; } - return i; - } - - /** - * @see RetentionWindow#estimateSize(Node) - */ - private int estimateSize() { - return estimateSize(this.head.getPlain()); - } - - /** - * @param barrier a nanoTime that will NOT be crossed during compaction - * @return the head node after compaction up to the provided barrier. - */ - private Node compactHead(final long barrier) { - return this.head.updateAndGet((existingHead) -> { - final Node proposedHead = existingHead.seekWithoutCrossing(barrier); - return Objects.requireNonNullElse(proposedHead, existingHead); - }); - } - - /** - * Internal testing support - */ - private long excessRetained(final long currentNanoTime, final ToLongFunction retentionWindowFunction) { - final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy)); - return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime())); - } - - /** - * A {@link Node} holds a single {@link DatapointCapture} and - * may link ahead to the next {@link Node}. - * It is an implementation detail of {@link RetentionWindow}. - */ - private static class Node { - private static final VarHandle NEXT; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - NEXT = l.findVarHandle(Node.class, "next", Node.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - } - - private final DatapointCapture capture; - private volatile Node next; - - Node(final DatapointCapture capture) { - this.capture = capture; - } - - Node seekWithoutCrossing(final long barrier) { - Node newestOlderThanThreshold = null; - Node candidate = this; - - while(candidate != null && candidate.captureNanoTime() < barrier) { - newestOlderThanThreshold = candidate; - candidate = candidate.getNext(); - } - return newestOlderThanThreshold; - } - - long captureNanoTime() { - return this.capture.nanoTime(); - } - - void setNext(final Node nextNode) { - next = nextNode; - } - - Node getNext() { - return next; - } - - Node getNextPlain() { - return (Node)NEXT.get(this); - } - } - } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java new file mode 100644 index 00000000000..53aae96953f --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java @@ -0,0 +1,191 @@ +package org.logstash.instrument.metrics; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.ToLongFunction; + +/** + * A {@link RetentionWindow} efficiently holds sufficient {@link DatapointCapture}s to + * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture + * that is older than the policy's allowed retention (if any). + * The implementation is similar to a singly-linked list whose youngest captures are at + * the tail and oldest captures are at the head, with an additional pre-tail stage. + * Compaction is always done at read-time and occasionally at write-time. + * Both reads and writes are non-blocking and concurrency-safe. + */ +class RetentionWindow { + private final AtomicReference stagedCapture = new AtomicReference<>(); + private final AtomicReference tail; + private final AtomicReference head; + final FlowMetricRetentionPolicy policy; + + RetentionWindow(final FlowMetricRetentionPolicy policy, final DatapointCapture zeroCapture) { + this.policy = policy; + final Node zeroNode = new Node(zeroCapture); + this.head = new AtomicReference<>(zeroNode); + this.tail = new AtomicReference<>(zeroNode); + } + + /** + * Append the newest {@link DatapointCapture} into this {@link RetentionWindow}, + * while respecting our {@link FlowMetricRetentionPolicy}. + * We tolerate minor jitter in the provided {@link DatapointCapture#nanoTime()}, but + * expect callers of this method to minimize lag between instantiating the capture + * and appending it. + * + * @param newestCapture the newest capture to stage + */ + void append(final DatapointCapture newestCapture) { + final Node casTail = this.tail.getAcquire(); // for CAS + final long newestCaptureNanoTime = newestCapture.nanoTime(); + + // stage our newest capture unless it is older than the currently-staged capture + final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, DatapointCapture::selectNewerCapture); + + // promote our previously-staged capture IFF our newest capture is too far + // ahead of the current tail to support policy's resolution. + if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) { + // attempt to set an _unlinked_ Node to our tail + final Node proposedNode = new Node(previouslyStaged); + if (this.tail.compareAndSet(casTail, proposedNode)) { + // if we succeeded at setting an unlinked node, link to it from our old tail + casTail.setNext(proposedNode); + + // perform a force-compaction of our head if necessary, + // detected using plain memory access + final Node currentHead = head.getPlain(); + final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime()); + if (ExtendedFlowMetric.LOGGER.isTraceEnabled()) { + ExtendedFlowMetric.LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos)); + } + if (headAgeNanos > policy.forceCompactionNanos()) { + final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos())); + if (ExtendedFlowMetric.LOGGER.isDebugEnabled()) { + final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime()); + ExtendedFlowMetric.LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos)); + } + } + } + } + } + + @Override + public String toString() { + return "RetentionWindow{" + + "policy=" + policy.policyName() + + " id=" + System.identityHashCode(this) + + '}'; + } + + /** + * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. + * @return an {@link Optional} that contains the youngest {@link DatapointCapture} that is older + * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one + * exists, and is otherwise empty. + */ + public Optional baseline(final long nanoTime) { + final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); + final Node head = compactHead(barrier); + if (head.captureNanoTime() <= barrier) { + return Optional.of(head.capture); + } else { + return Optional.empty(); + } + } + + /** + * @return a computationally-expensive estimate of the number of captures in this window, + * using plain memory access. This should NOT be run in unguarded production code. + */ + private static int estimateSize(final Node headNode) { + int i = 1; // assume we have one additional staged + // NOTE: we chase the provided headNode's tail with plain-gets, + // which tolerates missed appends from other threads. + for (Node current = headNode; current != null; current = current.getNextPlain()) { + i++; + } + return i; + } + + /** + * @see RetentionWindow#estimateSize(Node) + */ + int estimateSize() { + return estimateSize(this.head.getPlain()); + } + + /** + * @param barrier a nanoTime that will NOT be crossed during compaction + * @return the head node after compaction up to the provided barrier. + */ + private Node compactHead(final long barrier) { + return this.head.updateAndGet((existingHead) -> { + final Node proposedHead = existingHead.seekWithoutCrossing(barrier); + return Objects.requireNonNullElse(proposedHead, existingHead); + }); + } + + /** + * Internal testing support + */ + long excessRetained(final long currentNanoTime, final ToLongFunction retentionWindowFunction) { + final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy)); + return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime())); + } + + /** + * A {@link Node} holds a single {@link DatapointCapture} and + * may link ahead to the next {@link Node}. + * It is an implementation detail of {@link RetentionWindow}. + */ + private static class Node { + private static final VarHandle NEXT; + + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + NEXT = l.findVarHandle(Node.class, "next", Node.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final DatapointCapture capture; + private volatile Node next; + + Node(final DatapointCapture capture) { + this.capture = capture; + } + + Node seekWithoutCrossing(final long barrier) { + Node newestOlderThanThreshold = null; + Node candidate = this; + + while (candidate != null && candidate.captureNanoTime() < barrier) { + newestOlderThanThreshold = candidate; + candidate = candidate.getNext(); + } + return newestOlderThanThreshold; + } + + long captureNanoTime() { + return this.capture.nanoTime(); + } + + void setNext(final Node nextNode) { + next = nextNode; + } + + Node getNext() { + return next; + } + + Node getNextPlain() { + return (Node) NEXT.get(this); + } + } +} From ad3f56f6a765a33d08de673288e51514da6692e6 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 17 Dec 2025 14:57:47 +0100 Subject: [PATCH 3/8] Moved selectNewerCapture to newly created DatapointCapture --- .../instrument/metrics/DatapointCapture.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java index 0e36edd7984..921debab087 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java @@ -39,13 +39,22 @@ public long nanoTime() { return nanoTime; } + DatapointCapture selectNewestCapture(final DatapointCapture proposed) { + if (proposed == null) { + return this; + } + + return (this.nanoTime() > proposed.nanoTime()) ? this : proposed; + } + /** * Internal tooling to select the younger of two captures */ static DatapointCapture selectNewerCapture(final DatapointCapture existing, final DatapointCapture proposed) { - if (existing == null) { return proposed; } - if (proposed == null) { return existing; } + if (existing == null) { + return proposed; + } - return (existing.nanoTime() > proposed.nanoTime()) ? existing : proposed; + return existing.selectNewestCapture(proposed); } } From a9caff4efef26f3bef1b13f0e8e43fa5a2ef0ca7 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 23 Dec 2025 10:51:54 +0100 Subject: [PATCH 4/8] Fixed method naming --- .../java/org/logstash/instrument/metrics/DatapointCapture.java | 2 +- .../java/org/logstash/instrument/metrics/RetentionWindow.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java index 921debab087..4d309afb3a8 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java @@ -50,7 +50,7 @@ DatapointCapture selectNewestCapture(final DatapointCapture proposed) { /** * Internal tooling to select the younger of two captures */ - static DatapointCapture selectNewerCapture(final DatapointCapture existing, final DatapointCapture proposed) { + static DatapointCapture selectNewestCaptureOf(final DatapointCapture existing, final DatapointCapture proposed) { if (existing == null) { return proposed; } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java index 53aae96953f..2ab3da8f13b 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java @@ -44,7 +44,7 @@ void append(final DatapointCapture newestCapture) { final long newestCaptureNanoTime = newestCapture.nanoTime(); // stage our newest capture unless it is older than the currently-staged capture - final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, DatapointCapture::selectNewerCapture); + final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, DatapointCapture::selectNewestCapture); // promote our previously-staged capture IFF our newest capture is too far // ahead of the current tail to support policy's resolution. From 747e63764ac98c812786a3138da3f30a9d1a5e9a Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 23 Dec 2025 11:26:25 +0100 Subject: [PATCH 5/8] Covered type cast with warn log (really unrealistic) --- .../instrument/metrics/ExtendedFlowMetric.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java index 7bc24f6a47f..28e81fc60a1 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java @@ -96,7 +96,15 @@ public Map getValue() { this.retentionWindows.get() .forEach(window -> window.baseline(currentCapture.nanoTime()) .or(() -> windowDefaultBaseline(window)) - .map((baseline) -> calculateRate(currentCapture, (FlowCapture) baseline)) + .map(b -> { + if (b instanceof FlowCapture baseline) { + return calculateRate(currentCapture, baseline); + } else { + LOGGER.warn("Retention window `{}` provided a non-FlowCapture baseline `{}`; skipping rate calculation", + window.policy.policyName(), b); + return OptionalDouble.empty(); + } + }) .orElseGet(OptionalDouble::empty) .ifPresent((rate) -> rates.put(window.policy.policyName(), rate))); From 6208424e98fea9ab6f539f14ed440777faead699 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 23 Dec 2025 11:27:28 +0100 Subject: [PATCH 6/8] Added license header to source code file --- .../instrument/metrics/RetentionWindow.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java index 2ab3da8f13b..91555dbfe5e 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.logstash.instrument.metrics; import java.lang.invoke.MethodHandles; From b0b07219db62f5682480168b5487b030418855ea Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 23 Dec 2025 11:42:30 +0100 Subject: [PATCH 7/8] Minor, added javadoc comment --- .../org/logstash/instrument/metrics/DatapointCapture.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java index 4d309afb3a8..ed291144b96 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java @@ -39,6 +39,9 @@ public long nanoTime() { return nanoTime; } + /** + * Internal tooling to select the younger capture between and this and proposed. + */ DatapointCapture selectNewestCapture(final DatapointCapture proposed) { if (proposed == null) { return this; @@ -48,7 +51,7 @@ DatapointCapture selectNewestCapture(final DatapointCapture proposed) { } /** - * Internal tooling to select the younger of two captures + * Internal tooling to select the younger of two captures provided. */ static DatapointCapture selectNewestCaptureOf(final DatapointCapture existing, final DatapointCapture proposed) { if (existing == null) { From 77c8b7ac37953bb6697d4667aad643b5aa370084 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 23 Dec 2025 12:19:22 +0100 Subject: [PATCH 8/8] Fixed bad method renaming --- .../java/org/logstash/instrument/metrics/RetentionWindow.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java index 91555dbfe5e..4e77224a0e4 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java @@ -63,7 +63,7 @@ void append(final DatapointCapture newestCapture) { final long newestCaptureNanoTime = newestCapture.nanoTime(); // stage our newest capture unless it is older than the currently-staged capture - final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, DatapointCapture::selectNewestCapture); + final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, DatapointCapture::selectNewestCaptureOf); // promote our previously-staged capture IFF our newest capture is too far // ahead of the current tail to support policy's resolution.