Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private void compareBlockMerkleTree(ContainerProtos.BlockMerkleTree thisBlockMer
// Our block deleting service will eventually catch up.
// Our container scanner will not update this deleted block in the merkle tree further even if it is still on
// disk so that we remain in sync with the peer.
// TODO HDDS-11765 Add support for deleting blocks from our replica when a peer has already deleted the block.
// Actual block data and chunk file deletion is handled in KeyValueHandler.reconcileContainerInternal.
report.addDivergedDeletedBlock(peerBlockMerkleTree);
} else {
// Neither our nor peer's block is deleted. Walk the chunk list to find differences.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.io.RandomAccessFileChannel;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -1747,9 +1749,16 @@ private void reconcileContainerInternal(DNContainerOperationClient dnClient, Con
}

// Merge block deletes from the peer that do not match our list of deleted blocks.
// In addition to updating the merkle tree, also delete the actual block data from our container.
for (ContainerDiffReport.DeletedBlock deletedBlock : diffReport.getDivergedDeletedBlocks()) {
updatedTreeWriter.setDeletedBlock(deletedBlock.getBlockID(), deletedBlock.getDataChecksum());
numDivergedDeletedBlocksUpdated++;
try {
deleteBlockForReconciliation(kvContainer, deletedBlock.getBlockID());
} catch (IOException e) {
LOG.error("Error while deleting diverged deleted block {} in container {} during reconciliation",
deletedBlock.getBlockID(), containerID, e);
}
}

// Based on repaired done with this peer, write the updated merkle tree to the container.
Expand Down Expand Up @@ -2012,6 +2021,72 @@ private boolean previousChunkPresent(BlockID blockID, long chunkOffset,
}
}

/**
 * Called during reconciliation to delete block data and chunk files for a block that a peer has already deleted.
 * This handles the case where some replicas miss block delete transactions from SCM.
 * If the block metadata exists in RocksDB, chunk files are deleted, the block key is removed from the DB,
 * and container stats are updated. If the block metadata does not exist in RocksDB (already deleted or never
 * existed), falls back to deleteUnreferenced to clean up any orphaned chunk files.
 *
 * @param container the container replica the block belongs to
 * @param localBlockID local ID of the block to remove from this replica
 * @throws IOException if the container DB cannot be opened, or deleting the chunk data / committing the
 *     metadata batch fails (the orphan-cleanup fallback swallows its own IOException as best effort)
 */
@VisibleForTesting
void deleteBlockForReconciliation(KeyValueContainer container, long localBlockID) throws IOException {
KeyValueContainerData containerData = container.getContainerData();
long containerID = containerData.getContainerID();

// Hold the container write lock across the whole operation so no reader observes a half-deleted
// block (chunks removed but metadata still present, or stats out of sync with the DB).
container.writeLock();
try (DBHandle db = BlockUtils.getDB(containerData, conf)) {
String blockKey = containerData.getBlockKey(localBlockID);
BlockData blockData = db.getStore().getBlockDataTable().get(blockKey);

if (blockData == null) {
// Block metadata not in DB, but chunk files may still be on disk.
LOG.debug("Block {} not found in DB for container {}. Attempting to clean up unreferenced chunk files.",
localBlockID, containerID);
try {
deleteUnreferenced(container, localBlockID);
} catch (IOException e) {
// Best effort: a leftover orphan file is not fatal here, so log and continue rather than fail
// the whole reconciliation.
LOG.warn("Failed to delete unreferenced files for block {} of container {}",
localBlockID, containerID, e);
}
return;
}

// Delete chunk files from disk.
deleteBlock(container, blockData);
long releasedBytes = KeyValueContainerUtil.getBlockLengthTryCatch(blockData);

// Remove block metadata from DB and update counters in a single batch so the DB state
// stays consistent even if the process dies mid-way.
try (BatchOperation batch = db.getStore().getBatchHandler().initBatchOperation()) {
db.getStore().getBlockDataTable().deleteWithBatch(batch, blockKey);
// Also remove from lastChunkInfoTable for schema V2/V3.
if (!containerData.hasSchema(OzoneConsts.SCHEMA_V1)) {
db.getStore().getLastChunkInfoTable().deleteWithBatch(batch, blockKey);
}

// Update DB counters. These blocks were not marked as pending deletion by SCM, so we only
// decrement bytesUsed and blockCount. pendingDeleteBlockCount is not affected.
Table<String, Long> metadataTable = db.getStore().getMetadataTable();
final ContainerData.BlockByteAndCounts stats = containerData.getStatistics().getBlockByteAndCounts();
metadataTable.putWithBatch(batch, containerData.getBytesUsedKey(), stats.getBytes() - releasedBytes);
metadataTable.putWithBatch(batch, containerData.getBlockCountKey(), stats.getCount() - 1);
db.getStore().getBatchHandler().commitBatchOperation(batch);
}

// Update in-memory stats (only bytesUsed and blockCount, not pendingDeletion).
containerData.getStatistics().decDeletion(releasedBytes, 0, 1, 0);
containerData.getVolume().decrementUsedSpace(releasedBytes);

// If that was the last block, mark the container empty.
if (!container.hasBlocks()) {
containerData.markAsEmpty();
}

LOG.info("Deleted block {} ({} bytes) from container {} during reconciliation",
localBlockID, releasedBytes, containerID);
} finally {
container.writeUnlock();
}
}

