diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index ab3bb1a1a59..358d46a886a 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -45,7 +45,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -80,6 +79,7 @@ import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TApplicationException; @@ -1094,7 +1094,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio final HostAndPort parsedServer = HostAndPort.fromString(location); - long startTime = System.nanoTime(); + Timer timer = Timer.startNew(); // If somethingFailed is true then the batch writer will throw an exception on close or // flush, so no need to close this session. Only want to close the session for retryable @@ -1147,7 +1147,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio } // if a timeout is set on the batch writer, then do not retry longer than the timeout - if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > timeout) { + if (timer.hasElapsed(timeout, MILLISECONDS)) { log.debug("Giving up on canceling session {} {} and timing out.", location, usid); throw new TimedOutException(Set.of(location)); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 116c5850e0c..afa55e73b71 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,15 +148,13 @@ public boolean tryLock() { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { - long now = System.currentTimeMillis(); - long returnTime = now + MILLISECONDS.convert(time, unit); - while (returnTime > now) { + Timer timer = Timer.startNew(); + while (!timer.hasElapsed(time, unit)) { if (tryLock()) { return true; } // TODO: do something better than poll - ACCUMULO-1310 UtilWaitThread.sleep(100); - now = System.currentTimeMillis(); } return false; } diff --git a/core/src/main/java/org/apache/accumulo/core/util/Retry.java b/core/src/main/java/org/apache/accumulo/core/util/Retry.java index 96f44523820..8fc7c78a9f6 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/Retry.java +++ b/core/src/main/java/org/apache/accumulo/core/util/Retry.java @@ -41,9 +41,8 @@ public class Retry { private Duration currentWait; private Duration initialWait; - private boolean hasNeverLogged; private boolean hasLoggedWarn = false; - private long lastRetryLog; + private Timer lastRetryLogTimer; private double currentBackOffFactor; private boolean doTimeJitter = true; @@ -63,8 +62,7 @@ private Retry(long maxRetries, Duration startWait, Duration waitIncrement, Durat this.currentWait = startWait; this.initialWait = startWait; this.logInterval = logInterval; - this.hasNeverLogged = true; - this.lastRetryLog = -1; + this.lastRetryLogTimer = null; this.backOffFactor = backOffFactor; this.currentBackOffFactor = this.backOffFactor; @@ -201,16 +199,14 @@ protected void sleep(Duration wait) throws InterruptedException { public void logRetry(Logger log, String message, Throwable t) { // log the first time as debug, and then after every logInterval as a warning - long now = System.nanoTime(); - if (hasNeverLogged) { + if (lastRetryLogTimer == null) { if (log.isDebugEnabled()) { log.debug(getMessage(message, t)); } - hasNeverLogged = false; - lastRetryLog = now; - } else if ((now - lastRetryLog) > logInterval.toNanos()) { + lastRetryLogTimer = Timer.startNew(); + } else if (lastRetryLogTimer.hasElapsed(logInterval)) { log.warn(getMessage(message), t); - lastRetryLog = now; + lastRetryLogTimer.restart(); hasLoggedWarn = true; } else { if (log.isTraceEnabled()) { @@ -221,16 +217,14 @@ public void logRetry(Logger log, String message, Throwable t) { public void logRetry(Logger log, String message) { // log the first time as debug, and then after every logInterval as a warning - long now = System.nanoTime(); - if (hasNeverLogged) { + if (lastRetryLogTimer == null) { if (log.isDebugEnabled()) { log.debug(getMessage(message)); } - hasNeverLogged = false; - lastRetryLog = now; - } else if ((now - lastRetryLog) > logInterval.toNanos()) { + lastRetryLogTimer = Timer.startNew(); + } else if (lastRetryLogTimer.hasElapsed(logInterval)) { log.warn(getMessage(message)); - lastRetryLog = now; + lastRetryLogTimer.restart(); hasLoggedWarn = true; } else { if (log.isTraceEnabled()) { @@ -250,14 +244,15 @@ private String getMessage(String message, Throwable t) { } public void logCompletion(Logger log, String operationDescription) { - if (!hasNeverLogged) { - var message = operationDescription + " completed after " + (retriesDone + 1) - + " retries and is no longer retrying."; - if (hasLoggedWarn) { - log.info(message); - } else { - log.debug(message); - } + if (lastRetryLogTimer == null) { // have never logged a retry + return; + } + var message = operationDescription + " completed after " + (retriesDone + 1) + + " retries and is no longer retrying."; + if (hasLoggedWarn) { + log.info(message); + } else { + log.debug(message); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java index 6f20ec6b30b..514ed22081d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java +++ b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class LowMemoryDetector { private static class LowMemDetectorState { private long lastMemorySize = 0; private int lowMemCount = 0; - private long lastMemoryCheckTime = 0; + private Timer lastMemoryCheckTimer = null; private boolean runningLowOnMemory = false; } @@ -101,7 +102,6 @@ public synchronized void logGCInfo(AccumuloConfiguration conf) { memCheckTimeLock.lock(); try { LowMemDetectorState localState = state.get(); - final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); List gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); @@ -171,15 +171,16 @@ public synchronized void logGCInfo(AccumuloConfiguration conf) { } final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - if (localState.lastMemoryCheckTime > 0 && localState.lastMemoryCheckTime < now) { - final long diff = now - localState.lastMemoryCheckTime; - if (diff > keepAliveTimeout + 1000) { + if (localState.lastMemoryCheckTimer != null) { + if (localState.lastMemoryCheckTimer.hasElapsed(keepAliveTimeout + 1000, + TimeUnit.MILLISECONDS)) { + final long diff = localState.lastMemoryCheckTimer.elapsed(TimeUnit.MILLISECONDS); LOG.warn(String.format( "GC pause checker not called in a timely" + " fashion. Expected every %.1f seconds but was %.1f seconds since last check", keepAliveTimeout / 1000., diff / 1000.)); } - localState.lastMemoryCheckTime = now; + localState.lastMemoryCheckTimer.restart(); return; } @@ -188,7 +189,7 @@ public synchronized void logGCInfo(AccumuloConfiguration conf) { } localState.lastMemorySize = freeMemory; - localState.lastMemoryCheckTime = now; + localState.lastMemoryCheckTimer = Timer.startNew(); } finally { memCheckTimeLock.unlock(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java index 4148cfb0ed2..7810d137c3f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java @@ -18,9 +18,10 @@ */ package org.apache.accumulo.server.rpc; -import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import org.apache.accumulo.core.metrics.MetricsInfo; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.server.metrics.ThriftMetrics; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -33,25 +34,25 @@ public class TimedProcessor implements TProcessor { private final TProcessor other; private final ThriftMetrics thriftMetrics; - private long idleStart; + private final Timer idleTimer; public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { this.other = next; thriftMetrics = new ThriftMetrics(); metricsInfo.addMetricsProducers(thriftMetrics); - idleStart = System.nanoTime(); + idleTimer = Timer.startNew(); } @Override public void process(TProtocol in, TProtocol out) throws TException { - long processStart = System.nanoTime(); - thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart)); + thriftMetrics.addIdle(idleTimer.elapsed(MILLISECONDS)); + Timer processTimer = Timer.startNew(); try { other.process(in, out); } finally { - // set idle to now, calc time in process - idleStart = System.nanoTime(); - thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart)); + // calc time in process, restart idle timer + thriftMetrics.addExecute(processTimer.elapsed(MILLISECONDS)); + idleTimer.restart(); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 1db20cf550a..e6d61f44e0c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -20,6 +20,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.toList; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; @@ -86,6 +87,7 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.fs.VolumeManager; @@ -928,7 +930,7 @@ void completeClose(boolean saveState) throws IOException { return currentlyUnreserved; }); - long lastLogTime = System.nanoTime(); + Timer lastLogTimer = Timer.startNew(); // wait for reads and writes to complete while (writesInProgress > 0 || !runningScans.isEmpty()) { @@ -940,13 +942,12 @@ void completeClose(boolean saveState) throws IOException { return currentlyUnreserved; }); - if (log.isDebugEnabled() - && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) { + if (log.isDebugEnabled() && lastLogTimer.hasElapsed(1, MINUTES)) { for (ScanDataSource activeScan : runningScans) { log.debug("Waiting on scan in completeClose {} {}", extent, activeScan); } - lastLogTime = System.nanoTime(); + lastLogTimer.restart(); } try { @@ -1587,7 +1588,7 @@ void bringMinorCompactionOnline(ReferencedTabletFile tmpDatafile, } catch (IOException ioe) { log.warn("Tablet {} failed to rename {} after MinC, will retry in 60 secs...", getExtent(), newDatafile, ioe); - sleepUninterruptibly(1, TimeUnit.MINUTES); + sleepUninterruptibly(1, MINUTES); } } while (true);