diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index effb1c466bab9..48deaef45f7a9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -92,8 +92,8 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev private static final long LOG_FILE_MAX_SIZE = CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax(); private final TEndPoint currentNodeTEndPoint; - private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+"); - private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$"); + private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("log_inprogress_(\\d+)$"); + private static final Pattern LOG_PATTERN = Pattern.compile("log_(\\d+)_(\\d+)$"); public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) { this.executor = executor; @@ -121,6 +121,13 @@ public TSStatus write(IConsensusRequest request) { /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */ protected TSStatus write(ConfigPhysicalPlan plan) { + if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { + final TSStatus persistStatus = persistPlanForSimpleConsensus(plan); + if (persistStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return persistStatus; + } + } + TSStatus result; try { result = executor.executeNonQueryPlan(plan); @@ -129,10 +136,6 @@ protected TSStatus write(ConfigPhysicalPlan plan) { result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); } - if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { - writeLogForSimpleConsensus(plan); - } - if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { PipeConfigNodeAgent.runtime().listener().tryListenToPlan(plan, false); } @@ -197,7 +200,6 @@ public boolean takeSnapshot(File snapshotDir) { PipeConfigNodeAgent.runtime() .listener() .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots()); - return true; } catch (IOException e) { if (PipeConfigNodeAgent.runtime().listener().isOpened()) { LOGGER.warn( @@ -205,14 +207,18 @@ public boolean takeSnapshot(File snapshotDir) { e); } } + return true; } return false; } @Override public void loadSnapshot(final File latestSnapshotRootDir) { + if (!executor.loadSnapshot(latestSnapshotRootDir)) { + return; + } + try { - executor.loadSnapshot(latestSnapshotRootDir); // We recompute the snapshot for pipe listener when loading snapshot // to recover the newest snapshot in cache PipeConfigNodeAgent.runtime() @@ -342,6 +348,9 @@ public void start() { @Override public void stop() { + if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { + closeSimpleLogWriter(); + } // Shutdown leader related service for config pipe PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); } @@ -351,56 +360,48 @@ public boolean isReadOnly() { return CommonDescriptor.getInstance().getConfig().isReadOnly(); } - private void writeLogForSimpleConsensus(ConfigPhysicalPlan plan) { - if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) { - try { - simpleLogWriter.force(); - File completedFilePath = new File(FILE_PATH + startIndex + "_" + endIndex); - Files.move( - simpleLogFile.toPath(), completedFilePath.toPath(), StandardCopyOption.ATOMIC_MOVE); - } catch (IOException e) { - LOGGER.error("Can't force logWriter for ConfigNode SimpleConsensus mode", e); + private TSStatus persistPlanForSimpleConsensus(ConfigPhysicalPlan plan) { + try { + if (simpleLogWriter == null || simpleLogFile == null) { + throw new IOException("SimpleConsensus log writer is not initialized."); } - for (int retry = 0; retry < 5; retry++) { - try { - simpleLogWriter.close(); - } catch (IOException e) { - LOGGER.warn( - "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, " - + "filePath: {}, retry: {}", - simpleLogFile.getAbsolutePath(), - retry); - try { - // Sleep 1s and retry - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e2) { - Thread.currentThread().interrupt(); - LOGGER.warn("Unexpected interruption during the close method of logWriter"); - } - continue; - } - break; + + if (simpleLogFile.length() > LOG_FILE_MAX_SIZE) { + rollSimpleConsensusLogFile(); } - startIndex = endIndex + 1; - createLogFile(startIndex); - } - try { ByteBuffer buffer = plan.serializeToByteBuffer(); buffer.position(buffer.limit()); simpleLogWriter.write(buffer); + simpleLogWriter.force(); endIndex = endIndex + 1; } catch (Exception e) { LOGGER.error( - "Can't serialize current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode", e); + "Persist current ConfigPhysicalPlan for ConfigNode SimpleConsensus mode failed", e); + return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage( + "Persist ConfigNode SimpleConsensus log failed: " + String.valueOf(e.getMessage())); } + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + private void rollSimpleConsensusLogFile() throws IOException { + simpleLogWriter.force(); + closeSimpleLogWriter(); + Files.move( + simpleLogFile.toPath(), + new File(FILE_PATH + startIndex + "_" + endIndex).toPath(), + StandardCopyOption.ATOMIC_MOVE); + startIndex = endIndex + 1; + createLogFile(startIndex); } private void initStandAloneConfigNode() { File dir = new File(CURRENT_FILE_DIR); dir.mkdirs(); String[] list = new File(CURRENT_FILE_DIR).list(); + endIndex = 0; if (list != null && list.length != 0) { Arrays.sort(list, new FileComparator()); for (String logFileName : list) { @@ -417,7 +418,7 @@ private void initStandAloneConfigNode() { continue; } - startIndex = endIndex; + final int recoveredStartIndex = parseStartIndex(logFileName); while (logReader.hasNext()) { endIndex++; // Read and re-serialize the PhysicalPlan @@ -435,13 +436,13 @@ private void initStandAloneConfigNode() { } } logReader.close(); + if (isInProgressLogFile(logFileName)) { + sealRecoveredInProgressLogFile(logFile, recoveredStartIndex, endIndex); + } } - } else { - startIndex = 0; - endIndex = 0; } - startIndex = startIndex + 1; - createLogFile(endIndex); + startIndex = endIndex + 1; + createLogFile(startIndex); ScheduledExecutorService simpleConsensusThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( @@ -482,26 +483,72 @@ private void createLogFile(int startIndex) { } } + private void sealRecoveredInProgressLogFile( + File logFile, int recoveredStartIndex, int recoveredEndIndex) { + try { + if (recoveredStartIndex > recoveredEndIndex) { + Files.deleteIfExists(logFile.toPath()); + return; + } + Files.move( + logFile.toPath(), + new File(FILE_PATH + recoveredStartIndex + "_" + recoveredEndIndex).toPath(), + StandardCopyOption.ATOMIC_MOVE); + } catch (IOException e) { + LOGGER.warn("Seal recovered ConfigNode SimpleConsensus log failed: {}", logFile, e); + } + } + + private boolean isInProgressLogFile(String filename) { + return filename.startsWith("log_inprogress_"); + } + + private void closeSimpleLogWriter() { + if (simpleLogWriter == null) { + return; + } + for (int retry = 0; retry < 5; retry++) { + try { + simpleLogWriter.close(); + simpleLogWriter = null; + return; + } catch (IOException e) { + LOGGER.warn( + "Can't close StandAloneLog for ConfigNode SimpleConsensus mode, " + + "filePath: {}, retry: {}", + simpleLogFile == null ? null : simpleLogFile.getAbsolutePath(), + retry); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e2) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption during the close method of logWriter"); + break; + } + } + } + } + static class FileComparator implements Comparator { @Override public int compare(String filename1, String filename2) { - long id1 = parseEndIndex(filename1); - long id2 = parseEndIndex(filename2); + long id1 = parseStartIndex(filename1); + long id2 = parseStartIndex(filename2); return Long.compare(id1, id2); } } - static long parseEndIndex(String filename) { + static int parseStartIndex(String filename) { if (filename.startsWith("log_inprogress_")) { Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename); if (matcher.find()) { - return Long.parseLong(matcher.group()); + return Integer.parseInt(matcher.group(1)); } } else { Matcher matcher = LOG_PATTERN.matcher(filename); if (matcher.find()) { - return Long.parseLong(matcher.group()); + return Integer.parseInt(matcher.group(1)); } } return 0; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 2e50c6b787b20..abd89ff6037de 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.node; import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; @@ -46,6 +47,7 @@ import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan; @@ -321,30 +323,33 @@ public DataSet registerDataNode(TDataNodeRegisterReq req) { DataNodeRegisterResp resp = new DataNodeRegisterResp(); resp.setConfigNodeList(getRegisteredConfigNodes()); - // Create a new DataNodeHeartbeatCache and force update NodeStatus int dataNodeId = nodeInfo.generateNextNodeId(); - getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId); - // TODO: invoke a force heartbeat to update new DataNode's status immediately RegisterDataNodePlan registerDataNodePlan = new RegisterDataNodePlan(req.getDataNodeConfiguration()); // Register new DataNode registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId); - try { - getConsensusManager().write(registerDataNodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus registerStatus = writeConfigPhysicalPlan(registerDataNodePlan); + if (!isConsensusWriteSuccessful(registerStatus)) { + resp.setStatus(registerStatus); + return resp; } // update datanode's versionInfo UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateVersionStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateVersionStatus)) { + resp.setStatus( + rollbackDataNodeRegistration( + registerDataNodePlan.getDataNodeConfiguration().getLocation(), updateVersionStatus)); + return resp; } + // Create a new DataNodeHeartbeatCache and force update NodeStatus + getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.DataNode, dataNodeId); + // TODO: invoke a force heartbeat to update new DataNode's status immediately + // Bind DataNode metrics PartitionMetrics.bindDataNodePartitionMetricsWhenUpdate( MetricService.getInstance(), configManager, dataNodeId); @@ -352,7 +357,10 @@ public DataSet registerDataNode(TDataNodeRegisterReq req) { // Adjust the maximum RegionGroup number of each Database getClusterSchemaManager().adjustMaxRegionGroupNum(); - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION); + resp.setStatus( + buildSuccessStatus( + ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(), + registerStatus.getMessage())); resp.setDataNodeId( registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId()); resp.setRuntimeConfiguration(getRuntimeConfiguration(dataNodeId)); @@ -380,10 +388,10 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) { // Update DataNodeConfiguration when modified during restart UpdateDataNodePlan updateDataNodePlan = new UpdateDataNodePlan(req.getDataNodeConfiguration()); - try { - getConsensusManager().write(updateDataNodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = writeConfigPhysicalPlan(updateDataNodePlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + resp.setStatus(updateStatus); + return resp; } } TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId); @@ -391,14 +399,14 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) { // Update versionInfo when modified during restart UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + resp.setStatus(updateStatus); + return resp; } } - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART); + resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage())); resp.setRuntimeConfiguration(getRuntimeConfiguration(nodeId)); resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId)); @@ -476,13 +484,12 @@ public TSStatus updateConfigNodeIfNecessary(int configNodeId, TNodeVersionInfo v // Update versionInfo when modified during restart UpdateVersionInfoPlan updateConfigNodePlan = new UpdateVersionInfoPlan(versionInfo, configNodeId); - try { - return getConsensusManager().write(updateConfigNodePlan); - } catch (ConsensusException e) { - return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode()); + TSStatus updateStatus = writeConfigPhysicalPlan(updateConfigNodePlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + return updateStatus; } } - return ClusterNodeStartUtils.ACCEPT_NODE_RESTART; + return buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage()); } public List getRegisteredAINodeInfoList() { @@ -528,27 +535,37 @@ public synchronized DataSet registerAINode(TAINodeRegisterReq req) { } int aiNodeId = nodeInfo.generateNextNodeId(); - getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, aiNodeId); RegisterAINodePlan registerAINodePlan = new RegisterAINodePlan(req.getAiNodeConfiguration()); // Register new DataNode registerAINodePlan.getAINodeConfiguration().getLocation().setAiNodeId(aiNodeId); - try { - getConsensusManager().write(registerAINodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus registerStatus = writeConfigPhysicalPlan(registerAINodePlan); + if (!isConsensusWriteSuccessful(registerStatus)) { + AINodeRegisterResp resp = new AINodeRegisterResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus(registerStatus); + return resp; } // update datanode's versionInfo UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), aiNodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateVersionStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateVersionStatus)) { + AINodeRegisterResp resp = new AINodeRegisterResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus( + rollbackAINodeRegistration( + registerAINodePlan.getAINodeConfiguration().getLocation(), updateVersionStatus)); + return resp; } + getLoadManager().getLoadCache().createNodeHeartbeatCache(NodeType.AINode, aiNodeId); + AINodeRegisterResp resp = new AINodeRegisterResp(); - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION); + resp.setStatus( + buildSuccessStatus( + ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION.getMessage(), + registerStatus.getMessage())); resp.setConfigNodeList(getRegisteredConfigNodes()); resp.setAINodeId(registerAINodePlan.getAINodeConfiguration().getLocation().getAiNodeId()); return resp; @@ -586,10 +603,12 @@ public TAINodeRestartResp updateAINodeIfNecessary(TAINodeRestartReq req) { if (!req.getAiNodeConfiguration().equals(aiNodeConfiguration)) { // Update AINodeConfiguration when modified during restart UpdateAINodePlan updateAINodePlan = new UpdateAINodePlan(req.getAiNodeConfiguration()); - try { - getConsensusManager().write(updateAINodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = writeConfigPhysicalPlan(updateAINodePlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + TAINodeRestartResp resp = new TAINodeRestartResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus(updateStatus); + return resp; } } TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId); @@ -597,15 +616,17 @@ public TAINodeRestartResp updateAINodeIfNecessary(TAINodeRestartReq req) { // Update versionInfo when modified during restart UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + TAINodeRestartResp resp = new TAINodeRestartResp(); + resp.setConfigNodeList(getRegisteredConfigNodes()); + resp.setStatus(updateStatus); + return resp; } } TAINodeRestartResp resp = new TAINodeRestartResp(); - resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART); + resp.setStatus(buildSuccessStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART.getMessage())); resp.setConfigNodeList(getRegisteredConfigNodes()); return resp; } @@ -890,17 +911,15 @@ public List getRegisteredConfigNodeInfo4Infor public void applyConfigNode( TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) { ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation); - try { - getConsensusManager().write(applyConfigNodePlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); - } + ensureConsensusWriteSuccessful( + writeConfigPhysicalPlan(applyConfigNodePlan), + String.format("apply ConfigNode %s", configNodeLocation)); UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId()); - try { - getConsensusManager().write(updateVersionInfoPlan); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + final TSStatus updateStatus = writeConfigPhysicalPlan(updateVersionInfoPlan); + if (!isConsensusWriteSuccessful(updateStatus)) { + throw new IllegalStateException( + rollbackConfigNodeRegistration(configNodeLocation, updateStatus).getMessage()); } } @@ -1303,6 +1322,144 @@ public TDataNodeLocation getLowestLoadDataNode(Set nodes) { return getRegisteredDataNode(dataNodeId).getLocation(); } + private TSStatus writeConfigPhysicalPlan(ConfigPhysicalPlan plan) { + try { + return getConsensusManager().write(plan); + } catch (ConsensusException e) { + LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage()); + } + } + + private boolean isConsensusWriteSuccessful(TSStatus status) { + return status != null && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); + } + + private TSStatus rollbackDataNodeRegistration( + TDataNodeLocation dataNodeLocation, TSStatus versionUpdateStatus) { + final TSStatus rollbackStatus = + writeConfigPhysicalPlan( + new RemoveDataNodePlan(Collections.singletonList(dataNodeLocation))); + final String failureMessage = + String.format( + "Failed to persist version info for DataNode %d: %s", + dataNodeLocation.getDataNodeId(), describeStatus(versionUpdateStatus)); + if (isConsensusWriteSuccessful(rollbackStatus)) { + return buildStatus( + versionUpdateStatus.getCode(), + failureMessage, + "The registration has been rolled back. Please retry the registration."); + } + + LOGGER.error( + "Failed to roll back DataNode registration {} after version info persistence failure. " + + "versionUpdateStatus: {}, rollbackStatus: {}", + dataNodeLocation, + versionUpdateStatus, + rollbackStatus); + return buildStatus( + rollbackStatus.getCode(), + failureMessage, + String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)), + "Manual cleanup may be required before retrying the registration."); + } + + private TSStatus rollbackAINodeRegistration( + TAINodeLocation aiNodeLocation, TSStatus versionUpdateStatus) { + final TSStatus rollbackStatus = writeConfigPhysicalPlan(new RemoveAINodePlan(aiNodeLocation)); + final String failureMessage = + String.format( + "Failed to persist version info for AINode %d: %s", + aiNodeLocation.getAiNodeId(), describeStatus(versionUpdateStatus)); + if (isConsensusWriteSuccessful(rollbackStatus)) { + return buildStatus( + versionUpdateStatus.getCode(), + failureMessage, + "The registration has been rolled back. Please retry the registration."); + } + + LOGGER.error( + "Failed to roll back AINode registration {} after version info persistence failure. " + + "versionUpdateStatus: {}, rollbackStatus: {}", + aiNodeLocation, + versionUpdateStatus, + rollbackStatus); + return buildStatus( + rollbackStatus.getCode(), + failureMessage, + String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)), + "Manual cleanup may be required before retrying the registration."); + } + + private TSStatus rollbackConfigNodeRegistration( + TConfigNodeLocation configNodeLocation, TSStatus versionUpdateStatus) { + final TSStatus rollbackStatus = + writeConfigPhysicalPlan(new RemoveConfigNodePlan(configNodeLocation)); + final String failureMessage = + String.format( + "Failed to persist version info for ConfigNode %d: %s", + configNodeLocation.getConfigNodeId(), describeStatus(versionUpdateStatus)); + if (isConsensusWriteSuccessful(rollbackStatus)) { + return buildStatus( + versionUpdateStatus.getCode(), + failureMessage, + "The ConfigNode registration has been rolled back."); + } + + LOGGER.error( + "Failed to roll back ConfigNode registration {} after version info persistence failure. " + + "versionUpdateStatus: {}, rollbackStatus: {}", + configNodeLocation, + versionUpdateStatus, + rollbackStatus); + return buildStatus( + rollbackStatus.getCode(), + failureMessage, + String.format("The registration rollback also failed: %s", describeStatus(rollbackStatus)), + "Manual cleanup may be required before retrying the registration."); + } + + private TSStatus buildStatus(int statusCode, String... messages) { + final TSStatus status = new TSStatus(statusCode); + final StringBuilder builder = new StringBuilder(); + for (String message : messages) { + if (message == null || message.isEmpty()) { + continue; + } + if (builder.length() > 0) { + builder.append(' '); + } + builder.append(message); + } + if (builder.length() > 0) { + status.setMessage(builder.toString()); + } + return status; + } + + private TSStatus buildSuccessStatus(String... messages) { + return buildStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode(), messages); + } + + private String describeStatus(TSStatus status) { + if (status == null) { + return "unknown error"; + } + if (status.getMessage() != null && !status.getMessage().isEmpty()) { + return status.getMessage(); + } + return "status code " + status.getCode(); + } + + private void ensureConsensusWriteSuccessful(TSStatus status, String action) { + if (isConsensusWriteSuccessful(status)) { + return; + } + throw new IllegalStateException( + String.format("Failed to %s through consensus layer: %s", action, status)); + } + private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index b2bae24de38b7..fe1a608f6ab77 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -760,12 +760,12 @@ public boolean takeSnapshot(File snapshotDir) { return result.get(); } - public void loadSnapshot(final File latestSnapshotRootDir) { + public boolean loadSnapshot(final File latestSnapshotRootDir) { if (!latestSnapshotRootDir.exists()) { LOGGER.error( "snapshot directory [{}] is not exist, can not load snapshot with this directory.", latestSnapshotRootDir.getAbsolutePath()); - return; + return false; } final AtomicBoolean result = new AtomicBoolean(true); @@ -793,6 +793,7 @@ public void loadSnapshot(final File latestSnapshotRootDir) { "[ConfigNodeSnapshot] Load snapshot success, latestSnapshotRootDir: {}", latestSnapshotRootDir); } + return result.get(); } private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index 5a80364f0331a..e8b3cb3bb5548 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -243,7 +243,7 @@ public int getRegionGroupCount(TConsensusGroupType type) { result.getAndIncrement(); } }); - return result.getAndIncrement(); + return result.get(); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index 5c2c93daeabed..f876d4dc25e19 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -243,7 +243,9 @@ public TSStatus offerRegionMaintainTasks( */ public TSStatus pollRegionMaintainTask() { synchronized (regionMaintainTaskList) { - regionMaintainTaskList.remove(0); + if (!regionMaintainTaskList.isEmpty()) { + regionMaintainTaskList.remove(0); + } return RpcUtils.SUCCESS_STATUS; } } @@ -1008,9 +1010,14 @@ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOExcept databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol); } + final List copiedRegionMaintainTaskList; + synchronized (regionMaintainTaskList) { + copiedRegionMaintainTaskList = new ArrayList<>(regionMaintainTaskList); + } + // serialize regionCleanList - ReadWriteIOUtils.write(regionMaintainTaskList.size(), bufferedOutputStream); - for (RegionMaintainTask task : regionMaintainTaskList) { + ReadWriteIOUtils.write(copiedRegionMaintainTaskList.size(), bufferedOutputStream); + for (RegionMaintainTask task : copiedRegionMaintainTaskList) { task.serialize(bufferedOutputStream, protocol); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java index b38696a10ed2b..c0d45d00d33cb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AddConfigNodeProcedure.java @@ -80,9 +80,9 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, AddConfigNodeState s LOG.info("Successfully ADD_PEER {}", tConfigNodeLocation); break; case REGISTER_SUCCESS: - env.notifyRegisterSuccess(tConfigNodeLocation); - env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId()); env.applyConfigNode(tConfigNodeLocation, versionInfo); + env.createConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId()); + env.notifyRegisterSuccess(tConfigNodeLocation); LOG.info("The ConfigNode: {} is successfully added to the cluster", tConfigNodeLocation); return Flow.NO_MORE_STATE; } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java new file mode 100644 index 0000000000000..7bd59c8117868 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachineTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.statemachine; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class ConfigRegionStateMachineTest { + + @Test + public void testParseStartIndex() { + Assert.assertEquals(1, ConfigRegionStateMachine.parseStartIndex("log_1_10")); + Assert.assertEquals(11, ConfigRegionStateMachine.parseStartIndex("log_11_20")); + Assert.assertEquals(21, ConfigRegionStateMachine.parseStartIndex("log_inprogress_21")); + Assert.assertEquals(0, ConfigRegionStateMachine.parseStartIndex("invalid")); + } + + @Test + public void testFileComparatorSortsByStartIndex() { + List filenames = + new ArrayList<>(Arrays.asList("log_inprogress_21", "log_11_20", "log_1_10")); + + filenames.sort(new ConfigRegionStateMachine.FileComparator()); + + Assert.assertEquals(Arrays.asList("log_1_10", "log_11_20", "log_inprogress_21"), filenames); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java new file mode 100644 index 0000000000000..247d745661ef8 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.node; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cluster.NodeType; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan; +import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; +import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp; +import org.apache.iotdb.confignode.manager.ClusterManager; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.load.cache.LoadCache; +import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; +import org.apache.iotdb.confignode.persistence.node.NodeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp; +import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class NodeManagerTest { + + private IManager configManager; + private ConsensusManager consensusManager; + private LoadManager loadManager; + private LoadCache loadCache; + private ClusterSchemaManager clusterSchemaManager; + private ClusterManager clusterManager; + private NodeInfo nodeInfo; + private NodeManager nodeManager; + + @Before + public void setUp() { + configManager = Mockito.mock(IManager.class); + consensusManager = Mockito.mock(ConsensusManager.class); + loadManager = Mockito.mock(LoadManager.class); + loadCache = Mockito.mock(LoadCache.class); + clusterSchemaManager = Mockito.mock(ClusterSchemaManager.class); + clusterManager = Mockito.mock(ClusterManager.class); + nodeInfo = new NodeInfo(); + nodeManager = new NodeManager(configManager, nodeInfo); + + when(configManager.getConsensusManager()).thenReturn(consensusManager); + when(configManager.getLoadManager()).thenReturn(loadManager); + when(loadManager.getLoadCache()).thenReturn(loadCache); + when(configManager.getClusterSchemaManager()).thenReturn(clusterSchemaManager); + when(configManager.getClusterManager()).thenReturn(clusterManager); + } + + @Test + public void testRegisterDataNodeStopsWhenRegisterWriteFails() throws ConsensusException { + TSStatus failureStatus = + new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()).setMessage("redirect"); + when(consensusManager.write(any())).thenReturn(failureStatus); + + DataNodeRegisterResp resp = + (DataNodeRegisterResp) nodeManager.registerDataNode(generateDataNodeRegisterReq(1)); + + Assert.assertEquals(failureStatus, resp.getStatus()); + verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode), anyInt()); + verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum(); + } + + @Test + public void testRegisterDataNodeRollsBackWhenVersionWriteFails() throws ConsensusException { + TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + TSStatus failureStatus = + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("update failed"); + when(consensusManager.write(any())).thenReturn(successStatus, failureStatus, successStatus); + + DataNodeRegisterResp resp = + (DataNodeRegisterResp) nodeManager.registerDataNode(generateDataNodeRegisterReq(1)); + + Assert.assertEquals( + TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), resp.getStatus().getCode()); + Assert.assertTrue(resp.getStatus().getMessage().contains("rolled back")); + verify(loadCache, never()).createNodeHeartbeatCache(eq(NodeType.DataNode), anyInt()); + verify(clusterSchemaManager, never()).adjustMaxRegionGroupNum(); + + ArgumentCaptor planCaptor = + ArgumentCaptor.forClass(ConfigPhysicalPlan.class); + verify(consensusManager, Mockito.times(3)).write(planCaptor.capture()); + Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof RegisterDataNodePlan); + Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof UpdateVersionInfoPlan); + Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof RemoveDataNodePlan); + } + + @Test + public void testRestartDataNodeReturnsFailureWhenUpdateWriteFails() throws ConsensusException { + final TDataNodeConfiguration registeredConfig = generateDataNodeConfiguration(1, "127.0.0.1"); + nodeInfo.registerDataNode(new RegisterDataNodePlan(registeredConfig)); + when(clusterManager.getClusterIdWithRetry(anyLong())).thenReturn("cluster"); + + TSStatus failureStatus = + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("update failed"); + when(consensusManager.write(any())).thenReturn(failureStatus); + + final TDataNodeRestartReq req = new TDataNodeRestartReq(); + req.setDataNodeConfiguration(generateDataNodeConfiguration(1, "127.0.0.2")); + req.setVersionInfo(new TNodeVersionInfo("version", "build")); + + final TDataNodeRestartResp resp = nodeManager.updateDataNodeIfNecessary(req); + + Assert.assertEquals(failureStatus, resp.getStatus()); + } + + @Test + public void testApplyConfigNodeRollsBackWhenVersionWriteFails() throws ConsensusException { + TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + TSStatus failureStatus = + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("apply failed"); + when(consensusManager.write(any())).thenReturn(successStatus, failureStatus, successStatus); + + try { + nodeManager.applyConfigNode( + new TConfigNodeLocation( + 1, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 10720)), + new TNodeVersionInfo("version", "build")); + Assert.fail("Expected applyConfigNode to fail fast"); + } catch (IllegalStateException e) { + Assert.assertTrue(e.getMessage().contains("rolled back")); + } + + ArgumentCaptor planCaptor = + ArgumentCaptor.forClass(ConfigPhysicalPlan.class); + verify(consensusManager, Mockito.times(3)).write(planCaptor.capture()); + Assert.assertTrue(planCaptor.getAllValues().get(0) instanceof ApplyConfigNodePlan); + Assert.assertTrue(planCaptor.getAllValues().get(1) instanceof UpdateVersionInfoPlan); + Assert.assertTrue(planCaptor.getAllValues().get(2) instanceof RemoveConfigNodePlan); + } + + private TDataNodeRegisterReq generateDataNodeRegisterReq(int dataNodeId) { + final TDataNodeRegisterReq req = new TDataNodeRegisterReq(); + req.setDataNodeConfiguration(generateDataNodeConfiguration(dataNodeId, "127.0.0.1")); + req.setVersionInfo(new TNodeVersionInfo("version", "build")); + return req; + } + + private TDataNodeConfiguration generateDataNodeConfiguration(int dataNodeId, String ip) { + final TDataNodeLocation dataNodeLocation = new TDataNodeLocation(); + dataNodeLocation.setDataNodeId(dataNodeId); + dataNodeLocation.setClientRpcEndPoint(new TEndPoint(ip, 6667 + dataNodeId)); + dataNodeLocation.setInternalEndPoint(new TEndPoint(ip, 10730 + dataNodeId)); + dataNodeLocation.setMPPDataExchangeEndPoint(new TEndPoint(ip, 10740 + dataNodeId)); + dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint(ip, 10760 + dataNodeId)); + dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint(ip, 10750 + dataNodeId)); + + final TDataNodeConfiguration dataNodeConfiguration = new TDataNodeConfiguration(); + dataNodeConfiguration.setLocation(dataNodeLocation); + return dataNodeConfiguration; + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java index afccb0c0eba12..eccddcaa8d107 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp; +import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; @@ -267,6 +268,34 @@ public void testShowRegion() { }); } + @Test + public void testRegionGroupCount() throws DatabaseNotExistsException { + partitionInfo.createDatabase( + new DatabaseSchemaPlan( + ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.region_count"))); + + CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan(); + createRegionGroupsPlan.addRegionGroup( + "root.region_count", + generateTRegionReplicaSet( + testFlag.SchemaPartition.getFlag(), + generateTConsensusGroupId( + testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion))); + createRegionGroupsPlan.addRegionGroup( + "root.region_count", + generateTRegionReplicaSet( + testFlag.DataPartition.getFlag(), + generateTConsensusGroupId( + testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion))); + partitionInfo.createRegionGroups(createRegionGroupsPlan); + + Assert.assertEquals( + 1, + partitionInfo.getRegionGroupCount("root.region_count", TConsensusGroupType.SchemaRegion)); + Assert.assertEquals( + 1, partitionInfo.getRegionGroupCount("root.region_count", TConsensusGroupType.DataRegion)); + } + private TRegionReplicaSet generateTRegionReplicaSet( int startFlag, TConsensusGroupId tConsensusGroupId) { TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();