/**
* Called by BlockDeletingService to delete all the chunks in a block
* before proceeding to delete the block info from DB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.spy;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
Expand Down Expand Up @@ -334,6 +336,69 @@ public void testContainerReconciliationFailureContainerScan()
waitForExpectedScanCount(2);
}

/** Lists up to 100 blocks of the given container on the given mock datanode, starting from the beginning (-1). */
private List<BlockData> getBlocks(MockDatanode dn, long containerID) throws IOException {
return dn.getHandler().getBlockManager().listBlock(dn.getContainer(containerID), -1, 100);
}

/** Returns true if the on-disk chunk file ("&lt;localID&gt;.block" under the container's chunks dir) exists. */
private boolean chunkFileExists(KeyValueContainerData containerData, long localBlockID) {
File blockFile = new File(containerData.getChunksPath(), localBlockID + ".block");
return blockFile.exists();
}

@Test
public void testDeleteBlockForReconciliation() throws Exception {
// Create a closed container with 3 blocks on the first mock datanode.
long containerID = ContainerTestHelper.getTestContainerID();
MockDatanode dn = datanodes.get(0);
dn.addContainerWithBlocks(containerID, 3);
KeyValueContainer container = dn.getContainer(containerID);
KeyValueContainerData containerData = container.getContainerData();

// Pick the first block and confirm its chunk file is on disk before deleting.
long targetBlock = getBlocks(dn, containerID).get(0).getLocalID();
assertTrue(chunkFileExists(containerData, targetBlock));

// Snapshot container stats so the deltas can be verified afterwards.
long bytesBefore = containerData.getBytesUsed();
long countBefore = containerData.getBlockCount();

dn.getHandler().deleteBlockForReconciliation(container, targetBlock);

// Chunk file, DB metadata, and container stats must all reflect the deletion.
assertFalse(chunkFileExists(containerData, targetBlock), "Chunk file should be deleted from disk");
try (DBHandle db = BlockUtils.getDB(containerData, dn.getConf())) {
assertNull(db.getStore().getBlockDataTable().get(containerData.getBlockKey(targetBlock)),
"Block metadata should be removed from DB");
}
assertEquals(countBefore - 1, containerData.getBlockCount());
assertTrue(containerData.getBytesUsed() < bytesBefore);
}

