Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
Expand All @@ -31,6 +30,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -39,13 +40,13 @@
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -71,7 +72,6 @@ public class RetryingMetaStoreClient implements InvocationHandler {
private long lastConnectionTime;
private boolean localMetaStore;


protected RetryingMetaStoreClient(Configuration conf, Class<?>[] constructorArgTypes,
Object[] constructorArgs, Map<String, Long> metaCallTimeMap,
Class<? extends IMetaStoreClient> msClientClass) throws MetaException {
Expand Down Expand Up @@ -174,11 +174,6 @@ private static IMetaStoreClient getProxy(Class<?>[] interfaces,

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object ret;
int retriesMade = 0;
TException caughtException;

boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class);
boolean allowRetry = true;
Annotation[] directives = method.getDeclaredAnnotations();
if(directives != null) {
Expand All @@ -188,73 +183,60 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}
}
}
boolean finalAllowRetry = allowRetry;
Predicate<Throwable> retryPolicy = ex -> finalAllowRetry && !base.isLocalMetaStore() &&
TTransportException.class.isAssignableFrom(ex.getClass());
AtomicInteger retriesMade = new AtomicInteger(0);
return new RetryingExecutor<>(retryLimit, () -> {
reconnect(method, retriesMade);
Object ret;
if (metaCallTimeMap == null) {
ret = method.invoke(base, args);
} else {
// need to capture the timing
long startTime = System.currentTimeMillis();
ret = method.invoke(base, args);
long timeTaken = System.currentTimeMillis() - startTime;
addMethodTime(method, timeTaken);
}
return ret;
}).sleepInterval(retryDelaySeconds * 1000)
.commandName(method.getName())
.onRetry(retryPolicy)
.run();
}

while (true) {
try {
SecurityUtils.reloginExpiringKeytabUser();

if (allowReconnect) {
if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
if (this.ugi != null) {
// Perform reconnect with the proper user context
try {
LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi);

this.ugi.doAs(
new PrivilegedExceptionAction<Object> () {
@Override
public Object run() throws MetaException {
base.reconnect();
return null;
}
});
} catch (UndeclaredThrowableException e) {
Throwable te = e.getCause();
if (te instanceof PrivilegedActionException) {
throw te.getCause();
} else {
throw te;
}
}
lastConnectionTime = System.currentTimeMillis();
private void reconnect(Method method, AtomicInteger retriesMade) throws Exception {
SecurityUtils.reloginExpiringKeytabUser();
boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class);
if (allowReconnect) {
if (retriesMade.getAndIncrement() > 0 || hasConnectionLifeTimeReached(method)) {
if (this.ugi != null) {
// Perform reconnect with the proper user context
try {
LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi);

this.ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
base.reconnect();
return null;
});
} catch (UndeclaredThrowableException e) {
Throwable te = e.getCause();
if (te instanceof PrivilegedActionException pe) {
throw pe;
} else if (te instanceof Exception ex){
throw ex;
} else {
LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information.");
throw new MetaException("UGI information unavailable. Will not attempt a reconnect.");
throw new RuntimeException(te);
}
}
}

if (metaCallTimeMap == null) {
ret = method.invoke(base, args);
lastConnectionTime = System.currentTimeMillis();
} else {
// need to capture the timing
long startTime = System.currentTimeMillis();
ret = method.invoke(base, args);
long timeTaken = System.currentTimeMillis() - startTime;
addMethodTime(method, timeTaken);
LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information.");
throw new MetaException("UGI information unavailable. Will not attempt a reconnect.");
}
break;
} catch (UndeclaredThrowableException e) {
throw e.getCause();
} catch (InvocationTargetException e) {
Throwable t = e.getCause();
// Metastore client needs retry for only TTransportException.
if (TTransportException.class.isAssignableFrom(t.getClass())) {
caughtException = (TTransportException) t;
} else {
throw t;
}
}

if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) {
throw caughtException;
}
retriesMade++;
LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + retriesMade + " of " +
retryLimit + ") after " + retryDelaySeconds + "s. " + method.getName(), caughtException);
Thread.sleep(retryDelaySeconds * 1000);
}
return ret;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -44,6 +45,7 @@
import org.apache.hadoop.hive.metastore.utils.FilterUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
Expand Down Expand Up @@ -92,6 +94,7 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.createThriftPartitionsReq;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
Expand Down Expand Up @@ -676,8 +679,6 @@ private TTransport createBinaryClient(URI store, boolean useSSL) throws TTranspo

private void open() throws MetaException {
isConnected = false;
TTransportException tte = null;
MetaException recentME = null;
boolean useSSL = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_SSL);
boolean useSasl = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_THRIFT_SASL);
String clientAuthMode = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_AUTH_MODE);
Expand All @@ -689,91 +690,63 @@ private void open() throws MetaException {
if (clientAuthMode != null) {
usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode);
}

