Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -78,6 +77,7 @@
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.thrift.TApplicationException;
Expand Down Expand Up @@ -1096,7 +1096,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio

final HostAndPort parsedServer = HostAndPort.fromString(location);

long startTime = System.nanoTime();
Timer timer = Timer.startNew();

boolean useCloseUpdate = false;

Expand Down Expand Up @@ -1170,7 +1170,7 @@ private void cancelSession() throws InterruptedException, ThriftSecurityExceptio
}

// if a timeout is set on the batch writer, then do not retry longer than the timeout
if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > timeout) {
if (timer.hasElapsed(timeout, MILLISECONDS)) {
log.debug("Giving up on canceling session {} {} and timing out.", location, usid);
throw new TimedOutException(Set.of(location));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -180,15 +181,13 @@ public boolean tryLock() {

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
long returnTime = now + MILLISECONDS.convert(time, unit);
while (returnTime > now) {
Timer timer = Timer.startNew();
while (!timer.hasElapsed(time, unit)) {
if (tryLock()) {
return true;
}
// TODO: do something better than poll - ACCUMULO-1310
UtilWaitThread.sleep(100);
now = System.currentTimeMillis();
}
return false;
}
Expand Down
43 changes: 19 additions & 24 deletions core/src/main/java/org/apache/accumulo/core/util/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public class Retry {
private Duration currentWait;
private Duration initialWait;

private boolean hasNeverLogged;
private boolean hasLoggedWarn = false;
private long lastRetryLog;
private Timer lastRetryLogTimer;
private double currentBackOffFactor;
private boolean doTimeJitter = true;

Expand All @@ -63,8 +62,7 @@ private Retry(long maxRetries, Duration startWait, Duration waitIncrement, Durat
this.currentWait = startWait;
this.initialWait = startWait;
this.logInterval = logInterval;
this.hasNeverLogged = true;
this.lastRetryLog = -1;
this.lastRetryLogTimer = null;
this.backOffFactor = backOffFactor;
this.currentBackOffFactor = this.backOffFactor;

Expand Down Expand Up @@ -201,16 +199,14 @@ protected void sleep(Duration wait) throws InterruptedException {

public void logRetry(Logger log, String message, Throwable t) {
// log the first time as debug, and then after every logInterval as a warning
long now = System.nanoTime();
if (hasNeverLogged) {
if (lastRetryLogTimer == null) {
if (log.isDebugEnabled()) {
log.debug(getMessage(message, t));
}
hasNeverLogged = false;
lastRetryLog = now;
} else if ((now - lastRetryLog) > logInterval.toNanos()) {
lastRetryLogTimer = Timer.startNew();
} else if (lastRetryLogTimer.hasElapsed(logInterval)) {
log.warn(getMessage(message), t);
lastRetryLog = now;
lastRetryLogTimer.restart();
hasLoggedWarn = true;
} else {
if (log.isTraceEnabled()) {
Expand All @@ -221,16 +217,14 @@ public void logRetry(Logger log, String message, Throwable t) {

public void logRetry(Logger log, String message) {
// log the first time as debug, and then after every logInterval as a warning
long now = System.nanoTime();
if (hasNeverLogged) {
if (lastRetryLogTimer == null) {
if (log.isDebugEnabled()) {
log.debug(getMessage(message));
}
hasNeverLogged = false;
lastRetryLog = now;
} else if ((now - lastRetryLog) > logInterval.toNanos()) {
lastRetryLogTimer = Timer.startNew();
} else if (lastRetryLogTimer.hasElapsed(logInterval)) {
log.warn(getMessage(message));
lastRetryLog = now;
lastRetryLogTimer.restart();
hasLoggedWarn = true;
} else {
if (log.isTraceEnabled()) {
Expand All @@ -250,14 +244,15 @@ private String getMessage(String message, Throwable t) {
}

public void logCompletion(Logger log, String operationDescription) {
if (!hasNeverLogged) {
var message = operationDescription + " completed after " + (retriesDone + 1)
+ " retries and is no longer retrying.";
if (hasLoggedWarn) {
log.info(message);
} else {
log.debug(message);
}
if (lastRetryLogTimer == null) { // have never logged a retry
return;
}
var message = operationDescription + " completed after " + (retriesDone + 1)
+ " retries and is no longer retrying.";
if (hasLoggedWarn) {
log.info(message);
} else {
log.debug(message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
*/
package org.apache.accumulo.server.mem;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,7 +54,7 @@ public enum DetectionScope {

private long lastMemorySize = 0;
private int lowMemCount = 0;
private long lastMemoryCheckTime = 0;
private Timer lastMemoryCheckTime = null;
private final Lock memCheckTimeLock = new ReentrantLock();
private volatile boolean runningLowOnMemory = false;

Expand Down Expand Up @@ -104,7 +107,7 @@ public void logGCInfo(AccumuloConfiguration conf) {

memCheckTimeLock.lock();
try {
final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
final Timer currentTimer = Timer.startNew();

List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();

Expand Down Expand Up @@ -173,25 +176,24 @@ public void logGCInfo(AccumuloConfiguration conf) {
LOG.debug(sb.toString());
}

final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
final long diff = now - lastMemoryCheckTime;
if (diff > keepAliveTimeout + 1000) {
final Duration keepAliveTimeout = conf.getDuration(Property.INSTANCE_ZK_TIMEOUT);
if (lastMemoryCheckTime != null && lastMemoryCheckTime.hasElapsed(keepAliveTimeout)) {
if (currentTimer.hasElapsed(keepAliveTimeout.plus(Duration.ofSeconds(1)))) {
LOG.warn(String.format(
"GC pause checker not called in a timely"
+ " fashion. Expected every %.1f seconds but was %.1f seconds since last check",
keepAliveTimeout / 1000., diff / 1000.));
keepAliveTimeout.toMillis() / 1000., currentTimer.elapsed(MILLISECONDS) / 1000.));
}
lastMemoryCheckTime = now;
lastMemoryCheckTime = currentTimer;
return;
}

if (maxIncreaseInCollectionTime > keepAliveTimeout) {
if (maxIncreaseInCollectionTime > keepAliveTimeout.toMillis()) {
Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1);
}

lastMemorySize = freeMemory;
lastMemoryCheckTime = now;
lastMemoryCheckTime = currentTimer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know if there's a possibility of using reset for this, instead of replacing the timer? It seems odd to replace the timer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timer lastMemoryCheckTime is initialized as null and then the code checks if its null in places to see if the check has happened yet. We could probably achieve the same with one Timer and a boolean flag that indicates if the check has happened yet but this seemed like this best way to do things from what I can tell.

} finally {
memCheckTimeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
*/
package org.apache.accumulo.server.rpc;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.server.metrics.ThriftMetrics;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
Expand All @@ -33,25 +34,25 @@ public class TimedProcessor implements TProcessor {

private final TProcessor other;
private final ThriftMetrics thriftMetrics;
private long idleStart;
private final Timer idleTimer;

public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) {
this.other = next;
thriftMetrics = new ThriftMetrics();
metricsInfo.addMetricsProducers(thriftMetrics);
idleStart = System.nanoTime();
idleTimer = Timer.startNew();
}

@Override
public void process(TProtocol in, TProtocol out) throws TException {
long processStart = System.nanoTime();
thriftMetrics.addIdle(NANOSECONDS.toMillis(processStart - idleStart));
thriftMetrics.addIdle(idleTimer.elapsed(MILLISECONDS));
Timer processTimer = Timer.startNew();
try {
other.process(in, out);
} finally {
// set idle to now, calc time in process
idleStart = System.nanoTime();
thriftMetrics.addExecute(NANOSECONDS.toMillis(idleStart - processStart));
// calc time in process, restart idle timer
thriftMetrics.addExecute(processTimer.elapsed(MILLISECONDS));
idleTimer.restart();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionStats;
Expand Down Expand Up @@ -1031,7 +1034,7 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws
return currentlyUnreserved;
});

long lastLogTime = System.nanoTime();
Timer lastLogTimer = Timer.startNew();

// wait for reads and writes to complete
while (writesInProgress > 0 || !runningScans.isEmpty()) {
Expand All @@ -1043,12 +1046,12 @@ synchronized void completeClose(boolean saveState, boolean completeClose) throws
return currentlyUnreserved;
});

if (log.isDebugEnabled() && System.nanoTime() - lastLogTime > TimeUnit.SECONDS.toNanos(60)) {
if (log.isDebugEnabled() && lastLogTimer.hasElapsed(1, MINUTES)) {
for (ScanDataSource activeScan : runningScans) {
log.debug("Waiting on scan in completeClose {} {}", extent, activeScan);
}

lastLogTime = System.nanoTime();
lastLogTimer.restart();
}

try {
Expand Down Expand Up @@ -1769,13 +1772,13 @@ public void importDataFiles(long tid, Map<ReferencedTabletFile,DataFileInfo> fil

// Clients timeout and will think that this operation failed.
// Don't do it if we spent too long waiting for the lock
long now = System.nanoTime();
Timer lockWaitTimer = Timer.startNew();
synchronized (this) {
if (isClosed()) {
throw new IOException("tablet " + extent + " is closed");
}

long rpcTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(
Duration rpcTimeout = Duration.ofMillis(
(long) (getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)
* 1.1));

Expand All @@ -1788,9 +1791,8 @@ public void importDataFiles(long tid, Map<ReferencedTabletFile,DataFileInfo> fil
throw new IllegalStateException(e);
}

long lockWait = System.nanoTime() - now;
if (lockWait > rpcTimeoutNanos) {
throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait)
if (lockWaitTimer.hasElapsed(rpcTimeout)) {
throw new IOException("Timeout waiting " + lockWaitTimer.elapsed(SECONDS)
+ " seconds to get tablet lock for " + extent + " " + tid);
}
}
Expand All @@ -1800,9 +1802,8 @@ public void importDataFiles(long tid, Map<ReferencedTabletFile,DataFileInfo> fil
throw new IOException("tablet " + extent + " is closed");
}

long lockWait = System.nanoTime() - now;
if (lockWait > rpcTimeoutNanos) {
throw new IOException("Timeout waiting " + TimeUnit.NANOSECONDS.toSeconds(lockWait)
if (lockWaitTimer.hasElapsed(rpcTimeout)) {
throw new IOException("Timeout waiting " + lockWaitTimer.elapsed(SECONDS)
+ " seconds to get tablet lock for " + extent + " " + tid);
}

Expand Down