@Test
public void testDeleteBlockForReconciliationOrphanedChunk() throws Exception {
// Create a closed container with 2 blocks on the first mock datanode.
long containerID = ContainerTestHelper.getTestContainerID();
MockDatanode dn = datanodes.get(0);
dn.addContainerWithBlocks(containerID, 2);
KeyValueContainer container = dn.getContainer(containerID);
KeyValueContainerData containerData = container.getContainerData();

long orphanBlock = getBlocks(dn, containerID).get(0).getLocalID();

// Snapshot container stats; the orphan-cleanup path must not touch them.
long bytesBefore = containerData.getBytesUsed();
long countBefore = containerData.getBlockCount();

// Remove DB metadata only to simulate an orphaned file.
try (DBHandle db = BlockUtils.getDB(containerData, dn.getConf());
BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) {
db.getStore().getBlockDataTable().deleteWithBatch(op, containerData.getBlockKey(orphanBlock));
db.getStore().getBatchHandler().commitBatchOperation(op);
}

dn.getHandler().deleteBlockForReconciliation(container, orphanBlock);

// The file is gone, but stats are unchanged since only an orphan (no DB metadata) was removed.
assertFalse(chunkFileExists(containerData, orphanBlock), "Orphaned chunk file should be deleted");
assertEquals(countBefore, containerData.getBlockCount(), "Block count should be unchanged");
assertEquals(bytesBefore, containerData.getBytesUsed(), "Bytes used should be unchanged");
}

/**
* Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on
* every datanode.
Expand Down Expand Up @@ -664,6 +729,10 @@ public void addContainerWithBlocks(long containerId, int blocks) throws Exceptio
handler.closeContainer(container);
}

/** Returns this mock datanode's configuration, e.g. so tests can open its container DBs via BlockUtils.getDB. */
public OzoneConfiguration getConf() {
return conf;
}

@Override
public String toString() {
return dnDetails.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -68,6 +69,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -484,6 +486,130 @@ public void testContainerChecksumChunkCorruption() throws Exception {
TestHelper.validateData(KEY_NAME, data, store, volume, bucket);
}

/**
 * Setup:
 * - Both peers: Simulate BlockDeletingService — delete DB metadata + chunk files, then call
 *   addDeletedBlocks to mark blocks as deleted in their merkle trees.
 * - sutDn (DN under test): Only remove DB metadata (chunk files remain on disk as orphans).
 *
 * When sutDn reconciles with a peer, the diff finds blockIDs with deleted=true in the peer's tree
 * that sutDn's tree doesn't have, triggering deleteBlockForReconciliation to clean up orphaned chunk files.
 *
 * Note: DB metadata is removed (not just marking deleted) because the current checksum computation
 * does not include the deleted flag — removing blockIDs from the tree forces the diff to detect divergence.
 */
@Test
@Flaky("HDDS-13401")
public void testReconcileDeletedBlocks() throws Exception {
// Step 1: Write data and close the container on all 3 DNs.
String volume = UUID.randomUUID().toString();
String bucket = UUID.randomUUID().toString();
Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 * 1024, volume, bucket);
long containerID = containerAndData.getLeft();

List<DatanodeDetails> dataNodeDetails = cluster.getStorageContainerManager().getContainerManager()
.getContainerReplicas(ContainerID.valueOf(containerID))
.stream().map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
assertEquals(3, dataNodeDetails.size());

// sutDn: the DN under test — will have orphaned chunk files cleaned up during reconciliation.
HddsDatanodeService sutDn = cluster.getHddsDatanode(dataNodeDetails.get(0));
DatanodeStateMachine sutDsm = sutDn.getDatanodeStateMachine();
Container<?> sutContainer = sutDsm.getContainer().getContainerSet().getContainer(containerID);
KeyValueContainerData sutContainerData = (KeyValueContainerData) sutContainer.getContainerData();
KeyValueHandler sutHandler = (KeyValueHandler) sutDsm.getContainer().getDispatcher()
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);

// Use sutDn's block list as the source of truth (all replicas are identical at this point).
List<BlockData> allBlocks = sutHandler.getBlockManager().listBlock(sutContainer, -1, 100);
assertTrue(allBlocks.size() > 1, "Container should have more than 1 block for this test");

// Delete every other block so the container keeps a mix of live and deleted blocks.
List<BlockData> blocksToDelete = new ArrayList<>();
for (int i = 0; i < allBlocks.size(); i += 2) {
blocksToDelete.add(allBlocks.get(i));
}

