Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.instrument.metrics;

/**
* A {@link DatapointCapture} provides access to timing information
* captured at a specific point in time.
*/
abstract class DatapointCapture {

private final long nanoTime;

DatapointCapture(final long nanoTime) {
this.nanoTime = nanoTime;
}

/**
* @return the nanoTime of this capture, as provided at time
* of capture by the {@link FlowMetric}.
*/
public long nanoTime() {
return nanoTime;
}

/**
* Internal tooling to select the younger capture between and this and proposed.
*/
DatapointCapture selectNewestCapture(final DatapointCapture proposed) {
if (proposed == null) {
return this;
}

return (this.nanoTime() > proposed.nanoTime()) ? this : proposed;
}

/**
* Internal tooling to select the younger of two captures provided.
*/
static DatapointCapture selectNewestCaptureOf(final DatapointCapture existing, final DatapointCapture proposed) {
if (existing == null) {
return proposed;
}

return existing.selectNewestCapture(proposed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@
import org.apache.logging.log4j.Logger;
import org.logstash.util.SetOnceReference;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -100,7 +96,15 @@ public Map<String, Double> getValue() {
this.retentionWindows.get()
.forEach(window -> window.baseline(currentCapture.nanoTime())
.or(() -> windowDefaultBaseline(window))
.map((baseline) -> calculateRate(currentCapture, baseline))
.map(b -> {
if (b instanceof FlowCapture baseline) {
return calculateRate(currentCapture, baseline);
} else {
LOGGER.warn("Retention window `{}` provided a non-FlowCapture baseline `{}`; skipping rate calculation",
window.policy.policyName(), b);
return OptionalDouble.empty();
}
})
.orElseGet(OptionalDouble::empty)
.ifPresent((rate) -> rates.put(window.policy.policyName(), rate)));

Expand Down Expand Up @@ -132,16 +136,6 @@ private static void injectIntoRetentionWindows(final List<RetentionWindow> reten
retentionWindows.forEach((rw) -> rw.append(capture));
}

/**
* Internal tooling to select the younger of two captures
*/
private static FlowCapture selectNewerCapture(final FlowCapture existing, final FlowCapture proposed) {
if (existing == null) { return proposed; }
if (proposed == null) { return existing; }

return (existing.nanoTime() > proposed.nanoTime()) ? existing : proposed;
}

/**
* If a window's policy allows it to report before its retention has been reached,
* use our lifetime baseline as a default.
Expand Down Expand Up @@ -180,183 +174,4 @@ Duration estimateExcessRetained(final ToLongFunction<FlowMetricRetentionPolicy>
.sum();
return Duration.ofNanos(cumulativeExcessRetained);
}

/**
* A {@link RetentionWindow} efficiently holds sufficient {@link FlowCapture}s to
* meet its {@link FlowMetricRetentionPolicy}, providing access to the youngest capture
* that is older than the policy's allowed retention (if any).
* The implementation is similar to a singly-linked list whose youngest captures are at
* the tail and oldest captures are at the head, with an additional pre-tail stage.
* Compaction is always done at read-time and occasionally at write-time.
* Both reads and writes are non-blocking and concurrency-safe.
*/
private static class RetentionWindow {
private final AtomicReference<FlowCapture> stagedCapture = new AtomicReference<>();
private final AtomicReference<Node> tail;
private final AtomicReference<Node> head;
private final FlowMetricRetentionPolicy policy;

RetentionWindow(final FlowMetricRetentionPolicy policy, final FlowCapture zeroCapture) {
this.policy = policy;
final Node zeroNode = new Node(zeroCapture);
this.head = new AtomicReference<>(zeroNode);
this.tail = new AtomicReference<>(zeroNode);
}

/**
* Append the newest {@link FlowCapture} into this {@link RetentionWindow},
* while respecting our {@link FlowMetricRetentionPolicy}.
* We tolerate minor jitter in the provided {@link FlowCapture#nanoTime()}, but
* expect callers of this method to minimize lag between instantiating the capture
* and appending it.
*
* @param newestCapture the newest capture to stage
*/
private void append(final FlowCapture newestCapture) {
final Node casTail = this.tail.getAcquire(); // for CAS
final long newestCaptureNanoTime = newestCapture.nanoTime();

// stage our newest capture unless it is older than the currently-staged capture
final FlowCapture previouslyStaged = stagedCapture.getAndAccumulate(newestCapture, ExtendedFlowMetric::selectNewerCapture);

// promote our previously-staged capture IFF our newest capture is too far
// ahead of the current tail to support policy's resolution.
if (previouslyStaged != null && Math.subtractExact(newestCaptureNanoTime, casTail.captureNanoTime()) > policy.resolutionNanos()) {
// attempt to set an _unlinked_ Node to our tail
final Node proposedNode = new Node(previouslyStaged);
if (this.tail.compareAndSet(casTail, proposedNode)) {
// if we succeeded at setting an unlinked node, link to it from our old tail
casTail.setNext(proposedNode);

// perform a force-compaction of our head if necessary,
// detected using plain memory access
final Node currentHead = head.getPlain();
final long headAgeNanos = Math.subtractExact(newestCaptureNanoTime, currentHead.captureNanoTime());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{} post-append result (captures: `{}` span: `{}` }", this, estimateSize(currentHead), Duration.ofNanos(headAgeNanos));
}
if (headAgeNanos > policy.forceCompactionNanos()) {
final Node compactHead = compactHead(Math.subtractExact(newestCaptureNanoTime, policy.retentionNanos()));
if (LOGGER.isDebugEnabled()) {
final long compactHeadAgeNanos = Math.subtractExact(newestCaptureNanoTime, compactHead.captureNanoTime());
LOGGER.debug("{} forced-compaction result (captures: `{}` span: `{}`)", this, estimateSize(compactHead), Duration.ofNanos(compactHeadAgeNanos));
}
}
}
}
}

@Override
public String toString() {
return "RetentionWindow{" +
"policy=" + policy.policyName() +
" id=" + System.identityHashCode(this) +
'}';
}

/**
* @param nanoTime the nanoTime of the capture for which we are retrieving a baseline.
* @return an {@link Optional} that contains the youngest {@link FlowCapture} that is older
* than this window's {@link FlowMetricRetentionPolicy} allowed retention if one
* exists, and is otherwise empty.
*/
public Optional<FlowCapture> baseline(final long nanoTime) {
final long barrier = Math.subtractExact(nanoTime, policy.retentionNanos());
final Node head = compactHead(barrier);
if (head.captureNanoTime() <= barrier) {
return Optional.of(head.capture);
} else {
return Optional.empty();
}
}

/**
* @return a computationally-expensive estimate of the number of captures in this window,
* using plain memory access. This should NOT be run in unguarded production code.
*/
private static int estimateSize(final Node headNode) {
int i = 1; // assume we have one additional staged
// NOTE: we chase the provided headNode's tail with plain-gets,
// which tolerates missed appends from other threads.
for (Node current = headNode; current != null; current = current.getNextPlain()) { i++; }
return i;
}

/**
* @see RetentionWindow#estimateSize(Node)
*/
private int estimateSize() {
return estimateSize(this.head.getPlain());
}

/**
* @param barrier a nanoTime that will NOT be crossed during compaction
* @return the head node after compaction up to the provided barrier.
*/
private Node compactHead(final long barrier) {
return this.head.updateAndGet((existingHead) -> {
final Node proposedHead = existingHead.seekWithoutCrossing(barrier);
return Objects.requireNonNullElse(proposedHead, existingHead);
});
}

/**
* Internal testing support
*/
private long excessRetained(final long currentNanoTime, final ToLongFunction<FlowMetricRetentionPolicy> retentionWindowFunction) {
final long barrier = Math.subtractExact(currentNanoTime, retentionWindowFunction.applyAsLong(this.policy));
return Math.max(0L, Math.subtractExact(barrier, this.head.getPlain().captureNanoTime()));
}

/**
* A {@link Node} holds a single {@link FlowCapture} and
* may link ahead to the next {@link Node}.
* It is an implementation detail of {@link RetentionWindow}.
*/
private static class Node {
private static final VarHandle NEXT;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Node.class, "next", Node.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}

private final FlowCapture capture;
private volatile Node next;

Node(final FlowCapture capture) {
this.capture = capture;
}

Node seekWithoutCrossing(final long barrier) {
Node newestOlderThanThreshold = null;
Node candidate = this;

while(candidate != null && candidate.captureNanoTime() < barrier) {
newestOlderThanThreshold = candidate;
candidate = candidate.getNext();
}
return newestOlderThanThreshold;
}

long captureNanoTime() {
return this.capture.nanoTime();
}

void setNext(final Node nextNode) {
next = nextNode;
}

Node getNext() {
return next;
}

Node getNextPlain() {
return (Node)NEXT.get(this);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,16 @@
* point-in-time data for a pair of {@link Metric}s.
* It is immutable.
*/
class FlowCapture {
class FlowCapture extends DatapointCapture {
private final Number numerator;
private final Number denominator;

private final long nanoTime;

FlowCapture(final long nanoTime,
final Number numerator,
final Number denominator) {
super(nanoTime);
this.numerator = numerator;
this.denominator = denominator;
this.nanoTime = nanoTime;
}

/**
* @return the nanoTime of this capture, as provided at time
* of capture by the {@link FlowMetric}.
*/
public long nanoTime() {
return nanoTime;
}

/**
Expand All @@ -65,7 +55,7 @@ public BigDecimal denominator() {
@Override
public String toString() {
return getClass().getSimpleName() +"{" +
"nanoTimestamp=" + nanoTime +
"nanoTimestamp=" + nanoTime() +
" numerator=" + numerator() +
" denominator=" + denominator() +
'}';
Expand Down
Loading