for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
for (URI store : metastoreUris) {
LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store,
transportMode);
try {
try {
if (isHttpTransportMode) {
transport = createHttpClient(store, useSSL);
} else {
transport = createBinaryClient(store, useSSL);
}
} catch (TTransportException te) {
tte = te;
throw new MetaException(te.toString());
}

final TProtocol protocol;
if (useCompactProtocol) {
protocol = new TCompactProtocol(transport);
} else {
protocol = new TBinaryProtocol(transport);
}
client = new ThriftHiveMetastore.Client(protocol);
try {
if (!transport.isOpen()) {
transport.open();
final int newCount = connCount.incrementAndGet();
if (useSSL) {
LOG.info(
"Opened an SSL connection to metastore, current connections: {}",
newCount);
if (LOG.isTraceEnabled()) {
LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
System.identityHashCode(this), new Exception());
}
} else {
LOG.info("Opened a connection to metastore, URI ({}) "
+ "current connections: {}", store, newCount);
if (LOG.isTraceEnabled()) {
LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
System.identityHashCode(this), new Exception());
}
}
}
isConnected = true;
} catch (TTransportException e) {
tte = e;
String errMsg = String.format("Failed to connect to the MetaStore Server URI (%s) in %s "
+ "transport mode", store, transportMode);
LOG.warn(errMsg);
LOG.debug(errMsg, e);
AtomicReference<Throwable> recentEx = new AtomicReference<>();
Predicate<Throwable> retryPolicy = ex -> {
recentEx.set(ex);
return !isConnected && (ex instanceof MetaException || ex instanceof TTransportException);
};
AtomicInteger inx = new AtomicInteger(0);
isConnected = new RetryingExecutor<>(retries * metastoreUris.length, () -> {
URI store = metastoreUris[inx.getAndIncrement() % metastoreUris.length];
LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store, transportMode);
if (isHttpTransportMode) {
transport = createHttpClient(store, useSSL);
} else {
transport = createBinaryClient(store, useSSL);
}
final TProtocol protocol;
if (useCompactProtocol) {
protocol = new TCompactProtocol(transport);
} else {
protocol = new TBinaryProtocol(transport);
}
client = new ThriftHiveMetastore.Client(protocol);
if (!transport.isOpen()) {
transport.open();
final int newCount = connCount.incrementAndGet();
if (useSSL) {
LOG.info(
"Opened an SSL connection to metastore, current connections: {}",
newCount);
if (LOG.isTraceEnabled()) {
LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
System.identityHashCode(this), new Exception());
}

if (isConnected && !useSasl && !usePasswordAuth && !isHttpTransportMode &&
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI)) {
// Call set_ugi, only in unsecure mode.
try {
UserGroupInformation ugi = SecurityUtils.getUGI();
client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
} catch (IOException e) {
LOG.warn("Failed to find ugi of client set_ugi() is not successful, Continuing without it.", e);
} catch (TException e) {
LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
+ "Continuing without it.", e);
}
} else {
LOG.info("Opened a connection to metastore, URI ({}) "
+ "current connections: {}", store, newCount);
if (LOG.isTraceEnabled()) {
LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
System.identityHashCode(this), new Exception());
}
} catch (MetaException e) {
recentME = e;
String errMsg = "Failed to connect to metastore with URI (" + store
+ ") transport mode:" + transportMode + " in attempt " + attempt;
LOG.error(errMsg, e);
}
if (isConnected) {
// Set the beeline session modified metaConfVars for new HMS connection
overlaySessionModifiedMetaConf();
break;
}
}
// Wait before launching the next round of connection retries.
if (!isConnected && retryDelaySeconds > 0) {
try {
LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
Thread.sleep(retryDelaySeconds * 1000);
} catch (InterruptedException ignore) {}
return true;
}).sleepInterval(retryDelaySeconds * 1000)
.commandName("open")
.onRetry(retryPolicy).runWithMetaException();

if (!useSasl && !usePasswordAuth && !isHttpTransportMode &&
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI)) {
// Call set_ugi, only in unsecure mode.
try {
UserGroupInformation ugi = SecurityUtils.getUGI();
client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
} catch (IOException e) {
LOG.warn("Failed to find ugi of client set_ugi() is not successful, Continuing without it.", e);
} catch (TException e) {
LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
+ "Continuing without it.", e);
}
}

Expand All @@ -782,15 +755,14 @@ private void open() throws MetaException {
// be null. When MetaException wraps TTransportException, tte will be set so stringify that
// directly.
String exceptionString = "Unknown exception";
if (tte != null) {
exceptionString = StringUtils.stringifyException(tte);
} else if (recentME != null) {
exceptionString = StringUtils.stringifyException(recentME);
if (recentEx.get() != null) {
exceptionString = StringUtils.stringifyException(recentEx.get());
}
throw new MetaException("Could not connect to meta store using any of the URIs provided." +
" Most recent failure: " + exceptionString);
}

// Set the beeline session modified metaConfVars for new HMS connection
overlaySessionModifiedMetaConf();
snapshotActiveConf();
}

Expand Down
Loading