// Step 2: Both peers — simulate BlockDeletingService: delete DB + chunks, then mark deleted in tree.
for (int i = 1; i <= 2; i++) {
HddsDatanodeService peerDn = cluster.getHddsDatanode(dataNodeDetails.get(i));
DatanodeStateMachine peerDsm = peerDn.getDatanodeStateMachine();
Container<?> peerContainer = peerDsm.getContainer().getContainerSet().getContainer(containerID);
KeyValueContainerData peerContainerData = (KeyValueContainerData) peerContainer.getContainerData();
KeyValueHandler peerHandler = (KeyValueHandler) peerDsm.getContainer().getDispatcher()
.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
String peerChunksPath = peerContainer.getContainerData().getChunksPath();

// Remove DB metadata and chunk files for each selected block in one batch, then flush.
try (DBHandle db = BlockUtils.getDB(peerContainerData, conf);
BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) {
for (BlockData blockData : blocksToDelete) {
long localID = blockData.getLocalID();
db.getStore().getBlockDataTable().deleteWithBatch(op, peerContainerData.getBlockKey(localID));
Files.deleteIfExists(Paths.get(peerChunksPath + "/" + localID + ".block"));
}
db.getStore().getBatchHandler().commitBatchOperation(op);
db.getStore().flushDB();
}
// Record the deletions in the peer's merkle tree, then re-scan so its checksum is regenerated.
peerHandler.getChecksumManager().addDeletedBlocks(peerContainerData, blocksToDelete);
peerDsm.getContainer().getContainerSet().scanContainerWithoutGap(containerID, TEST_SCAN);
}

// Step 3: sutDn — remove DB metadata only (keep chunk files on disk as orphans), then scan.
String sutChunksPath = sutContainer.getContainerData().getChunksPath();
try (DBHandle db = BlockUtils.getDB(sutContainerData, conf);
BatchOperation op = db.getStore().getBatchHandler().initBatchOperation()) {
for (BlockData blockData : blocksToDelete) {
db.getStore().getBlockDataTable().deleteWithBatch(op,
sutContainerData.getBlockKey(blockData.getLocalID()));
}
db.getStore().getBatchHandler().commitBatchOperation(op);
db.getStore().flushDB();
}
sutDsm.getContainer().getContainerSet().scanContainerWithoutGap(containerID, TEST_SCAN);

// Wait for SCM to see the replica checksums diverge (presumably 2 distinct values: peers vs sutDn
// — confirm against waitForDataChecksumsAtSCM's contract).
waitForDataChecksumsAtSCM(containerID, 2);

// Step 4: Verify chunk files still exist on sutDn before reconciliation.
for (BlockData bd : blocksToDelete) {
assertTrue(Files.exists(Paths.get(sutChunksPath + "/" + bd.getLocalID() + ".block")),
"Chunk file for block " + bd.getLocalID() + " should still exist on sutDn before reconciliation");
}

// Step 5: Trigger reconciliation.
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);

// Step 6: Wait for reconciliation to delete the orphaned chunk files on sutDn.
GenericTestUtils.waitFor(() -> {
for (BlockData bd : blocksToDelete) {
if (Files.exists(Paths.get(sutChunksPath + "/" + bd.getLocalID() + ".block"))) {
return false;
}
}
return true;
}, 1000, 120000);

// Step 7: Verify chunk files are physically deleted from sutDn.
for (BlockData bd : blocksToDelete) {
assertFalse(Files.exists(Paths.get(sutChunksPath + "/" + bd.getLocalID() + ".block")),
"Chunk file for block " + bd.getLocalID()
+ " should be physically deleted from sutDn by deleteBlockForReconciliation");
}

// Verify deleted blocks are marked as deleted in sutDn's merkle tree.
ContainerProtos.ContainerChecksumInfo sutFinalInfo = readChecksumFile(sutContainer.getContainerData());
for (BlockData bd : blocksToDelete) {
long blockID = bd.getLocalID();
ContainerProtos.BlockMerkleTree blockTree =
sutFinalInfo.getContainerMerkleTree().getBlockMerkleTreeList().stream()
.filter(b -> b.getBlockID() == blockID)
.findFirst()
.orElseThrow(() -> new AssertionError(
"Block " + blockID + " should be in sutDn's tree (as deleted) after reconciliation"));
assertTrue(blockTree.getDeleted(),
"Block " + blockID + " should be marked deleted in sutDn's tree after reconciliation");
}
}

@Test
@Flaky("HDDS-13401")
public void testDataChecksumReportedAtSCM() throws Exception {
Expand Down