diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java index 698f005ca50c..62d58443f542 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java @@ -202,7 +202,7 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer // Our block deleting service will eventually catch up. // Our container scanner will not update this deleted block in the merkle tree further even if it is still on // disk so that we remain in sync with the peer. - // TODO HDDS-11765 Add support for deleting blocks from our replica when a peer has already deleted the block. + // Actual block data and chunk file deletion is handled in KeyValueHandler.reconcileContainerInternal. report.addDivergedDeletedBlock(peerBlockMerkleTree); } else { // Neither our nor peer's block is deleted. Walk the chunk list to find differences. 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index ef598f3c0cb2..0b25903832e8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -123,6 +123,8 @@ import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature; import org.apache.hadoop.hdds.utils.FaultInjector; import org.apache.hadoop.hdds.utils.HddsServerUtil; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.io.RandomAccessFileChannel; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; @@ -1747,9 +1749,16 @@ private void reconcileContainerInternal(DNContainerOperationClient dnClient, Con } // Merge block deletes from the peer that do not match our list of deleted blocks. + // In addition to updating the merkle tree, also delete the actual block data from our container. for (ContainerDiffReport.DeletedBlock deletedBlock : diffReport.getDivergedDeletedBlocks()) { updatedTreeWriter.setDeletedBlock(deletedBlock.getBlockID(), deletedBlock.getDataChecksum()); numDivergedDeletedBlocksUpdated++; + try { + deleteBlockForReconciliation(kvContainer, deletedBlock.getBlockID()); + } catch (IOException e) { + LOG.error("Error while deleting diverged deleted block {} in container {} during reconciliation", + deletedBlock.getBlockID(), containerID, e); + } } // Based on repaired done with this peer, write the updated merkle tree to the container. 
@@ -2012,6 +2021,72 @@ private boolean previousChunkPresent(BlockID blockID, long chunkOffset, } } + /** + * Called during reconciliation to delete block data and chunk files for a block that a peer has already deleted. + * This handles the case where some replicas miss block delete transactions from SCM. + * If the block metadata exists in RocksDB, chunk files are deleted, the block key is removed from the DB, + * and container stats are updated. If the block metadata does not exist in RocksDB (already deleted or never + * existed), falls back to deleteUnreferenced to clean up any orphaned chunk files. + */ + @VisibleForTesting + void deleteBlockForReconciliation(KeyValueContainer container, long localBlockID) throws IOException { + KeyValueContainerData containerData = container.getContainerData(); + long containerID = containerData.getContainerID(); + + container.writeLock(); + try (DBHandle db = BlockUtils.getDB(containerData, conf)) { + String blockKey = containerData.getBlockKey(localBlockID); + BlockData blockData = db.getStore().getBlockDataTable().get(blockKey); + + if (blockData == null) { + // Block metadata not in DB, but chunk files may still be on disk. + LOG.debug("Block {} not found in DB for container {}. Attempting to clean up unreferenced chunk files.", + localBlockID, containerID); + try { + deleteUnreferenced(container, localBlockID); + } catch (IOException e) { + LOG.warn("Failed to delete unreferenced files for block {} of container {}", + localBlockID, containerID, e); + } + return; + } + + // Delete chunk files from disk. + deleteBlock(container, blockData); + long releasedBytes = KeyValueContainerUtil.getBlockLengthTryCatch(blockData); + + // Remove block metadata from DB and update counters. + try (BatchOperation batch = db.getStore().getBatchHandler().initBatchOperation()) { + db.getStore().getBlockDataTable().deleteWithBatch(batch, blockKey); + // Also remove from lastChunkInfoTable for schema V2/V3. 
+ if (!containerData.hasSchema(OzoneConsts.SCHEMA_V1)) { + db.getStore().getLastChunkInfoTable().deleteWithBatch(batch, blockKey); + } + + // Update DB counters. These blocks were not marked as pending deletion by SCM, so we only + // decrement bytesUsed and blockCount. pendingDeleteBlockCount is not affected. + Table<String, Long> metadataTable = db.getStore().getMetadataTable(); + final ContainerData.BlockByteAndCounts stats = containerData.getStatistics().getBlockByteAndCounts(); + metadataTable.putWithBatch(batch, containerData.getBytesUsedKey(), stats.getBytes() - releasedBytes); + metadataTable.putWithBatch(batch, containerData.getBlockCountKey(), stats.getCount() - 1); + db.getStore().getBatchHandler().commitBatchOperation(batch); + } + + // Update in-memory stats (only bytesUsed and blockCount, not pendingDeletion). + containerData.getStatistics().decDeletion(releasedBytes, 0, 1, 0); + containerData.getVolume().decrementUsedSpace(releasedBytes); + + if (!container.hasBlocks()) { + containerData.markAsEmpty(); + } + + LOG.info("Deleted block {} ({} bytes) from container {} during reconciliation", + localBlockID, releasedBytes, containerID); + } finally { + container.writeUnlock(); + } + } + /** * Called by BlockDeletingService to delete all the chunks in a block * before proceeding to delete the block info from DB. 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index c051b3478b4c..74908142da0e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@ -30,13 +30,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; @@ -75,6 +76,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChecksumData; +import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; @@ -334,6 +336,69 @@ public void testContainerReconciliationFailureContainerScan() waitForExpectedScanCount(2); } + private List<BlockData> getBlocks(MockDatanode dn, long containerID) throws IOException { + KeyValueContainer container 
= dn.getContainer(containerID); + return dn.getHandler().getBlockManager().listBlock(container, -1, 100); + } + + private boolean chunkFileExists(KeyValueContainerData containerData, long localBlockID) { + return new File(containerData.getChunksPath(), localBlockID + ".block").exists(); + } + + @Test + public void testDeleteBlockForReconciliation() throws Exception { + long containerID = ContainerTestHelper.getTestContainerID(); + MockDatanode dn = datanodes.get(0); + dn.addContainerWithBlocks(containerID, 3); + KeyValueContainer container = dn.getContainer(containerID); + KeyValueContainerData containerData = container.getContainerData(); + + List<BlockData> blocks = getBlocks(dn, containerID); + long blockToDelete = blocks.get(0).getLocalID(); + assertTrue(chunkFileExists(containerData, blockToDelete)); + + long initialBytesUsed = containerData.getBytesUsed(); + long initialBlockCount = containerData.getBlockCount(); + + dn.getHandler().deleteBlockForReconciliation(container, blockToDelete); + + assertFalse(chunkFileExists(containerData, blockToDelete), "Chunk file should be deleted from disk"); + try (DBHandle db = BlockUtils.getDB(containerData, dn.getConf())) { + assertNull(db.getStore().getBlockDataTable().get(containerData.getBlockKey(blockToDelete)), + "Block metadata should be removed from DB"); + } + assertEquals(initialBlockCount - 1, containerData.getBlockCount()); + assertTrue(containerData.getBytesUsed() < initialBytesUsed); + } + + @Test + public void testDeleteBlockForReconciliationOrphanedChunk() throws Exception { + long containerID = ContainerTestHelper.getTestContainerID(); + MockDatanode dn = datanodes.get(0); + dn.addContainerWithBlocks(containerID, 2); + KeyValueContainer container = dn.getContainer(containerID); + KeyValueContainerData containerData = container.getContainerData(); + + List<BlockData> blocks = getBlocks(dn, containerID); + long blockToDelete = blocks.get(0).getLocalID(); + + long initialBytesUsed = containerData.getBytesUsed(); + long 
initialBlockCount = containerData.getBlockCount(); + + // Remove DB metadata only to simulate an orphaned file. + try (DBHandle db = BlockUtils.getDB(containerData, dn.getConf()); + BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) { + db.getStore().getBlockDataTable().deleteWithBatch(op, containerData.getBlockKey(blockToDelete)); + db.getStore().getBatchHandler().commitBatchOperation(op); + } + + dn.getHandler().deleteBlockForReconciliation(container, blockToDelete); + + assertFalse(chunkFileExists(containerData, blockToDelete), "Orphaned chunk file should be deleted"); + assertEquals(initialBlockCount, containerData.getBlockCount(), "Block count should be unchanged"); + assertEquals(initialBytesUsed, containerData.getBytesUsed(), "Bytes used should be unchanged"); + } + /** * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on * every datanode. @@ -664,6 +729,10 @@ public void addContainerWithBlocks(long containerId, int blocks) throws Exceptio handler.closeContainer(container); } + public OzoneConfiguration getConf() { + return conf; + } + @Override public String toString() { return dnDetails.toString(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java index b632b87a90b5..c016e89dc73d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java @@ -57,6 +57,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -68,6 +69,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Set; @@ -484,6 +486,130 @@ public void testContainerChecksumChunkCorruption() throws Exception { TestHelper.validateData(KEY_NAME, data, store, volume, bucket); } + /** + * Setup: + * - Both peers: Simulate BlockDeletingService — delete DB metadata + chunk files, then call + * addDeletedBlocks to mark blocks as deleted in their merkle trees. + * - sutDn (DN under test): Only remove DB metadata (chunk files remain on disk as orphans). + * + * When sutDn reconciles with a peer, the diff finds blockIDs with deleted=true in the peer's tree + * that sutDn's tree doesn't have, triggering deleteBlockForReconciliation to clean up orphaned chunk files. + * + * Note: DB metadata is removed (not just marking deleted) because the current checksum computation + * does not include the deleted flag — removing blockIDs from the tree forces the diff to detect divergence. + */ + @Test + @Flaky("HDDS-13401") + public void testReconcileDeletedBlocks() throws Exception { + // Step 1: Write data and close the container on all 3 DNs. 
+ String volume = UUID.randomUUID().toString(); + String bucket = UUID.randomUUID().toString(); + Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 * 1024, volume, bucket); + long containerID = containerAndData.getLeft(); + + List<DatanodeDetails> dataNodeDetails = cluster.getStorageContainerManager().getContainerManager() + .getContainerReplicas(ContainerID.valueOf(containerID)) + .stream().map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + assertEquals(3, dataNodeDetails.size()); + + // sutDn: the DN under test — will have orphaned chunk files cleaned up during reconciliation. + HddsDatanodeService sutDn = cluster.getHddsDatanode(dataNodeDetails.get(0)); + DatanodeStateMachine sutDsm = sutDn.getDatanodeStateMachine(); + Container sutContainer = sutDsm.getContainer().getContainerSet().getContainer(containerID); + KeyValueContainerData sutContainerData = (KeyValueContainerData) sutContainer.getContainerData(); + KeyValueHandler sutHandler = (KeyValueHandler) sutDsm.getContainer().getDispatcher() + .getHandler(ContainerProtos.ContainerType.KeyValueContainer); + + // Use sutDn's block list as the source of truth (all replicas are identical at this point). + List<BlockData> allBlocks = sutHandler.getBlockManager().listBlock(sutContainer, -1, 100); + assertTrue(allBlocks.size() > 1, "Container should have more than 1 block for this test"); + + List<BlockData> blocksToDelete = new ArrayList<>(); + for (int i = 0; i < allBlocks.size(); i += 2) { + blocksToDelete.add(allBlocks.get(i)); + } + + // Step 2: Both peers — simulate BlockDeletingService: delete DB + chunks, then mark deleted in tree. 
+ for (int i = 1; i <= 2; i++) { + HddsDatanodeService peerDn = cluster.getHddsDatanode(dataNodeDetails.get(i)); + DatanodeStateMachine peerDsm = peerDn.getDatanodeStateMachine(); + Container peerContainer = peerDsm.getContainer().getContainerSet().getContainer(containerID); + KeyValueContainerData peerContainerData = (KeyValueContainerData) peerContainer.getContainerData(); + KeyValueHandler peerHandler = (KeyValueHandler) peerDsm.getContainer().getDispatcher() + .getHandler(ContainerProtos.ContainerType.KeyValueContainer); + String peerChunksPath = peerContainer.getContainerData().getChunksPath(); + + try (DBHandle db = BlockUtils.getDB(peerContainerData, conf); + BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) { + for (BlockData blockData : blocksToDelete) { + long localID = blockData.getLocalID(); + db.getStore().getBlockDataTable().deleteWithBatch(op, peerContainerData.getBlockKey(localID)); + Files.deleteIfExists(Paths.get(peerChunksPath + "/" + localID + ".block")); + } + db.getStore().getBatchHandler().commitBatchOperation(op); + db.getStore().flushDB(); + } + peerHandler.getChecksumManager().addDeletedBlocks(peerContainerData, blocksToDelete); + peerDsm.getContainer().getContainerSet().scanContainerWithoutGap(containerID, TEST_SCAN); + } + + // Step 3: sutDn — remove DB metadata only (keep chunk files on disk as orphans), then scan. 
+ String sutChunksPath = sutContainer.getContainerData().getChunksPath(); + try (DBHandle db = BlockUtils.getDB(sutContainerData, conf); + BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) { + for (BlockData blockData : blocksToDelete) { + db.getStore().getBlockDataTable().deleteWithBatch(op, + sutContainerData.getBlockKey(blockData.getLocalID())); + } + db.getStore().getBatchHandler().commitBatchOperation(op); + db.getStore().flushDB(); + } + sutDsm.getContainer().getContainerSet().scanContainerWithoutGap(containerID, TEST_SCAN); + + waitForDataChecksumsAtSCM(containerID, 2); + + // Step 4: Verify chunk files still exist on sutDn before reconciliation. + for (BlockData bd : blocksToDelete) { + assertTrue(Files.exists(Paths.get(sutChunksPath + "/" + bd.getLocalID() + ".block")), + "Chunk file for block " + bd.getLocalID() + " should still exist on sutDn before reconciliation"); + } + + // Step 5: Trigger reconciliation. + cluster.getStorageContainerLocationClient().reconcileContainer(containerID); + + // Step 6: Wait for reconciliation to delete the orphaned chunk files on sutDn. + GenericTestUtils.waitFor(() -> { + for (BlockData bd : blocksToDelete) { + if (Files.exists(Paths.get(sutChunksPath + "/" + bd.getLocalID() + ".block"))) { + return false; + } + } + return true; + }, 1000, 120000); + + // Step 7: Verify chunk files are physically deleted from sutDn. + for (BlockData bd : blocksToDelete) { + assertFalse(Files.exists(Paths.get(sutChunksPath + "/" + bd.getLocalID() + ".block")), + "Chunk file for block " + bd.getLocalID() + + " should be physically deleted from sutDn by deleteBlockForReconciliation"); + } + + // Verify deleted blocks are marked as deleted in sutDn's merkle tree. 
+ ContainerProtos.ContainerChecksumInfo sutFinalInfo = readChecksumFile(sutContainer.getContainerData()); + for (BlockData bd : blocksToDelete) { + long blockID = bd.getLocalID(); + ContainerProtos.BlockMerkleTree blockTree = + sutFinalInfo.getContainerMerkleTree().getBlockMerkleTreeList().stream() + .filter(b -> b.getBlockID() == blockID) + .findFirst() + .orElseThrow(() -> new AssertionError( + "Block " + blockID + " should be in sutDn's tree (as deleted) after reconciliation")); + assertTrue(blockTree.getDeleted(), + "Block " + blockID + " should be marked deleted in sutDn's tree after reconciliation"); + } + } + @Test @Flaky("HDDS-13401") public void testDataChecksumReportedAtSCM() throws Exception {