Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getLeaderNotReadyException;
import static org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getNotLeaderException;
import static org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getReadException;
import static org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase.getReadIndexException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcController;
Expand All @@ -42,6 +44,8 @@
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ReadConsistencyHint;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -302,6 +306,18 @@ public Object invoke(Object proxy, final Method method, final Object[] args)
// If we break here instead, we will retry the same leader again without waiting
throw e;
}

ReadIndexException readIndexException = getReadIndexException(e);
if (readIndexException != null) {
// This should trigger failover in the following shouldFailover
LOG.debug("Encountered ReadIndexException from {}. ", current.proxyInfo);
}

ReadException readException = getReadException(e);
if (readException != null) {
// This should trigger failover in the following shouldFailover
LOG.debug("Encountered ReadException from {}. ", current.proxyInfo);
}
}

if (!failoverProxy.shouldFailover(e)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -434,6 +436,44 @@ public static OMNotLeaderException getNotLeaderException(
return null;
}

/**
 * Extracts a {@link ReadIndexException} from the cause of the given exception.
 *
 * <p>Only the direct cause is inspected, and only when it is a
 * {@link RemoteException}; that remote exception is unwrapped and the result
 * is returned if it is a ReadIndexException.
 *
 * @param exception exception whose cause is inspected.
 * @return the unwrapped ReadIndexException, or null if the cause is not a
 * RemoteException or does not unwrap to a ReadIndexException.
 */
public static ReadIndexException getReadIndexException(Exception exception) {
  final Throwable wrapped = exception.getCause();
  if (!(wrapped instanceof RemoteException)) {
    // Nothing to unwrap: the failure did not come back as a RemoteException.
    return null;
  }
  final IOException unwrapped = ((RemoteException) wrapped).unwrapRemoteException();
  return unwrapped instanceof ReadIndexException
      ? (ReadIndexException) unwrapped
      : null;
}

/**
 * Extracts a {@link ReadException} from the cause of the given exception.
 *
 * <p>Only the direct cause is inspected, and only when it is a
 * {@link RemoteException}; that remote exception is unwrapped and the result
 * is returned if it is a ReadException.
 *
 * @param exception exception whose cause is inspected.
 * @return the unwrapped ReadException, or null if the cause is not a
 * RemoteException or does not unwrap to a ReadException.
 */
public static ReadException getReadException(Exception exception) {
  final Throwable wrapped = exception.getCause();
  if (!(wrapped instanceof RemoteException)) {
    // Nothing to unwrap: the failure did not come back as a RemoteException.
    return null;
  }
  final RemoteException remote = (RemoteException) wrapped;
  final IOException unwrapped = remote.unwrapRemoteException();
  if (!(unwrapped instanceof ReadException)) {
    return null;
  }
  return (ReadException) unwrapped;
}

/**
 * @return the {@code conf} configuration source of this instance.
 */
protected ConfigurationSource getConf() {
  return this.conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -337,6 +339,22 @@ void testNullRequest() throws Exception {
assertInstanceOf(RpcNoSuchProtocolException.class, exception.getCause());
}

/**
 * With three OM nodes, node 0 is configured to answer reads with a
 * ReadIndexException; the read is then expected (per
 * {@code assertHandledBy(1)}) to have been handled by node 1.
 */
@Test
void testReadIndexException() throws Exception {
  setupProxyProvider(3);

  // Only the first node is told to fail; the others keep their defaults.
  final OMAnswer firstNode = omNodeAnswers[0];
  firstNode.isThrowReadIndexException = true;

  doRead();
  assertHandledBy(1);
}

/**
 * With three OM nodes, node 0 is configured to answer reads with a
 * ReadException; the read is then expected (per
 * {@code assertHandledBy(1)}) to have been handled by node 1.
 */
@Test
void testReadException() throws Exception {
  setupProxyProvider(3);

  // Only the first node is told to fail; the others keep their defaults.
  final OMAnswer firstNode = omNodeAnswers[0];
  firstNode.isThrowReadException = true;

  doRead();
  assertHandledBy(1);
}

/**
 * Convenience overload that delegates to
 * {@code setupProxyProvider(int, OzoneConfiguration)} with a newly created
 * {@link OzoneConfiguration}.
 *
 * @param omNodeCount node count forwarded to the two-argument overload.
 */
private void setupProxyProvider(int omNodeCount) throws Exception {
  final OzoneConfiguration freshConf = new OzoneConfiguration();
  setupProxyProvider(omNodeCount, freshConf);
}
Expand Down Expand Up @@ -489,6 +507,8 @@ private static class OMAnswer {
private volatile boolean isLeader = false;
private volatile boolean isLeaderReady = true;
private volatile boolean isFollowerReadSupported = true;
private volatile boolean isThrowReadIndexException = false;
private volatile boolean isThrowReadException = false;

private OMProtocolAnswer clientAnswer = new OMProtocolAnswer();

Expand Down Expand Up @@ -524,13 +544,31 @@ public OMResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
}
break;
case GetKeyInfo:
if (!isLeader && !isFollowerReadSupported) {
throw new ServiceException(
new RemoteException(
OMNotLeaderException.class.getCanonicalName(),
"OM follower read is not supported"
)
);
if (!isLeader) {
if (!isFollowerReadSupported) {
throw new ServiceException(
new RemoteException(
OMNotLeaderException.class.getCanonicalName(),
"OM follower read is not supported"
)
);
}
if (isThrowReadIndexException) {
throw new ServiceException(
new RemoteException(
ReadIndexException.class.getCanonicalName(),
"ReadIndex exception"
)
);
}
if (isThrowReadException) {
throw new ServiceException(
new RemoteException(
ReadException.class.getCanonicalName(),
"ReadException"
)
);
}
}
if (isLeader && !isLeaderReady) {
throw new ServiceException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

Expand Down Expand Up @@ -434,6 +435,31 @@ protected void createKeyTest(boolean checkSuccess) throws Exception {
}
}

/**
 * Issues a {@code listVolumes} read through the client proxy and verifies the
 * outcome against the caller's expectation.
 *
 * @param checkSuccess if true, the read must succeed and any IOException is
 *     rethrown; if false, the read must fail and the failure is checked
 *     against the expected failure shapes.
 * @throws Exception the IOException from the read when {@code checkSuccess}
 *     is true.
 * @throws AssertionError if {@code checkSuccess} is false but the read
 *     succeeded, or the failure does not match an expected shape.
 */
protected void listVolumes(boolean checkSuccess)
    throws Exception {
  try {
    getObjectStore().getClientProxy().listVolumes(null, null, 100);
  } catch (IOException e) {
    if (checkSuccess) {
      throw e;
    }
    // If the last OM to be tried by the RetryProxy is down, we would get
    // ConnectException. Otherwise, we would get a RemoteException from the
    // last running OM as it would fail to get a quorum.
    if (e instanceof RemoteException) {
      // Linearizable read will fail with ReadIndexException if the follower does not recognize any leader
      // or the leader is uncontactable. It will throw ReadException if the read submitted to Ratis
      // times out.
      assertThat(((RemoteException) e).unwrapRemoteException()).isInstanceOf(RaftException.class);
    } else if (e instanceof ConnectException) {
      assertThat(e).hasMessageContaining("Connection refused");
    } else {
      assertThat(e).hasMessageContaining("Could not determine or connect to OM Leader");
    }
    return;
  }

  if (!checkSuccess) {
    // Without this check a read that unexpectedly succeeds would pass the
    // negative test silently.
    throw new AssertionError("listVolumes was expected to fail, but it succeeded");
  }
}

protected void waitForLeaderToBeReady()
throws InterruptedException, TimeoutException {
// Wait for Leader Election timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ void twoOMDown() throws Exception {
getCluster().stopOzoneManager(2);
Thread.sleep(NODE_FAILURE_TIMEOUT * 4);

// Write requests will fail with OMNotLeaderException
createVolumeTest(false);
createKeyTest(false);

// Read requests will fail with either ReadIndexException or ReadException
listVolumes(false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -58,21 +57,27 @@ public void init() throws Exception {

/**
 * Checks the leader's NumLeaderSkipLinearizableRead metric with
 * "ozone.om.allow.leader.skip.linearizable.read" at its configured value and
 * then set to false on the leader: the metric must be positive after the
 * first round of reads and unchanged after the second.
 *
 * NOTE(review): this listing appears to interleave the pre-change and
 * post-change lines of a diff (lastMetrics, newConf and curMetrics are each
 * declared twice, before and inside the try block) — confirm against the
 * actual file which lines are live.
 */
@Test
public void testAllowLeaderSkipLinearizableRead() throws Exception {
// First round of reads, then the metric must already be positive.
super.testListAllKeysInternal("skipvol1");
long lastMetrics = getCluster().getOMLeader().getMetrics().getNumLeaderSkipLinearizableRead();
Assertions.assertTrue(lastMetrics > 0);

// Keep the cluster configuration so it can be put back on the leader later.
OzoneConfiguration oldConf = getCluster().getConf();
OzoneConfiguration newConf = new OzoneConfiguration(oldConf);
newConf.setBoolean("ozone.om.allow.leader.skip.linearizable.read", false);
getCluster().getOMLeader().setConfiguration(newConf);

super.testListAllKeysInternal("skipvol2");

// With the flag set to false the metric is expected not to move.
long curMetrics = getCluster().getOMLeader().getMetrics().getNumLeaderSkipLinearizableRead();
assertEquals(lastMetrics, curMetrics);

getCluster().getOMLeader().setConfiguration(oldConf);
try {
// Drive reads with "volume list" through an OzoneShell that has
// ozone.client.follower.read.enabled set to true.
String[] args = new String[]{"volume", "list"};
OzoneShell ozoneShell = new OzoneShell();
ozoneShell.getOzoneConf().setBoolean("ozone.client.follower.read.enabled", true);
for (int i = 0; i < 100; i++) {
execute(ozoneShell, args);
}
long lastMetrics = getCluster().getOMLeader().getMetrics().getNumLeaderSkipLinearizableRead();
assertThat(lastMetrics).isGreaterThan(0);
// Set the flag to false on the leader and repeat the same 100 reads.
OzoneConfiguration newConf = new OzoneConfiguration(oldConf);
newConf.setBoolean("ozone.om.allow.leader.skip.linearizable.read", false);
getCluster().getOMLeader().setConfiguration(newConf);
for (int i = 0; i < 100; i++) {
execute(ozoneShell, args);
}
long curMetrics = getCluster().getOMLeader().getMetrics().getNumLeaderSkipLinearizableRead();
assertEquals(lastMetrics, curMetrics);
} finally {
// Always put the original configuration back on the leader, even if an assertion fails.
getCluster().getOMLeader().setConfiguration(oldConf);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
Expand Down Expand Up @@ -608,6 +610,16 @@ private OMResponse createOmResponseImpl(OMRequest omRequest,
throw new ServiceException(new OMNotLeaderException(leaderSteppingDownException.getMessage()));
}

ReadIndexException readIndexException = reply.getReadIndexException();
if (readIndexException != null) {
throw new ServiceException(readIndexException);
}

ReadException readException = reply.getReadException();
if (readException != null) {
throw new ServiceException(readException);
}

StateMachineException stateMachineException =
reply.getStateMachineException();
if (stateMachineException != null) {
Expand Down