From df30eef6f11454ce9b5f6fe3dd45ee931c41cfc0 Mon Sep 17 00:00:00 2001 From: Dom G Date: Fri, 15 Mar 2024 11:13:19 -0400 Subject: [PATCH 1/6] Use NanoTime in more places --- .../clientImpl/TabletServerBatchWriter.java | 6 +++--- .../zookeeper/DistributedReadWriteLock.java | 9 +++++---- .../apache/accumulo/core/util/OpTimer.java | 20 ++++++++++++------- .../org/apache/accumulo/core/util/Retry.java | 13 ++++++------ .../server/mem/LowMemoryDetector.java | 10 +++++----- .../accumulo/server/rpc/TimedProcessor.java | 15 +++++++------- .../accumulo/tserver/tablet/Tablet.java | 18 +++++++++-------- 7 files changed, 50 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index c2543cb8ee8..580e09541e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -43,7 +43,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -80,6 +79,7 @@ import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; @@ -1098,7 +1098,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio final HostAndPort parsedServer = HostAndPort.fromString(location); - long startTime = System.nanoTime(); + NanoTime startTime = NanoTime.now(); boolean useCloseUpdate = false; @@ -1172,7 +1172,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio } // if a timeout is set on the batch writer, then do not retry longer than the timeout - if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > timeout) { + if (startTime.elapsed().toMillis() > timeout) { log.debug("Giving up on canceling session {} {} and timing out.", location, usid); throw new TimedOutException(Set.of(location)); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 0bf4af19c7d..39cf1ccf3f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Map.Entry; @@ -31,6 +32,7 @@ import java.util.concurrent.locks.Lock; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.time.NanoTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -179,15 +181,14 @@ public boolean tryLock() { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - long now = System.currentTimeMillis(); - long returnTime = now + MILLISECONDS.convert(time, unit); - while (returnTime > now) { + Duration returnTime = Duration.of(time, unit.toChronoUnit()); + NanoTime start = NanoTime.now(); + while (start.elapsed().compareTo(returnTime) < 0) { if (tryLock()) { return true; } // TODO: do something better than poll - ACCUMULO-1310 UtilWaitThread.sleep(100); - now = System.currentTimeMillis(); } return false; } diff --git a/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java b/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java index 6653c20d317..28d0514ba02 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java +++ b/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java @@ -20,8 +20,11 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; +import java.time.Duration; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.util.time.NanoTime; + /** * Provides a stop watch for timing a single type of event. This code is based on the * org.apache.hadoop.util.StopWatch available in hadoop 2.7.0 @@ -29,8 +32,8 @@ public class OpTimer { private boolean isStarted; - private long startNanos; - private long currentElapsedNanos; + private NanoTime startNanos; + private Duration currentElapsedNanos = Duration.ZERO; /** * Returns timer running state @@ -52,7 +55,7 @@ public OpTimer start() throws IllegalStateException { throw new IllegalStateException("OpTimer is already running"); } isStarted = true; - startNanos = System.nanoTime(); + startNanos = NanoTime.now(); return this; } @@ -66,9 +69,8 @@ public OpTimer stop() throws IllegalStateException { if (!isStarted) { throw new IllegalStateException("OpTimer is already stopped"); } - long now = System.nanoTime(); isStarted = false; - currentElapsedNanos += now - startNanos; + currentElapsedNanos = currentElapsedNanos.plus(startNanos.elapsed()); return this; } @@ -78,7 +80,7 @@ public OpTimer stop() throws IllegalStateException { * @return this instance for fluent chaining */ public OpTimer reset() { - currentElapsedNanos = 0; + currentElapsedNanos = Duration.ZERO; isStarted = false; return this; } @@ -115,7 +117,11 @@ public double scale(TimeUnit timeUnit) { * @return elapsed time in nanoseconds. */ public long now() { - return isStarted ? System.nanoTime() - startNanos + currentElapsedNanos : currentElapsedNanos; + if (isStarted) { + return startNanos.elapsed().plus(currentElapsedNanos).toNanos(); + } else { + return currentElapsedNanos.toNanos(); + } } /** diff --git a/core/src/main/java/org/apache/accumulo/core/util/Retry.java b/core/src/main/java/org/apache/accumulo/core/util/Retry.java index 28659b376f1..07e86f91a5f 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Retry.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Retry.java @@ -22,6 +22,7 @@ import java.time.Duration; +import org.apache.accumulo.core.util.time.NanoTime; import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; @@ -43,7 +44,7 @@ public class Retry { private boolean hasNeverLogged; private boolean hasLoggedWarn = false; - private long lastRetryLog; + private NanoTime lastRetryLog; private double currentBackOffFactor; private boolean doTimeJitter = true; @@ -64,7 +65,7 @@ private Retry(long maxRetries, Duration startWait, Duration waitIncrement, Durat this.initialWait = startWait; this.logInterval = logInterval; this.hasNeverLogged = true; - this.lastRetryLog = -1; + this.lastRetryLog = null; this.backOffFactor = backOffFactor; this.currentBackOffFactor = this.backOffFactor; @@ -201,14 +202,14 @@ protected void sleep(Duration wait) throws InterruptedException { public void logRetry(Logger log, String message, Throwable t) { // log the first time as debug, and then after every logInterval as a warning - long now = System.nanoTime(); + NanoTime now = NanoTime.now(); if (hasNeverLogged) { if (log.isDebugEnabled()) { log.debug(getMessage(message, t)); } hasNeverLogged = false; lastRetryLog = now; - } else if ((now - lastRetryLog) > logInterval.toNanos()) { + } else if (now.subtract(lastRetryLog).compareTo(logInterval) > 0) { log.warn(getMessage(message), t); lastRetryLog = now; hasLoggedWarn = true; @@ -221,14 +222,14 @@ public void logRetry(Logger log, String message, Throwable t) { public void logRetry(Logger log, String message) { // log the first time as debug, and then after every logInterval as a warning - long now = System.nanoTime(); + NanoTime now = NanoTime.now(); if (hasNeverLogged) { if (log.isDebugEnabled()) { log.debug(getMessage(message)); } hasNeverLogged = false; lastRetryLog = now; - } else if ((now - lastRetryLog) > logInterval.toNanos()) { + } else if (now.subtract(lastRetryLog).compareTo(logInterval) > 0) { log.warn(getMessage(message)); lastRetryLog = now; hasLoggedWarn = true; diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java index fb18c7c682b..043d7d963c5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java +++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java @@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory; import java.util.HashMap; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; @@ -30,6 +29,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +51,7 @@ public enum DetectionScope { private long lastMemorySize = 0; private int lowMemCount = 0; - private long lastMemoryCheckTime = 0; + private NanoTime lastMemoryCheckTime = null; private final Lock memCheckTimeLock = new ReentrantLock(); private volatile boolean runningLowOnMemory = false; @@ -104,7 +104,7 @@ public void logGCInfo(AccumuloConfiguration conf) { memCheckTimeLock.lock(); try { - final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + final NanoTime now = NanoTime.now(); List gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); @@ -174,8 +174,8 @@ public void logGCInfo(AccumuloConfiguration conf) { } final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) { - final long diff = now - lastMemoryCheckTime; + if (lastMemoryCheckTime != null && lastMemoryCheckTime.compareTo(now) < 0) { + final long diff = now.subtract(lastMemoryCheckTime).toMillis(); if (diff > keepAliveTimeout + 1000) { LOG.warn(String.format( "GC pause checker not called in a timely" diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java index 165479a71f0..530e52f80f0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java @@ -18,9 +18,8 @@ */ package org.apache.accumulo.server.rpc; -import static java.util.concurrent.TimeUnit.NANOSECONDS; - import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.server.metrics.ThriftMetrics; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -33,25 +32,25 @@ public class TimedProcessor implements TProcessor { private final TProcessor other; private final ThriftMetrics thriftMetrics; - private long idleStart; + private NanoTime idleStart; public TimedProcessor(TProcessor next) { this.other = next; thriftMetrics = new ThriftMetrics(); MetricsUtil.initializeProducers(thriftMetrics); - idleStart = System.nanoTime(); + idleStart = NanoTime.now(); } @Override public void process(TProtocol in, TProtocol out) throws TException { - long processStart = System.nanoTime(); - thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart)); + NanoTime processStart = NanoTime.now(); + thriftMetrics.addIdle(processStart.subtract(idleStart).toMillis()); try { other.process(in, out); } finally { // set idle to now, calc time in process - idleStart = System.nanoTime(); - thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart)); + idleStart = NanoTime.now(); + thriftMetrics.addExecute(idleStart.subtract(processStart).toMillis()); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 90af6d43268..2a5c2cbf220 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -27,6 +27,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.lang.ref.SoftReference; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -90,6 +91,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionStats; @@ -1726,13 +1728,13 @@ public void importDataFiles(long tid, Map fil // Clients timeout and will think that this operation failed. // Don't do it if we spent too long waiting for the lock - long now = System.nanoTime(); + NanoTime now = NanoTime.now(); synchronized (this) { if (isClosed()) { throw new IOException("tablet " + extent + " is closed"); } - long rpcTimeoutNanos = TimeUnit.MILLISECONDS.toNanos( + Duration rpcTimeoutNanos = Duration.ofNanos( (long) (getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT) * 1.1)); @@ -1745,9 +1747,9 @@ public void importDataFiles(long tid, Map fil throw new IllegalStateException(e); } - long lockWait = System.nanoTime() - now; - if (lockWait > rpcTimeoutNanos) { - throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait) + Duration lockWait = now.elapsed(); + if (lockWait.compareTo(rpcTimeoutNanos) > 0) { + throw new IOException("Timeout waiting " + lockWait.toSeconds() + " seconds to get tablet lock for " + extent + " " + tid); } } @@ -1757,9 +1759,9 @@ public void importDataFiles(long tid, Map fil throw new IOException("tablet " + extent + " is closed"); } - long lockWait = System.nanoTime() - now; - if (lockWait > rpcTimeoutNanos) { - throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait) + Duration lockWait = now.elapsed(); + if (lockWait.compareTo(rpcTimeoutNanos) > 0) { + throw new IOException("Timeout waiting " + lockWait.toSeconds() + " seconds to get tablet lock for " + extent + " " + tid); } From 865e11b81921cf13253f3205b2d704a071ee30ff Mon Sep 17 00:00:00 2001 From: Dom G Date: Fri, 15 Mar 2024 11:24:22 -0400 Subject: [PATCH 2/6] Rename vars to remove timeunit --- .../org/apache/accumulo/core/util/OpTimer.java | 14 +++++++------- .../org/apache/accumulo/tserver/tablet/Tablet.java | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java b/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java index 28d0514ba02..179585c72ef 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java +++ b/core/src/main/java/org/apache/accumulo/core/util/OpTimer.java @@ -32,8 +32,8 @@ public class OpTimer { private boolean isStarted; - private NanoTime startNanos; - private Duration currentElapsedNanos = Duration.ZERO; + private NanoTime start; + private Duration currentElapsed = Duration.ZERO; /** * Returns timer running state @@ -55,7 +55,7 @@ public OpTimer start() throws IllegalStateException { throw new IllegalStateException("OpTimer is already running"); } isStarted = true; - startNanos = NanoTime.now(); + start = NanoTime.now(); return this; } @@ -70,7 +70,7 @@ public OpTimer stop() throws IllegalStateException { throw new IllegalStateException("OpTimer is already stopped"); } isStarted = false; - currentElapsedNanos = currentElapsedNanos.plus(startNanos.elapsed()); + currentElapsed = currentElapsed.plus(start.elapsed()); return this; } @@ -80,7 +80,7 @@ public OpTimer stop() throws IllegalStateException { * @return this instance for fluent chaining */ public OpTimer reset() { - currentElapsedNanos = Duration.ZERO; + currentElapsed = Duration.ZERO; isStarted = false; return this; } @@ -118,9 +118,9 @@ public double scale(TimeUnit timeUnit) { */ public long now() { if (isStarted) { - return startNanos.elapsed().plus(currentElapsedNanos).toNanos(); + return start.elapsed().plus(currentElapsed).toNanos(); } else { - return currentElapsedNanos.toNanos(); + return currentElapsed.toNanos(); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 2a5c2cbf220..7799a181d01 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1734,7 +1734,7 @@ public void importDataFiles(long tid, Map fil throw new IOException("tablet " + extent + " is closed"); } - Duration rpcTimeoutNanos = Duration.ofNanos( + Duration rpcTimeout = Duration.ofNanos( (long) (getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT) * 1.1)); @@ -1748,7 +1748,7 @@ public void importDataFiles(long tid, Map fil } Duration lockWait = now.elapsed(); - if (lockWait.compareTo(rpcTimeoutNanos) > 0) { + if (lockWait.compareTo(rpcTimeout) > 0) { throw new IOException("Timeout waiting " + lockWait.toSeconds() + " seconds to get tablet lock for " + extent + " " + tid); } @@ -1760,7 +1760,7 @@ public void importDataFiles(long tid, Map fil } Duration lockWait = now.elapsed(); - if (lockWait.compareTo(rpcTimeoutNanos) > 0) { + if (lockWait.compareTo(rpcTimeout) > 0) { throw new IOException("Timeout waiting " + lockWait.toSeconds() + " seconds to get tablet lock for " + extent + " " + tid); } From 6a9a2e7272492cff54fd16ac7b9e0ee7df1e1d6c Mon Sep 17 00:00:00 2001 From: Dom G Date: Mon, 8 Apr 2024 15:45:09 -0400 Subject: [PATCH 3/6] Update server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java Co-authored-by: Keith Turner --- .../main/java/org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 7799a181d01..6eb1b9ae2ed 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1734,7 +1734,7 @@ public void importDataFiles(long tid, Map fil throw new IOException("tablet " + extent + " is closed"); } - Duration rpcTimeout = Duration.ofNanos( + Duration rpcTimeout = Duration.ofMillis( (long) (getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT) * 1.1)); From a42fec122c3636b39a431021bab5bbd6603dc606 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Thu, 16 Jan 2025 13:59:35 -0500 Subject: [PATCH 4/6] Replace old NanoTime object with Timer --- .../clientImpl/TabletServerBatchWriter.java | 6 +-- .../zookeeper/DistributedReadWriteLock.java | 8 ++-- .../org/apache/accumulo/core/util/Retry.java | 44 ++++++++----------- .../server/mem/LowMemoryDetector.java | 24 +++++----- .../accumulo/server/rpc/TimedProcessor.java | 18 ++++---- .../accumulo/tserver/tablet/Tablet.java | 22 +++++----- 6 files changed, 59 insertions(+), 63 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index a85560e3067..dacc3862899 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -77,9 +77,9 @@ import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.core.util.time.NanoTime; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; @@ -1096,7 +1096,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio final HostAndPort parsedServer = HostAndPort.fromString(location); - NanoTime startTime = NanoTime.now(); + Timer timer = Timer.startNew(); boolean useCloseUpdate = false; @@ -1170,7 +1170,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio } // if a timeout is set on the batch writer, then do not retry longer than the timeout - if (startTime.elapsed().toMillis() > timeout) { + if (timer.hasElapsed(timeout, MILLISECONDS)) { log.debug("Giving up on canceling session {} {} and timing out.", location, usid); throw new TimedOutException(Set.of(location)); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 66c60f3236c..c87361f7849 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -22,7 +22,6 @@ import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Map.Entry; @@ -31,8 +30,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.core.util.time.NanoTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -182,9 +181,8 @@ public boolean tryLock() { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - Duration returnTime = Duration.of(time, unit.toChronoUnit()); - NanoTime start = NanoTime.now(); - while (start.elapsed().compareTo(returnTime) < 0) { + Timer timer = Timer.startNew(); + while (timer.hasElapsed(time, unit)) { if (tryLock()) { return true; } diff --git a/core/src/main/java/org/apache/accumulo/core/util/Retry.java b/core/src/main/java/org/apache/accumulo/core/util/Retry.java index 73616fe55ae..0513b111d72 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Retry.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Retry.java @@ -22,7 +22,6 @@ import java.time.Duration; -import org.apache.accumulo.core.util.time.NanoTime; import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; @@ -42,9 +41,8 @@ public class Retry { private Duration currentWait; private Duration initialWait; - private boolean hasNeverLogged; private boolean hasLoggedWarn = false; - private NanoTime lastRetryLog; + private Timer lastRetryLogTimer; private double currentBackOffFactor; private boolean doTimeJitter = true; @@ -64,8 +62,7 @@ private Retry(long maxRetries, Duration startWait, Duration waitIncrement, Durat this.currentWait = startWait; this.initialWait = startWait; this.logInterval = logInterval; - this.hasNeverLogged = true; - this.lastRetryLog = null; + this.lastRetryLogTimer = null; this.backOffFactor = backOffFactor; this.currentBackOffFactor = this.backOffFactor; @@ -202,16 +199,14 @@ protected void sleep(Duration wait) throws InterruptedException { public void logRetry(Logger log, String message, Throwable t) { // log the first time as debug, and then after every logInterval as a warning - NanoTime now = NanoTime.now(); - if (hasNeverLogged) { + if (lastRetryLogTimer == null) { if (log.isDebugEnabled()) { log.debug(getMessage(message, t)); } - hasNeverLogged = false; - lastRetryLog = now; - } else if (now.subtract(lastRetryLog).compareTo(logInterval) > 0) { + lastRetryLogTimer = Timer.startNew(); + } else if (lastRetryLogTimer.hasElapsed(logInterval)) { log.warn(getMessage(message), t); - lastRetryLog = now; + lastRetryLogTimer.restart(); hasLoggedWarn = true; } else { if (log.isTraceEnabled()) { @@ -222,16 +217,14 @@ public void logRetry(Logger log, String message, Throwable t) { public void logRetry(Logger log, String message) { // log the first time as debug, and then after every logInterval as a warning - NanoTime now = NanoTime.now(); - if (hasNeverLogged) { + if (lastRetryLogTimer == null) { if (log.isDebugEnabled()) { log.debug(getMessage(message)); } - hasNeverLogged = false; - lastRetryLog = now; - } else if (now.subtract(lastRetryLog).compareTo(logInterval) > 0) { + lastRetryLogTimer = Timer.startNew(); + } else if (lastRetryLogTimer.hasElapsed(logInterval)) { log.warn(getMessage(message)); - lastRetryLog = now; + lastRetryLogTimer.restart(); hasLoggedWarn = true; } else { if (log.isTraceEnabled()) { @@ -251,14 +244,15 @@ private String getMessage(String message, Throwable t) { } public void logCompletion(Logger log, String operationDescription) { - if (!hasNeverLogged) { - var message = operationDescription + " completed after " + (retriesDone + 1) - + " retries and is no longer retrying."; - if (hasLoggedWarn) { - log.info(message); - } else { - log.debug(message); - } + if (lastRetryLogTimer == null) { // have never logged a retry + return; + } + var message = operationDescription + " completed after " + (retriesDone + 1) + + " retries and is no longer retrying."; + if (hasLoggedWarn) { + log.info(message); + } else { + log.debug(message); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java index 043d7d963c5..4a96d4c1cde 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java +++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java @@ -18,8 +18,11 @@ */ package org.apache.accumulo.server.mem; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.concurrent.locks.Lock; @@ -29,7 +32,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.util.time.NanoTime; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +54,7 @@ public enum DetectionScope { private long lastMemorySize = 0; private int lowMemCount = 0; - private NanoTime lastMemoryCheckTime = null; + private Timer lastMemoryCheckTime = null; private final Lock memCheckTimeLock = new ReentrantLock(); private volatile boolean runningLowOnMemory = false; @@ -104,7 +107,7 @@ public void logGCInfo(AccumuloConfiguration conf) { memCheckTimeLock.lock(); try { - final NanoTime now = NanoTime.now(); + final Timer currentTimer = Timer.startNew(); List gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); @@ -173,25 +176,24 @@ public void logGCInfo(AccumuloConfiguration conf) { LOG.debug(sb.toString()); } - final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - if (lastMemoryCheckTime != null && lastMemoryCheckTime.compareTo(now) < 0) { - final long diff = now.subtract(lastMemoryCheckTime).toMillis(); - if (diff > keepAliveTimeout + 1000) { + final Duration keepAliveTimeout = conf.getDuration(Property.INSTANCE_ZK_TIMEOUT); + if (lastMemoryCheckTime != null && lastMemoryCheckTime.hasElapsed(keepAliveTimeout)) { + if (currentTimer.hasElapsed(keepAliveTimeout.plus(Duration.ofSeconds(1)))) { LOG.warn(String.format( "GC pause checker not called in a timely" + " fashion. Expected every %.1f seconds but was %.1f seconds since last check", - keepAliveTimeout / 1000., diff / 1000.)); + keepAliveTimeout.toMillis() / 1000., currentTimer.elapsed(MILLISECONDS) / 1000.)); } - lastMemoryCheckTime = now; + lastMemoryCheckTime = currentTimer; return; } - if (maxIncreaseInCollectionTime > keepAliveTimeout) { + if (maxIncreaseInCollectionTime > keepAliveTimeout.toMillis()) { Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); } lastMemorySize = freeMemory; - lastMemoryCheckTime = now; + lastMemoryCheckTime = currentTimer; } finally { memCheckTimeLock.unlock(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java index bb54f125197..7810d137c3f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java @@ -18,8 +18,10 @@ */ package org.apache.accumulo.server.rpc; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + import org.apache.accumulo.core.metrics.MetricsInfo; -import org.apache.accumulo.core.util.time.NanoTime; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.metrics.ThriftMetrics; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -32,25 +34,25 @@ public class TimedProcessor implements TProcessor { private final TProcessor other; private final ThriftMetrics thriftMetrics; - private NanoTime idleStart; + private final Timer idleTimer; public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { this.other = next; thriftMetrics = new ThriftMetrics(); metricsInfo.addMetricsProducers(thriftMetrics); - idleStart = NanoTime.now(); + idleTimer = Timer.startNew(); } @Override public void process(TProtocol in, TProtocol out) throws TException { - NanoTime processStart = NanoTime.now(); - thriftMetrics.addIdle(processStart.subtract(idleStart).toMillis()); + thriftMetrics.addIdle(idleTimer.elapsed(MILLISECONDS)); + Timer processTimer = Timer.startNew(); try { other.process(in, out); } finally { - // set idle to now, calc time in process - idleStart = NanoTime.now(); - thriftMetrics.addExecute(idleStart.subtract(processStart).toMillis()); + // calc time in process, restart idle timer + thriftMetrics.addExecute(processTimer.elapsed(MILLISECONDS)); + idleTimer.restart(); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 6b758da4608..253c06ed53f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -20,6 +20,8 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import java.io.ByteArrayInputStream; @@ -92,7 +94,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.time.NanoTime; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionStats; @@ -1032,7 +1034,7 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws return currentlyUnreserved; }); - long lastLogTime = System.nanoTime(); + Timer lastLogTimer = Timer.startNew(); // wait for reads and writes to complete while (writesInProgress > 0 || !runningScans.isEmpty()) { @@ -1044,12 +1046,12 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws return currentlyUnreserved; }); - if (log.isDebugEnabled() && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) { + if (log.isDebugEnabled() && lastLogTimer.hasElapsed(1, MINUTES)) { for (ScanDataSource activeScan : runningScans) { log.debug("Waiting on scan in completeClose {} {}", extent, activeScan); } - lastLogTime = System.nanoTime(); + lastLogTimer.restart(); } try { @@ -1770,7 +1772,7 @@ public void importDataFiles(long tid, Map fil // Clients timeout and will think that this operation failed. // Don't do it if we spent too long waiting for the lock - NanoTime now = NanoTime.now(); + Timer lockWaitTimer = Timer.startNew(); synchronized (this) { if (isClosed()) { throw new IOException("tablet " + extent + " is closed"); @@ -1789,9 +1791,8 @@ public void importDataFiles(long tid, Map fil throw new IllegalStateException(e); } - Duration lockWait = now.elapsed(); - if (lockWait.compareTo(rpcTimeout) > 0) { - throw new IOException("Timeout waiting " + lockWait.toSeconds() + if (lockWaitTimer.hasElapsed(rpcTimeout)) { + throw new IOException("Timeout waiting " + lockWaitTimer.elapsed(SECONDS) + " seconds to get tablet lock for " + extent + " " + tid); } } @@ -1801,9 +1802,8 @@ public void importDataFiles(long tid, Map fil throw new IOException("tablet " + extent + " is closed"); } - Duration lockWait = now.elapsed(); - if (lockWait.compareTo(rpcTimeout) > 0) { - throw new IOException("Timeout waiting " + lockWait.toSeconds() + if (lockWaitTimer.hasElapsed(rpcTimeout)) { + throw new IOException("Timeout waiting " + lockWaitTimer.elapsed(SECONDS) + " seconds to get tablet lock for " + extent + " " + tid); } From 18233c13645ea36a290e40c4bce218c3b52c2d03 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Tue, 28 Jan 2025 16:35:41 -0500 Subject: [PATCH 5/6] Fix logic in tryLock() --- .../accumulo/core/fate/zookeeper/DistributedReadWriteLock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index c87361f7849..7cd5026029d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -182,7 +182,7 @@ public boolean tryLock() { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { Timer timer = Timer.startNew(); - while (timer.hasElapsed(time, unit)) { + while (!timer.hasElapsed(time, unit)) { if (tryLock()) { return true; } From 337ff213141a1ba8b3ce5da11e782105aceb528a Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Fri, 26 Dec 2025 12:22:05 -0500 Subject: [PATCH 6/6] add back code lost in merge --- .../accumulo/server/mem/LowMemoryDetector.java | 15 ++++++++------- .../apache/accumulo/tserver/tablet/Tablet.java | 3 ++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java index 6f20ec6b30b..514ed22081d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java +++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class LowMemoryDetector { private static class LowMemDetectorState { private long lastMemorySize = 0; private int lowMemCount = 0; - private long lastMemoryCheckTime = 0; + private Timer lastMemoryCheckTimer = null; private boolean runningLowOnMemory = false; } @@ -101,7 +102,6 @@ public synchronized void logGCInfo(AccumuloConfiguration conf) { memCheckTimeLock.lock(); try { LowMemDetectorState localState = state.get(); - final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); List gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); @@ -171,15 +171,16 @@ public synchronized void logGCInfo(AccumuloConfiguration conf) { } final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - if (localState.lastMemoryCheckTime > 0 && localState.lastMemoryCheckTime < now) { - final long diff = now - localState.lastMemoryCheckTime; - if (diff > keepAliveTimeout + 1000) { + if (localState.lastMemoryCheckTimer != null) { + if (localState.lastMemoryCheckTimer.hasElapsed(keepAliveTimeout + 1000, + TimeUnit.MILLISECONDS)) { + final long diff = localState.lastMemoryCheckTimer.elapsed(TimeUnit.MILLISECONDS); LOG.warn(String.format( "GC pause checker not called in a timely" + " fashion. Expected every %.1f seconds but was %.1f seconds since last check", keepAliveTimeout / 1000., diff / 1000.)); } - localState.lastMemoryCheckTime = now; + localState.lastMemoryCheckTimer.restart(); return; } @@ -188,7 +189,7 @@ public synchronized void logGCInfo(AccumuloConfiguration conf) { } localState.lastMemorySize = freeMemory; - localState.lastMemoryCheckTime = now; + localState.lastMemoryCheckTimer = Timer.startNew(); } finally { memCheckTimeLock.unlock(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 145808269c0..e6d61f44e0c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -20,6 +20,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.toList; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; @@ -1587,7 +1588,7 @@ void bringMinorCompactionOnline(ReferencedTabletFile tmpDatafile, } catch (IOException ioe) { log.warn("Tablet {} failed to rename {} after MinC, will retry in 60 secs...", getExtent(), newDatafile, ioe); - sleepUninterruptibly(1, TimeUnit.MINUTES); + sleepUninterruptibly(1, MINUTES); } } while (true);