diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java new file mode 100644 index 00000000000..ed291144b96 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/DatapointCapture.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +/** + * A {@link DatapointCapture} provides access to timing information + * captured at a specific point in time. + */ +abstract class DatapointCapture { + + private final long nanoTime; + + DatapointCapture(final long nanoTime) { + this.nanoTime = nanoTime; + } + + /** + * @return the nanoTime of this capture, as provided at time + * of capture by the {@link FlowMetric}. + */ + public long nanoTime() { + return nanoTime; + } + + /** + * Internal tooling to select the younger capture between and this and proposed. + */ + DatapointCapture selectNewestCapture(final DatapointCapture proposed) { + if (proposed == null) { + return this; + } + + return (this.nanoTime() > proposed.nanoTime()) ? this : proposed; + } + + /** + * Internal tooling to select the younger of two captures provided. + */ + static DatapointCapture selectNewestCaptureOf(final DatapointCapture existing, final DatapointCapture proposed) { + if (existing == null) { + return proposed; + } + + return existing.selectNewestCapture(proposed); + } +} diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java index e93655f3d1f..28e81fc60a1 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/ExtendedFlowMetric.java @@ -23,8 +23,6 @@ import org.apache.logging.log4j.Logger; import org.logstash.util.SetOnceReference; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -32,10 +30,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.OptionalDouble; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.ToLongFunction; import java.util.stream.Collectors; @@ -100,7 +96,15 @@ public Map getValue() { this.retentionWindows.get() .forEach(window -> window.baseline(currentCapture.nanoTime()) .or(() -> windowDefaultBaseline(window)) - .map((baseline) -> calculateRate(currentCapture, baseline)) + .map(b -> { + if (b instanceof FlowCapture baseline) { + return calculateRate(currentCapture, baseline); + } else { + LOGGER.warn("Retention window `{}` provided a non-FlowCapture baseline `{}`; skipping rate calculation", + window.policy.policyName(), b); + return OptionalDouble.empty(); + } + }) .orElseGet(OptionalDouble::empty) .ifPresent((rate) -> rates.put(window.policy.policyName(), rate))); @@ -132,16 +136,6 @@ private static void injectIntoRetentionWindows(final List reten retentionWindows.forEach((rw) -> rw.append(capture)); } - /** - * Internal tooling to select the younger of two captures - */ - private static FlowCapture selectNewerCapture(final FlowCapture existing, final FlowCapture proposed) { - if (existing == null) { return proposed; } - if (proposed == null) { return existing; } - - return (existing.nanoTime() > proposed.nanoTime()) ? existing : proposed; - } - /** * If a window's policy allows it to report before its retention has been reached, * use our lifetime baseline as a default. @@ -180,183 +174,4 @@ Duration estimateExcessRetained(final ToLongFunction .sum(); return Duration.ofNanos(cumulativeExcessRetained); } - - /** - * A {@link RetentionWindow} efficiently holds sufficient {@link FlowCapture}s to - * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture - * that is older than the policy's allowed retention (if any). - * The implementation is similar to a singly-linked list whose youngest captures are at - * the tail and oldest captures are at the head, with an additional pre-tail stage. - * Compaction is always done at read-time and occasionally at write-time. - * Both reads and writes are non-blocking and concurrency-safe. - */ - private static class RetentionWindow { - private final AtomicReference stagedCapture = new AtomicReference<>(); - private final AtomicReference tail; - private final AtomicReference head; - private final FlowMetricRetentionPolicy policy; - - RetentionWindow(final FlowMetricRetentionPolicy policy, final FlowCapture zeroCapture) { - this.policy = policy; - final Node zeroNode = new Node(zeroCapture); - this.head = new AtomicReference<>(zeroNode); - this.tail = new AtomicReference<>(zeroNode); - } - - /** - * Append the newest {@link FlowCapture} into this {@link RetentionWindow}, - * while respecting our {@link FlowMetricRetentionPolicy}. - * We tolerate minor jitter in the provided {@link FlowCapture#nanoTime()}, but - * expect callers of this method to minimize lag between instantiating the capture - * and appending it. - * - * @param newestCapture the newest capture to stage - */ - private void append(final FlowCapture newestCapture) { - final Node casTail = this.tail.getAcquire(); // for CAS - final long newestCaptureNanoTime = newestCapture.nanoTime(); - - // stage our newest capture unless it is older than the currently-staged capture - final FlowCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture); - - // promote our previously-staged capture IFF our newest capture is too far - // ahead of the current tail to support policy's resolution. - if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) { - // attempt to set an _unlinked_ Node to our tail - final Node proposedNode = new Node(previouslyStaged); - if (this.tail.compareAndSet(casTail, proposedNode)) { - // if we succeeded at setting an unlinked node, link to it from our old tail - casTail.setNext(proposedNode); - - // perform a force-compaction of our head if necessary, - // detected using plain memory access - final Node currentHead = head.getPlain(); - final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime()); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos)); - } - if (headAgeNanos > policy.forceCompactionNanos()) { - final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos())); - if (LOGGER.isDebugEnabled()) { - final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime()); - LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos)); - } - } - } - } - } - - @Override - public String toString() { - return "RetentionWindow{" + - "policy=" + policy.policyName() + - " id=" + System.identityHashCode(this) + - '}'; - } - - /** - * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. - * @return an {@link Optional} that contains the youngest {@link FlowCapture} that is older - * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one - * exists, and is otherwise empty. - */ - public Optional baseline(final long nanoTime) { - final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); - final Node head = compactHead(barrier); - if (head.captureNanoTime() <= barrier) { - return Optional.of(head.capture); - } else { - return Optional.empty(); - } - } - - /** - * @return a computationally-expensive estimate of the number of captures in this window, - * using plain memory access. This should NOT be run in unguarded production code. - */ - private static int estimateSize(final Node headNode) { - int i = 1; // assume we have one additional staged - // NOTE: we chase the provided headNode's tail with plain-gets, - // which tolerates missed appends from other threads. - for (Node current = headNode; current != null; current = current.getNextPlain()) { i++; } - return i; - } - - /** - * @see RetentionWindow#estimateSize(Node) - */ - private int estimateSize() { - return estimateSize(this.head.getPlain()); - } - - /** - * @param barrier a nanoTime that will NOT be crossed during compaction - * @return the head node after compaction up to the provided barrier. - */ - private Node compactHead(final long barrier) { - return this.head.updateAndGet((existingHead) -> { - final Node proposedHead = existingHead.seekWithoutCrossing(barrier); - return Objects.requireNonNullElse(proposedHead, existingHead); - }); - } - - /** - * Internal testing support - */ - private long excessRetained(final long currentNanoTime, final ToLongFunction retentionWindowFunction) { - final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy)); - return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime())); - } - - /** - * A {@link Node} holds a single {@link FlowCapture} and - * may link ahead to the next {@link Node}. - * It is an implementation detail of {@link RetentionWindow}. - */ - private static class Node { - private static final VarHandle NEXT; - static { - try { - MethodHandles.Lookup l = MethodHandles.lookup(); - NEXT = l.findVarHandle(Node.class, "next", Node.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - } - - private final FlowCapture capture; - private volatile Node next; - - Node(final FlowCapture capture) { - this.capture = capture; - } - - Node seekWithoutCrossing(final long barrier) { - Node newestOlderThanThreshold = null; - Node candidate = this; - - while(candidate != null && candidate.captureNanoTime() < barrier) { - newestOlderThanThreshold = candidate; - candidate = candidate.getNext(); - } - return newestOlderThanThreshold; - } - - long captureNanoTime() { - return this.capture.nanoTime(); - } - - void setNext(final Node nextNode) { - next = nextNode; - } - - Node getNext() { - return next; - } - - Node getNextPlain() { - return (Node)NEXT.get(this); - } - } - } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java index 3b8444cd02b..d8bc5303bd7 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowCapture.java @@ -26,26 +26,16 @@ * point-in-time data for a pair of {@link Metric}s. * It is immutable. */ -class FlowCapture { +class FlowCapture extends DatapointCapture { private final Number numerator; private final Number denominator; - private final long nanoTime; - FlowCapture(final long nanoTime, final Number numerator, final Number denominator) { + super(nanoTime); this.numerator = numerator; this.denominator = denominator; - this.nanoTime = nanoTime; - } - - /** - * @return the nanoTime of this capture, as provided at time - * of capture by the {@link FlowMetric}. - */ - public long nanoTime() { - return nanoTime; } /** @@ -65,7 +55,7 @@ public BigDecimal denominator() { @Override public String toString() { return getClass().getSimpleName() +"{" + - "nanoTimestamp=" + nanoTime + + "nanoTimestamp=" + nanoTime() + " numerator=" + numerator() + " denominator=" + denominator() + '}'; diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java new file mode 100644 index 00000000000..4e77224a0e4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/RetentionWindow.java @@ -0,0 +1,210 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.instrument.metrics; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.ToLongFunction; + +/** + * A {@link RetentionWindow} efficiently holds sufficient {@link DatapointCapture}s to + * meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture + * that is older than the policy's allowed retention (if any). + * The implementation is similar to a singly-linked list whose youngest captures are at + * the tail and oldest captures are at the head, with an additional pre-tail stage. + * Compaction is always done at read-time and occasionally at write-time. + * Both reads and writes are non-blocking and concurrency-safe. + */ +class RetentionWindow { + private final AtomicReference stagedCapture = new AtomicReference<>(); + private final AtomicReference tail; + private final AtomicReference head; + final FlowMetricRetentionPolicy policy; + + RetentionWindow(final FlowMetricRetentionPolicy policy, final DatapointCapture zeroCapture) { + this.policy = policy; + final Node zeroNode = new Node(zeroCapture); + this.head = new AtomicReference<>(zeroNode); + this.tail = new AtomicReference<>(zeroNode); + } + + /** + * Append the newest {@link DatapointCapture} into this {@link RetentionWindow}, + * while respecting our {@link FlowMetricRetentionPolicy}. + * We tolerate minor jitter in the provided {@link DatapointCapture#nanoTime()}, but + * expect callers of this method to minimize lag between instantiating the capture + * and appending it. + * + * @param newestCapture the newest capture to stage + */ + void append(final DatapointCapture newestCapture) { + final Node casTail = this.tail.getAcquire(); // for CAS + final long newestCaptureNanoTime = newestCapture.nanoTime(); + + // stage our newest capture unless it is older than the currently-staged capture + final DatapointCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, DatapointCapture::selectNewestCaptureOf); + + // promote our previously-staged capture IFF our newest capture is too far + // ahead of the current tail to support policy's resolution. + if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) { + // attempt to set an _unlinked_ Node to our tail + final Node proposedNode = new Node(previouslyStaged); + if (this.tail.compareAndSet(casTail, proposedNode)) { + // if we succeeded at setting an unlinked node, link to it from our old tail + casTail.setNext(proposedNode); + + // perform a force-compaction of our head if necessary, + // detected using plain memory access + final Node currentHead = head.getPlain(); + final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime()); + if (ExtendedFlowMetric.LOGGER.isTraceEnabled()) { + ExtendedFlowMetric.LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos)); + } + if (headAgeNanos > policy.forceCompactionNanos()) { + final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos())); + if (ExtendedFlowMetric.LOGGER.isDebugEnabled()) { + final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime()); + ExtendedFlowMetric.LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos)); + } + } + } + } + } + + @Override + public String toString() { + return "RetentionWindow{" + + "policy=" + policy.policyName() + + " id=" + System.identityHashCode(this) + + '}'; + } + + /** + * @param nanoTime the nanoTime of the capture for which we are retrieving a baseline. + * @return an {@link Optional} that contains the youngest {@link DatapointCapture} that is older + * than this window's {@link FlowMetricRetentionPolicy} allowed retention if one + * exists, and is otherwise empty. + */ + public Optional baseline(final long nanoTime) { + final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos()); + final Node head = compactHead(barrier); + if (head.captureNanoTime() <= barrier) { + return Optional.of(head.capture); + } else { + return Optional.empty(); + } + } + + /** + * @return a computationally-expensive estimate of the number of captures in this window, + * using plain memory access. This should NOT be run in unguarded production code. + */ + private static int estimateSize(final Node headNode) { + int i = 1; // assume we have one additional staged + // NOTE: we chase the provided headNode's tail with plain-gets, + // which tolerates missed appends from other threads. + for (Node current = headNode; current != null; current = current.getNextPlain()) { + i++; + } + return i; + } + + /** + * @see RetentionWindow#estimateSize(Node) + */ + int estimateSize() { + return estimateSize(this.head.getPlain()); + } + + /** + * @param barrier a nanoTime that will NOT be crossed during compaction + * @return the head node after compaction up to the provided barrier. + */ + private Node compactHead(final long barrier) { + return this.head.updateAndGet((existingHead) -> { + final Node proposedHead = existingHead.seekWithoutCrossing(barrier); + return Objects.requireNonNullElse(proposedHead, existingHead); + }); + } + + /** + * Internal testing support + */ + long excessRetained(final long currentNanoTime, final ToLongFunction retentionWindowFunction) { + final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy)); + return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime())); + } + + /** + * A {@link Node} holds a single {@link DatapointCapture} and + * may link ahead to the next {@link Node}. + * It is an implementation detail of {@link RetentionWindow}. + */ + private static class Node { + private static final VarHandle NEXT; + + static { + try { + MethodHandles.Lookup l = MethodHandles.lookup(); + NEXT = l.findVarHandle(Node.class, "next", Node.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final DatapointCapture capture; + private volatile Node next; + + Node(final DatapointCapture capture) { + this.capture = capture; + } + + Node seekWithoutCrossing(final long barrier) { + Node newestOlderThanThreshold = null; + Node candidate = this; + + while (candidate != null && candidate.captureNanoTime() < barrier) { + newestOlderThanThreshold = candidate; + candidate = candidate.getNext(); + } + return newestOlderThanThreshold; + } + + long captureNanoTime() { + return this.capture.nanoTime(); + } + + void setNext(final Node nextNode) { + next = nextNode; + } + + Node getNext() { + return next; + } + + Node getNextPlain() { + return (Node) NEXT.get(this); + } + } +}