From 9a3fc68fab129b000fea2838977abb8b55897edc Mon Sep 17 00:00:00 2001 From: zdeng Date: Thu, 6 Nov 2025 12:27:12 +0800 Subject: [PATCH] HIVE-29385: Use RetryingExecutor elsewhere in HMS --- .../metastore/RetryingMetaStoreClient.java | 116 ++++++------- .../client/ThriftHiveMetaStoreClient.java | 148 +++++++---------- .../metastore/utils/RetryingExecutor.java | 19 ++- .../hadoop/hive/metastore/HiveMetaStore.java | 28 +--- .../hadoop/hive/metastore/ObjectStore.java | 8 +- .../hadoop/hive/metastore/RawStoreProxy.java | 4 +- .../hive/metastore/RetryingHMSHandler.java | 155 +++++------------- .../metastore/leader/LeaseLeaderElection.java | 59 +++---- 8 files changed, 196 insertions(+), 341 deletions(-) rename standalone-metastore/{metastore-server => metastore-common}/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java (91%) diff --git a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java index 0cf9901fd2ad..26b7687fadd1 100644 --- a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java +++ b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; @@ -31,6 +30,8 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.hadoop.classification.InterfaceAudience; @@ -39,13 +40,13 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.apache.hadoop.hive.metastore.utils.RetryingExecutor; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.metastore.annotation.NoReconnect; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import com.google.common.annotations.VisibleForTesting; @@ -71,7 +72,6 @@ public class RetryingMetaStoreClient implements InvocationHandler { private long lastConnectionTime; private boolean localMetaStore; - protected RetryingMetaStoreClient(Configuration conf, Class[] constructorArgTypes, Object[] constructorArgs, Map metaCallTimeMap, Class msClientClass) throws MetaException { @@ -174,11 +174,6 @@ private static IMetaStoreClient getProxy(Class[] interfaces, @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - Object ret; - int retriesMade = 0; - TException caughtException; - - boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class); boolean allowRetry = true; Annotation[] directives = method.getDeclaredAnnotations(); if(directives != null) { @@ -188,73 +183,60 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl } } } + boolean finalAllowRetry = allowRetry; + Predicate retryPolicy = ex -> finalAllowRetry && !base.isLocalMetaStore() && + TTransportException.class.isAssignableFrom(ex.getClass()); + AtomicInteger retriesMade = new AtomicInteger(0); + return new RetryingExecutor<>(retryLimit, () -> { + reconnect(method, retriesMade); + Object ret; + if (metaCallTimeMap == null) { + ret = method.invoke(base, args); + } else { + // need to capture the timing + long startTime = System.currentTimeMillis(); + ret = method.invoke(base, args); + long timeTaken = System.currentTimeMillis() - startTime; + addMethodTime(method, timeTaken); + } + return ret; + }).sleepInterval(retryDelaySeconds * 1000) + .commandName(method.getName()) + .onRetry(retryPolicy) + .run(); + } - while (true) { - try { - SecurityUtils.reloginExpiringKeytabUser(); - - if (allowReconnect) { - if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { - if (this.ugi != null) { - // Perform reconnect with the proper user context - try { - LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi); - - this.ugi.doAs( - new PrivilegedExceptionAction () { - @Override - public Object run() throws MetaException { - base.reconnect(); - return null; - } - }); - } catch (UndeclaredThrowableException e) { - Throwable te = e.getCause(); - if (te instanceof PrivilegedActionException) { - throw te.getCause(); - } else { - throw te; - } - } - lastConnectionTime = System.currentTimeMillis(); + private void reconnect(Method method, AtomicInteger retriesMade) throws Exception { + SecurityUtils.reloginExpiringKeytabUser(); + boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class); + if (allowReconnect) { + if (retriesMade.getAndIncrement() > 0 || hasConnectionLifeTimeReached(method)) { + if (this.ugi != null) { + // Perform reconnect with the proper user context + try { + LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi); + + this.ugi.doAs((PrivilegedExceptionAction) () -> { + base.reconnect(); + return null; + }); + } catch (UndeclaredThrowableException e) { + Throwable te = e.getCause(); + if (te instanceof PrivilegedActionException pe) { + throw pe; + } else if (te instanceof Exception ex){ + throw ex; } else { - LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information."); - throw new MetaException("UGI information unavailable. Will not attempt a reconnect."); + throw new RuntimeException(te); } } - } - - if (metaCallTimeMap == null) { - ret = method.invoke(base, args); + lastConnectionTime = System.currentTimeMillis(); } else { - // need to capture the timing - long startTime = System.currentTimeMillis(); - ret = method.invoke(base, args); - long timeTaken = System.currentTimeMillis() - startTime; - addMethodTime(method, timeTaken); + LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information."); + throw new MetaException("UGI information unavailable. Will not attempt a reconnect."); } - break; - } catch (UndeclaredThrowableException e) { - throw e.getCause(); - } catch (InvocationTargetException e) { - Throwable t = e.getCause(); - // Metastore client needs retry for only TTransportException. - if (TTransportException.class.isAssignableFrom(t.getClass())) { - caughtException = (TTransportException) t; - } else { - throw t; - } - } - - if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) { - throw caughtException; } - retriesMade++; - LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + retriesMade + " of " + - retryLimit + ") after " + retryDelaySeconds + "s. " + method.getName(), caughtException); - Thread.sleep(retryDelaySeconds * 1000); } - return ret; } /** diff --git a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/ThriftHiveMetaStoreClient.java b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/ThriftHiveMetaStoreClient.java index 60d20aba8fe2..3ab86b65b588 100644 --- a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/ThriftHiveMetaStoreClient.java +++ b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/ThriftHiveMetaStoreClient.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.Lists; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.metastore.utils.FilterUtils; import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.RetryingExecutor; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; @@ -92,6 +94,7 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.createThriftPartitionsReq; import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; @@ -676,8 +679,6 @@ private TTransport createBinaryClient(URI store, boolean useSSL) throws TTranspo private void open() throws MetaException { isConnected = false; - TTransportException tte = null; - MetaException recentME = null; boolean useSSL = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_SSL); boolean useSasl = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_THRIFT_SASL); String clientAuthMode = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_AUTH_MODE); @@ -689,91 +690,63 @@ private void open() throws MetaException { if (clientAuthMode != null) { usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode); } - - for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { - for (URI store : metastoreUris) { - LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store, - transportMode); - try { - try { - if (isHttpTransportMode) { - transport = createHttpClient(store, useSSL); - } else { - transport = createBinaryClient(store, useSSL); - } - } catch (TTransportException te) { - tte = te; - throw new MetaException(te.toString()); - } - - final TProtocol protocol; - if (useCompactProtocol) { - protocol = new TCompactProtocol(transport); - } else { - protocol = new TBinaryProtocol(transport); - } - client = new ThriftHiveMetastore.Client(protocol); - try { - if (!transport.isOpen()) { - transport.open(); - final int newCount = connCount.incrementAndGet(); - if (useSSL) { - LOG.info( - "Opened an SSL connection to metastore, current connections: {}", - newCount); - if (LOG.isTraceEnabled()) { - LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]", - System.identityHashCode(this), new Exception()); - } - } else { - LOG.info("Opened a connection to metastore, URI ({}) " - + "current connections: {}", store, newCount); - if (LOG.isTraceEnabled()) { - LOG.trace("METASTORE CONNECTION TRACE - open [{}]", - System.identityHashCode(this), new Exception()); - } - } - } - isConnected = true; - } catch (TTransportException e) { - tte = e; - String errMsg = String.format("Failed to connect to the MetaStore Server URI (%s) in %s " - + "transport mode", store, transportMode); - LOG.warn(errMsg); - LOG.debug(errMsg, e); + AtomicReference recentEx = new AtomicReference<>(); + Predicate retryPolicy = ex -> { + recentEx.set(ex); + return !isConnected && (ex instanceof MetaException || ex instanceof TTransportException); + }; + AtomicInteger inx = new AtomicInteger(0); + isConnected = new RetryingExecutor<>(retries * metastoreUris.length, () -> { + URI store = metastoreUris[inx.getAndIncrement() % metastoreUris.length]; + LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store, transportMode); + if (isHttpTransportMode) { + transport = createHttpClient(store, useSSL); + } else { + transport = createBinaryClient(store, useSSL); + } + final TProtocol protocol; + if (useCompactProtocol) { + protocol = new TCompactProtocol(transport); + } else { + protocol = new TBinaryProtocol(transport); + } + client = new ThriftHiveMetastore.Client(protocol); + if (!transport.isOpen()) { + transport.open(); + final int newCount = connCount.incrementAndGet(); + if (useSSL) { + LOG.info( + "Opened an SSL connection to metastore, current connections: {}", + newCount); + if (LOG.isTraceEnabled()) { + LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]", + System.identityHashCode(this), new Exception()); } - - if (isConnected && !useSasl && !usePasswordAuth && !isHttpTransportMode && - MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI)) { - // Call set_ugi, only in unsecure mode. - try { - UserGroupInformation ugi = SecurityUtils.getUGI(); - client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames())); - } catch (IOException e) { - LOG.warn("Failed to find ugi of client set_ugi() is not successful, Continuing without it.", e); - } catch (TException e) { - LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. " - + "Continuing without it.", e); - } + } else { + LOG.info("Opened a connection to metastore, URI ({}) " + + "current connections: {}", store, newCount); + if (LOG.isTraceEnabled()) { + LOG.trace("METASTORE CONNECTION TRACE - open [{}]", + System.identityHashCode(this), new Exception()); } - } catch (MetaException e) { - recentME = e; - String errMsg = "Failed to connect to metastore with URI (" + store - + ") transport mode:" + transportMode + " in attempt " + attempt; - LOG.error(errMsg, e); - } - if (isConnected) { - // Set the beeline session modified metaConfVars for new HMS connection - overlaySessionModifiedMetaConf(); - break; } } - // Wait before launching the next round of connection retries. - if (!isConnected && retryDelaySeconds > 0) { - try { - LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt."); - Thread.sleep(retryDelaySeconds * 1000); - } catch (InterruptedException ignore) {} + return true; + }).sleepInterval(retryDelaySeconds * 1000) + .commandName("open") + .onRetry(retryPolicy).runWithMetaException(); + + if (!useSasl && !usePasswordAuth && !isHttpTransportMode && + MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI)) { + // Call set_ugi, only in unsecure mode. + try { + UserGroupInformation ugi = SecurityUtils.getUGI(); + client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames())); + } catch (IOException e) { + LOG.warn("Failed to find ugi of client set_ugi() is not successful, Continuing without it.", e); + } catch (TException e) { + LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. " + + "Continuing without it.", e); } } @@ -782,15 +755,14 @@ private void open() throws MetaException { // be null. When MetaException wraps TTransportException, tte will be set so stringify that // directly. String exceptionString = "Unknown exception"; - if (tte != null) { - exceptionString = StringUtils.stringifyException(tte); - } else if (recentME != null) { - exceptionString = StringUtils.stringifyException(recentME); + if (recentEx.get() != null) { + exceptionString = StringUtils.stringifyException(recentEx.get()); } throw new MetaException("Could not connect to meta store using any of the URIs provided." + " Most recent failure: " + exceptionString); } - + // Set the beeline session modified metaConfVars for new HMS connection + overlaySessionModifiedMetaConf(); snapshotActiveConf(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java similarity index 91% rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java rename to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java index 74c9a44f6ef2..1a8596e8948b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public RetryingExecutor sleepInterval(long sleepInterval, return this; } - public T run() throws MetaException { + public T run() throws TException { while (true) { try { return command.call(); @@ -100,7 +101,7 @@ public T run() throws MetaException { } } - private Throwable checkException(Throwable e) throws MetaException { + private Throwable checkException(Throwable e) throws TException { Throwable cause = e; if (e instanceof InvocationTargetException || e instanceof UndeclaredThrowableException) { @@ -109,6 +110,9 @@ private Throwable checkException(Throwable e) throws MetaException { if (retryPolicy != null && !retryPolicy.test(cause)) { String message = "See a fatal exception, avoid to retry the command:" + commandName; LOG.error(message, cause); + if (cause instanceof TException te) { + throw te; + } String errorMessage = ExceptionUtils.getMessage(cause); Throwable rootCause = ExceptionUtils.getRootCause(e); errorMessage += (rootCause == null ? "" : ("\nRoot cause: " + rootCause)); @@ -117,6 +121,17 @@ private Throwable checkException(Throwable e) throws MetaException { return cause; } + public T runWithMetaException() throws MetaException { + try { + return run(); + } catch (TException te) { + if (te instanceof MetaException me) { + throw me; + } + throw new MetaException(te.getMessage()); + } + } + public static class RetryException extends Exception { private static final long serialVersionUID = 1L; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 1d9fbf3e3ae1..89bfc354070a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -113,9 +113,6 @@ public class HiveMetaStore extends ThriftHiveMetastore { private static ShutdownHookManager shutdownHookMgr; - /** MM write states. */ - public static final char MM_WRITE_OPEN = 'o', MM_WRITE_COMMITTED = 'c', MM_WRITE_ABORTED = 'a'; - static HadoopThriftAuthBridge.Server saslServer; private static MetastoreDelegationTokenManager delegationTokenManager; static boolean useSasl; @@ -125,26 +122,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { private static ThriftServer thriftServer; /** the servlet server. */ private static Server servletServer = null; - /** the port and path of the property servlet. */ - private static int propertyServletPort = -1; - - /** - * Gets the embedded servlet server. - * @return the server instance or null - */ - public static Server getServletServer() { - return servletServer; - } - /** - * Gets the property servlet connector port. - *

If configuration is 0, this port is allocated by the system.

- * @return the connector port or -1 if not configured - */ - public static int getPropertyServletPort() { - return propertyServletPort; - } - public static boolean isRenameAllowed(Database srcDB, Database destDB) { if (!srcDB.getName().equalsIgnoreCase(destDB.getName())) { if (ReplChangeManager.isSourceOfReplication(srcDB) || ReplChangeManager.isSourceOfReplication(destDB)) { @@ -716,13 +694,9 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, // optionally create and start the property and Iceberg REST server ServletServerBuilder builder = new ServletServerBuilder(conf); - ServletServerBuilder.Descriptor properties = builder.addServlet(PropertyServlet.createServlet(conf)); + builder.addServlet(PropertyServlet.createServlet(conf)); builder.addServlet(createCatalogServlet(conf)); servletServer = builder.start(LOG); - if (servletServer != null && properties != null) { - propertyServletPort = properties.getPort(); - } - // main server thriftServer.start(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index c9856fcd9125..6e21276c2683 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -9236,7 +9236,7 @@ public Map updateTableColumnStatistics(ColumnStatistics colStats return newParams; }).onRetry(e -> e instanceof RetryingExecutor.RetryException) .commandName("updateTableColumnStatistics").sleepInterval(sleepInterval, interval -> - ThreadLocalRandom.current().nextLong(sleepInterval) + 30).run(); + ThreadLocalRandom.current().nextLong(sleepInterval) + 30).runWithMetaException(); committed = commitTransaction(); return committed ? result : null; } finally { @@ -9327,7 +9327,7 @@ public Map updatePartitionColumnStatistics(Table table, MTable m return newParams; }).onRetry(e -> e instanceof RetryingExecutor.RetryException) .commandName("updatePartitionColumnStatistics").sleepInterval(sleepInterval, interval -> - ThreadLocalRandom.current().nextLong(sleepInterval) + 30).run(); + ThreadLocalRandom.current().nextLong(sleepInterval) + 30).runWithMetaException(); committed = commitTransaction(); return committed ? result : null; } finally { @@ -11133,14 +11133,14 @@ private void lockNotificationSequenceForUpdate() throws MetaException { new RetryingExecutor(maxRetries, () -> { directSql.lockDbTable("NOTIFICATION_SEQUENCE"); return null; - }).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run(); + }).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).runWithMetaException(); } else { String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\""; String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery); new RetryingExecutor(maxRetries, () -> { executePlainSQL(lockingQuery, false, null); return null; - }).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run(); + }).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).runWithMetaException(); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java index a03b6b7e2e63..f163cf71f28c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java @@ -100,9 +100,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl Deadline.stopTimer(); } } - } catch (UndeclaredThrowableException e) { - throw e.getCause(); - } catch (InvocationTargetException e) { + } catch (UndeclaredThrowableException | InvocationTargetException e) { throw e.getCause(); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java index 0d52107ad095..fd4ce94fa897 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java @@ -18,10 +18,11 @@ package org.apache.hadoop.hive.metastore; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.lang.reflect.UndeclaredThrowableException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import javax.jdo.JDOException; @@ -30,20 +31,18 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.metastore.utils.RetryingExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.datanucleus.exceptions.NucleusException; @InterfaceAudience.Private @InterfaceStability.Evolving public class RetryingHMSHandler extends AbstractHMSHandlerProxy { - private static final Logger LOG = LoggerFactory.getLogger(RetryingHMSHandler.class); private final long retryInterval; private final int retryLimit; + private final Predicate retryOnException; public RetryingHMSHandler(Configuration conf, IHMSHandler baseHandler, boolean local) throws MetaException { @@ -52,17 +51,14 @@ public RetryingHMSHandler(Configuration conf, IHMSHandler baseHandler, boolean l ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); retryLimit = MetastoreConf.getIntVar(origConf, ConfVars.HMS_HANDLER_ATTEMPTS); - try { - //invoking init method of baseHandler this way since it adds the retry logic - //in case of transient failures in init method - invoke(baseHandler, baseHandler.getClass().getDeclaredMethod("init", (Class[]) null), - null); - } catch (Throwable e) { - LOG.error("HMSHandler Fatal error: " + ExceptionUtils.getStackTrace(e)); - MetaException me = new MetaException(e.getMessage()); - me.initCause(e); - throw me; - } + retryOnException = exception -> + (ExceptionUtils.stream(exception).anyMatch(e -> e instanceof JDOException) || + ExceptionUtils.stream(exception).anyMatch(e -> e instanceof NucleusException)) && + ExceptionUtils.stream(exception).anyMatch(DatabaseProduct::isRecoverableException); + + new RetryingExecutor<>(retryLimit, () -> { baseHandler.init(); return null;}) + .commandName("init").onRetry(retryOnException) + .sleepInterval(retryInterval).runWithMetaException(); } @Override @@ -71,113 +67,40 @@ protected void initBaseHandler() throws MetaException { } public Result invokeInternal(final Object proxy, final Method method, final Object[] args) throws Throwable { - boolean gotNewConnectUrl = false; + final AtomicBoolean gotNewConnectUrl = new AtomicBoolean(false); if (reloadConf) { MetaStoreInit.updateConnectionURL(origConf, getActiveConf(), null, metaStoreInitData); } - int retryCount = 0; - Throwable caughtException = null; - while (true) { + AtomicInteger retryCount = new AtomicInteger(0); + return new RetryingExecutor<>(retryLimit, () -> { + if (reloadConf || gotNewConnectUrl.get()) { + baseHandler.setConf(getActiveConf()); + } + Object object; + boolean isStarted = Deadline.startTimer(method.getName()); try { - if (reloadConf || gotNewConnectUrl) { - baseHandler.setConf(getActiveConf()); - } - Object object = null; - boolean isStarted = Deadline.startTimer(method.getName()); - try { - object = method.invoke(baseHandler, args); - } finally { - if (isStarted) { - Deadline.stopTimer(); - } + object = method.invoke(baseHandler, args); + } catch (Exception e) { + // If we have a connection error, the JDO connection URL hook might + // provide us with a new URL to access the datastore. + String lastUrl = MetaStoreInit.getConnectionURL(getActiveConf()); + gotNewConnectUrl.set(MetaStoreInit.updateConnectionURL(origConf, getActiveConf(), + lastUrl, metaStoreInitData)); + throw e; + } finally { + if (isStarted) { + Deadline.stopTimer(); } - StringBuilder additionalInfo = new StringBuilder(); - additionalInfo.append("retryCount=").append(retryCount) - .append(" error=").append(false); - return new Result(object, additionalInfo.toString()); - } catch (UndeclaredThrowableException e) { - if (e.getCause() != null) { - if (e.getCause() instanceof javax.jdo.JDOException) { - // Due to reflection, the jdo exception is wrapped in - // invocationTargetException - caughtException = e.getCause(); - } else if (e.getCause() instanceof MetaException && e.getCause().getCause() != null - && e.getCause().getCause() instanceof javax.jdo.JDOException) { - // The JDOException may be wrapped further in a MetaException - caughtException = e.getCause().getCause(); - } else { - LOG.error(ExceptionUtils.getStackTrace(e.getCause())); - throw e.getCause(); - } - } else { - LOG.error(ExceptionUtils.getStackTrace(e)); - throw e; - } - } catch (InvocationTargetException e) { - if (e.getCause() instanceof javax.jdo.JDOException) { - // Due to reflection, the jdo exception is wrapped in - // invocationTargetException - caughtException = e.getCause(); - } else if (e.getCause() instanceof NoSuchObjectException || e.getTargetException().getCause() instanceof NoSuchObjectException) { - String methodName = method.getName(); - if (!methodName.startsWith("get_database") && !methodName.startsWith("get_table") - && !methodName.startsWith("get_partition") && !methodName.startsWith("get_function") - && !methodName.startsWith("get_stored_procedure") && !methodName.startsWith("find_package")) { - LOG.error(ExceptionUtils.getStackTrace(e.getCause())); - } - throw e.getCause(); - } else if (e.getCause() instanceof MetaException && e.getCause().getCause() != null) { - if (e.getCause().getCause() instanceof javax.jdo.JDOException || - e.getCause().getCause() instanceof NucleusException) { - // The JDOException or the Nucleus Exception may be wrapped further in a MetaException - caughtException = e.getCause().getCause(); - } else if (e.getCause().getCause() instanceof DeadlineException) { - // The Deadline Exception needs no retry and be thrown immediately. - Deadline.clear(); - LOG.error("Error happens in method " + method.getName() + ": " + - ExceptionUtils.getStackTrace(e.getCause())); - throw e.getCause(); - } else { - LOG.error(ExceptionUtils.getStackTrace(e.getCause())); - throw e.getCause(); - } - } else { - LOG.error(ExceptionUtils.getStackTrace(e.getCause())); - throw e.getCause(); - } - } - - Throwable rootCause = ExceptionUtils.getRootCause(caughtException); - String errorMessage = ExceptionUtils.getMessage(caughtException) + - (rootCause == null ? "" : ("\nRoot cause: " + rootCause)); - if (retryCount >= retryLimit || !isRecoverableException(caughtException)) { - LOG.error("HMSHandler Fatal error: " + ExceptionUtils.getStackTrace(caughtException)); - throw new MetaException(errorMessage); } - - assert (retryInterval >= 0); - retryCount++; - LOG.error( - String.format( - "Retrying HMSHandler after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit) + - " with error: " + ExceptionUtils.getStackTrace(caughtException)); - - Thread.sleep(retryInterval); - // If we have a connection error, the JDO connection URL hook might - // provide us with a new URL to access the datastore. - String lastUrl = MetaStoreInit.getConnectionURL(getActiveConf()); - gotNewConnectUrl = MetaStoreInit.updateConnectionURL(origConf, getActiveConf(), - lastUrl, metaStoreInitData); - } - } - - private boolean isRecoverableException(Throwable t) { - if (!(t instanceof JDOException || t instanceof NucleusException)) { - return false; - } - return DatabaseProduct.isRecoverableException(t); + String additionalInfo = "retryCount=" + retryCount.getAndIncrement() + " error=" + false; + return new Result(object, additionalInfo); + }).commandName(method.getName()) + .onRetry(retryOnException) + .sleepInterval(retryInterval) + .run(); } + } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java index ee635a6f9f5f..33190e23dba3 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java @@ -38,7 +38,9 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.metastore.utils.RetryingExecutor; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,8 +103,8 @@ public LeaseLeaderElection() throws IOException { hostName = InetAddress.getLocalHost().getHostName(); } - private synchronized void doWork(LockResponse resp, Configuration conf, - TableName tableName) throws LeaderException { + private synchronized void + doWork(LockResponse resp, Configuration conf, TableName tableName) { long start = System.currentTimeMillis(); lockId = resp.getLockid(); assert resp.getState() == LockState.ACQUIRED || resp.getState() == LockState.WAITING; @@ -163,47 +165,36 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept .setOperationType(DataOperationType.NO_TXN) .build()); - boolean lockable = false; - Exception recentException = null; long start = System.currentTimeMillis(); LockRequest req = new LockRequest(components, userName, hostName); int numRetries = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.LOCK_NUMRETRIES); long maxSleep = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS); - for (int i = 0; i < numRetries && !stopped; i++) { - try { + try { + if (stopped) { + throw new LeaderException("Invalid candidate as " + getName() + " has been stopped"); + } + new RetryingExecutor<>(numRetries, () -> { LockResponse res = store.lock(req); - if (res.getState() == LockState.WAITING || res.getState() == LockState.ACQUIRED) { - lockable = true; - LOG.debug("{} Spent {}ms to take part in election, retries: {}", getName(), System.currentTimeMillis() - start, i); + if (res.getState() == LockState.WAITING + || res.getState() == LockState.ACQUIRED) { doWork(res, conf, table); - break; + return true; + } else { + throw new MetaException("Un-desirable lock state: " + res.getState()); } - } catch (NoSuchTxnException | TxnAbortedException e) { - throw new AssertionError("This should not happen, we didn't open txn", e); - } catch (MetaException e) { - recentException = e; - LOG.warn("Error while locking the table: {}, num retries: {}, max retries: {}", - table, i, numRetries, e); - } - backoff(maxSleep); - } - if (!lockable) { - throw new LeaderException("Error locking the table: " + table + " in " + numRetries + - " retries, time spent: " + (System.currentTimeMillis() - start) + " ms", recentException); - } - } - - // Sleep before we send checkLock again, but do it with a back off - // so we don't sit and hammer the metastore in a tight loop - private void backoff(long maxSleep) { - nextSleep *= 2; - if (nextSleep > maxSleep) - nextSleep = maxSleep; - try { - Thread.sleep(nextSleep); - } catch (InterruptedException ignored) { + }).onRetry(e -> e instanceof MetaException && !stopped) + .commandName("LeaseLeaderElection#tryBeLeader") + .sleepInterval(nextSleep, nextSleep -> { + nextSleep *= 2; + if (nextSleep > maxSleep) + nextSleep = maxSleep; + return nextSleep; + }).run(); + } catch (TException e) { + throw new LeaderException("Unable to take the lock on table: " + table, e); } + LOG.debug("{}: is leader? {}, time spent: {}", getName(), isLeader(), System.currentTimeMillis() - start); } protected void shutdownWatcher() {