From 6310ef3642a76116e03580dd48917e7fbb38d68f Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 21 Jan 2026 12:24:50 +0530 Subject: [PATCH 1/6] HDDS-14183. Attempted to decrement available space to a negative value --- .../container/keyvalue/KeyValueHandler.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 13bfa834dbc6..312cf3e9dbe6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1037,6 +1037,9 @@ ContainerCommandResponseProto handleWriteChunk( } ContainerProtos.BlockData blockDataProto = null; + long bytesWritten = 0; + boolean writeChunkSucceeded = false; + try { checkContainerOpen(kvContainer); @@ -1057,9 +1060,11 @@ ContainerCommandResponseProto handleWriteChunk( ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList()); // TODO: Can improve checksum validation here. Make this one-shot after protocol change. validateChunkChecksumData(data, chunkInfo); + bytesWritten = chunkInfo.getLen(); } chunkManager .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); + writeChunkSucceeded = true; final boolean isCommit = dispatcherContext.getStage().isCommit(); if (isCommit && writeChunk.hasBlock()) { @@ -1092,8 +1097,10 @@ ContainerCommandResponseProto handleWriteChunk( .getChunkData().getLen()); } } catch (StorageContainerException ex) { + rollbackUsedSpaceOnWriteFailure(kvContainer, writeChunkSucceeded, bytesWritten); return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { + rollbackUsedSpaceOnWriteFailure(kvContainer, writeChunkSucceeded, bytesWritten); return ContainerUtils.logAndReturnError(LOG, new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION), request); @@ -1102,6 +1109,19 @@ ContainerCommandResponseProto handleWriteChunk( return getWriteChunkResponseSuccess(request, blockDataProto); } + /** + * Roll back usedSpace when write operation fails after writeChunk succeeded. + */ + private void rollbackUsedSpaceOnWriteFailure(KeyValueContainer kvContainer, boolean writeChunkSucceeded, + long bytesWritten) { + if (writeChunkSucceeded && bytesWritten > 0) { + HddsVolume volume = kvContainer.getContainerData().getVolume(); + if (volume != null) { + volume.decrementUsedSpace(bytesWritten); + } + } + } + /** * Handle Write Chunk operation for closed container. Calls ChunkManager to process the request. */ From 7e8bb92de75fd8b382c7b2a0f626b238527f56c6 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Thu, 22 Jan 2026 16:17:34 +0530 Subject: [PATCH 2/6] Introduce space reserved and handle committedBytes --- .../container/common/impl/ContainerData.java | 40 +++++++++++------ .../container/common/volume/HddsVolume.java | 19 ++++++++ .../container/keyvalue/KeyValueHandler.java | 43 ++++++++++++++----- 3 files changed, 79 insertions(+), 23 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index c334a2d842ed..e4462e8829aa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -398,17 +398,10 @@ public Statistics getStatistics() { } /** - * Increase the number of bytes write into the container. - * Also decrement committed bytes against the bytes written. - * @param bytes the number of bytes write into the container. + * Calculate how much committedBytes should be decremented for write. + * This is used both for decrementing on write and restoring on failure. */ - private void incrWriteBytes(long bytes) { - /* - Increase the cached Used Space in VolumeInfo as it - maybe not updated, DU or DedicatedDiskSpaceUsage runs - periodically to update the Used Space in VolumeInfo. - */ - this.getVolume().incrementUsedSpace(bytes); + private long getCommittedBytesDecrement(long bytes) { // Calculate bytes used before this write operation. // Note that getBytesUsed() already includes the 'bytes' from the current write. long bytesUsedBeforeWrite = getBytesUsed() - bytes; @@ -417,8 +410,31 @@ private void incrWriteBytes(long bytes) { if (committedSpace && availableSpaceBeforeWrite > 0) { // Decrement committed space only by the portion of the write that fits within the originally committed space, // up to maxSize - long decrement = Math.min(bytes, availableSpaceBeforeWrite); - this.getVolume().incCommittedBytes(-decrement); + return Math.min(bytes, availableSpaceBeforeWrite); + } + return 0; + } + + /** + * Increase the number of bytes write into the container. + * Also decrement committed bytes against the bytes written. + * @param bytes the number of bytes write into the container. + */ + private void incrWriteBytes(long bytes) { + long committedBytesDecrement = getCommittedBytesDecrement(bytes); + if (committedBytesDecrement > 0) { + this.getVolume().incCommittedBytes(-committedBytesDecrement); + } + } + + /** + * Restore committedBytes when a write operation fails after writeChunk succeeded. + * This undoes the committedBytes decrement done in incrWriteBytes(). + */ + public void restoreCommittedBytesOnWriteFailure(long bytes) { + long committedBytesDecrement = getCommittedBytesDecrement(bytes); + if (committedBytesDecrement > 0) { + this.getVolume().incCommittedBytes(committedBytesDecrement); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index f331db7defc3..90639be4e5a2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -93,6 +93,7 @@ public class HddsVolume extends StorageVolume { private ContainerController controller; private final AtomicLong committedBytes = new AtomicLong(); // till Open containers become full + private final AtomicLong spaceReservedForWrites = new AtomicLong(); // for in-flight writes private Function gatherContainerUsages = (K) -> 0L; private final ConcurrentSkipListSet containerIds = new ConcurrentSkipListSet<>(); @@ -410,6 +411,24 @@ public long getCommittedBytes() { return committedBytes.get(); } + /** + * Reserve space for an in-flight write operation. + * + * @param bytes bytes to reserve + */ + public void reserveSpaceForWrite(long bytes) { + spaceReservedForWrites.addAndGet(bytes); + } + + /** + * Release space reserved for write when write completes or fails. + * + * @param bytes bytes to release + */ + public void releaseReservedSpaceForWrite(long bytes) { + spaceReservedForWrites.addAndGet(-bytes); + } + public long getFreeSpaceToSpare(long volumeCapacity) { return getDatanodeConfig().getMinFreeSpace(volumeCapacity); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 312cf3e9dbe6..bc2600d8481f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1037,7 +1037,9 @@ ContainerCommandResponseProto handleWriteChunk( } ContainerProtos.BlockData blockDataProto = null; - long bytesWritten = 0; + HddsVolume volume = kvContainer.getContainerData().getVolume(); + long bytesToWrite = 0; + boolean spaceReserved = false; boolean writeChunkSucceeded = false; try { @@ -1060,7 +1062,13 @@ ContainerCommandResponseProto handleWriteChunk( ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList()); // TODO: Can improve checksum validation here. Make this one-shot after protocol change. validateChunkChecksumData(data, chunkInfo); - bytesWritten = chunkInfo.getLen(); + bytesToWrite = chunkInfo.getLen(); + + // Reserve space before writing + if (volume != null && bytesToWrite > 0) { + volume.reserveSpaceForWrite(bytesToWrite); + spaceReserved = true; + } } chunkManager .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); @@ -1091,16 +1099,17 @@ ContainerCommandResponseProto handleWriteChunk( metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - startTime); } + commitSpaceReservedForWrite(volume, spaceReserved, bytesToWrite); // We should increment stats after writeChunk if (isWrite) { metrics.incContainerBytesStats(Type.WriteChunk, writeChunk .getChunkData().getLen()); } } catch (StorageContainerException ex) { - rollbackUsedSpaceOnWriteFailure(kvContainer, writeChunkSucceeded, bytesWritten); + releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer); return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { - rollbackUsedSpaceOnWriteFailure(kvContainer, writeChunkSucceeded, bytesWritten); + releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer); return ContainerUtils.logAndReturnError(LOG, new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION), request); @@ -1110,14 +1119,26 @@ ContainerCommandResponseProto handleWriteChunk( } /** - * Roll back usedSpace when write operation fails after writeChunk succeeded. + * Commit space reserved for write to usedSpace when write operation succeeds. + */ + private void commitSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved, long bytes) { + if (spaceReserved) { + volume.releaseReservedSpaceForWrite(bytes); + volume.incrementUsedSpace(bytes); + } + } + + /** + * Release space reserved for write when write operation fails. + * Also restores committedBytes if it was decremented during write. */ - private void rollbackUsedSpaceOnWriteFailure(KeyValueContainer kvContainer, boolean writeChunkSucceeded, - long bytesWritten) { - if (writeChunkSucceeded && bytesWritten > 0) { - HddsVolume volume = kvContainer.getContainerData().getVolume(); - if (volume != null) { - volume.decrementUsedSpace(bytesWritten); + private void releaseSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved, + boolean writeChunkSucceeded, long bytes, KeyValueContainer kvContainer) { + if (spaceReserved) { + volume.releaseReservedSpaceForWrite(bytes); + // Only restore committedBytes if write chunk succeeded + if (writeChunkSucceeded) { + kvContainer.getContainerData().restoreCommittedBytesOnWriteFailure(bytes); } } } From dbfefc254cb9f42af14cf6ae872b8f7da2671561 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 28 Jan 2026 11:06:53 +0530 Subject: [PATCH 3/6] Add test coverage and incrementUsedSpace first --- .../container/common/volume/HddsVolume.java | 9 + .../container/keyvalue/KeyValueHandler.java | 12 +- .../keyvalue/TestKeyValueHandler.java | 198 ++++++++++++++++++ 3 files changed, 213 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 90639be4e5a2..80bcc9d831a5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -429,6 +429,15 @@ public void releaseReservedSpaceForWrite(long bytes) { spaceReservedForWrites.addAndGet(-bytes); } + /** + * Get the space reserved for in-flight writes. + * + * @return bytes reserved for in-flight writes + */ + public long getSpaceReservedForWrites() { + return spaceReservedForWrites.get(); + } + public long getFreeSpaceToSpare(long volumeCapacity) { return getDatanodeConfig().getMinFreeSpace(volumeCapacity); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index bc2600d8481f..b4dc879535ca 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1099,7 +1099,9 @@ ContainerCommandResponseProto handleWriteChunk( metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - startTime); } - commitSpaceReservedForWrite(volume, spaceReserved, bytesToWrite); + if (spaceReserved) { + commitSpaceReservedForWrite(volume, bytesToWrite); + } // We should increment stats after writeChunk if (isWrite) { metrics.incContainerBytesStats(Type.WriteChunk, writeChunk @@ -1121,11 +1123,9 @@ ContainerCommandResponseProto handleWriteChunk( /** * Commit space reserved for write to usedSpace when write operation succeeds. */ - private void commitSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved, long bytes) { - if (spaceReserved) { - volume.releaseReservedSpaceForWrite(bytes); - volume.incrementUsedSpace(bytes); - } + private void commitSpaceReservedForWrite(HddsVolume volume, long bytes) { + volume.incrementUsedSpace(bytes); + volume.releaseReservedSpaceForWrite(bytes); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 6afee1c5d77f..f770e2103a22 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -53,6 +53,7 @@ import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; @@ -68,6 +69,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; @@ -75,15 +77,19 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.token.TokenVerifier; +import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; @@ -94,18 +100,21 @@ import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.GenericTestUtils.LogCapturer; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -898,6 +907,115 @@ public void testICRsOnContainerClose(ContainerLayoutVersion containerLayoutVersi } } + /** + * Test that space tracking (usedSpace and committedBytes) is correctly + * managed during successful write operations. + */ + @Test + public void testWriteChunkSpaceTrackingSuccess() throws Exception { + final long containerID = 1L; + SpaceTrackingTestSetup setup = setupSpaceTrackingTest(); + + final ContainerCommandRequestProto createContainer = + createContainerRequest(setup.datanodeId, containerID); + setup.handler.handleCreateContainer(createContainer, null); + + KeyValueContainer container = (KeyValueContainer) setup.containerSet.getContainer(containerID); + assertNotNull(container); + + long initialUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); + long initialCommittedBytes = setup.volume.getCommittedBytes(); + long initialReservedSpace = setup.volume.getSpaceReservedForWrites(); + + long chunkSize = 1024 * 1024; // 1MB + ContainerCommandRequestProto writeRequest = + createWriteChunkRequest(setup.datanodeId, chunkSize); + ContainerProtos.ContainerCommandResponseProto response = + setup.handler.handleWriteChunk(writeRequest, container, null); + assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); + + long finalUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); + long finalCommittedBytes = setup.volume.getCommittedBytes(); + long finalReservedSpace = setup.volume.getSpaceReservedForWrites(); + + assertEquals(initialUsedSpace + chunkSize, finalUsedSpace, + "usedSpace should be incremented by chunk size after successful write"); + assertTrue(finalCommittedBytes < initialCommittedBytes, + "committedBytes should be decremented after successful write"); + assertEquals(initialReservedSpace, finalReservedSpace, + "spaceReservedForWrites should be back to initial value after successful write"); + } + + /** + * Test that space tracking is correctly rolled back when write operation fails. + * This test uses reflection to mock the ChunkManager and inject a failure during + * writeChunk(), which happens AFTER space is reserved. This verifies that: + * 1. usedSpace remains unchanged (never incremented on failure) + * 2. spaceReservedForWrites is released (incremented then decremented back) + * 3. committedBytes is restored (decremented by writeChunk, then incremented back on rollback) + */ + @Test + public void testWriteChunkSpaceTrackingFailure() throws Exception { + final long containerID = 1L; + SpaceTrackingTestSetup setup = setupSpaceTrackingTest(); + + final ContainerCommandRequestProto createContainer = + createContainerRequest(setup.datanodeId, containerID); + setup.handler.handleCreateContainer(createContainer, null); + + KeyValueContainer container = (KeyValueContainer) setup.containerSet.getContainer(containerID); + assertNotNull(container); + + long initialUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); + long initialCommittedBytes = setup.volume.getCommittedBytes(); + long initialReservedSpace = setup.volume.getSpaceReservedForWrites(); + + // Use reflection to replace the chunkManager in the handler with a spy, + // so we can inject a failure during writeChunk() + Field chunkManagerField = KeyValueHandler.class.getDeclaredField("chunkManager"); + chunkManagerField.setAccessible(true); + ChunkManager originalChunkManager = (ChunkManager) chunkManagerField.get(setup.handler); + ChunkManager spyChunkManager = spy(originalChunkManager); + + // Configure the spy to throw an IOException on writeChunk call + doAnswer(invocation -> { + throw new IOException("Simulated disk write failure"); + }).when(spyChunkManager).writeChunk( + any(Container.class), + any(BlockID.class), + any(ChunkInfo.class), + any(ChunkBuffer.class), + any(DispatcherContext.class)); + + chunkManagerField.set(setup.handler, spyChunkManager); + + try { + // Attempt to write a chunk - should fail during chunkManager.writeChunk() + // but AFTER space has been reserved + long chunkSize = 1024 * 1024; // 1MB + ContainerCommandRequestProto writeRequest = + createWriteChunkRequest(setup.datanodeId, chunkSize); + ContainerProtos.ContainerCommandResponseProto response = + setup.handler.handleWriteChunk(writeRequest, container, null); + + assertNotEquals(ContainerProtos.Result.SUCCESS, response.getResult(), + "Write should fail due to injected IOException"); + + long finalUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); + long finalCommittedBytes = setup.volume.getCommittedBytes(); + long finalReservedSpace = setup.volume.getSpaceReservedForWrites(); + + assertEquals(initialUsedSpace, finalUsedSpace, + "usedSpace should remain unchanged after failed write"); + assertEquals(initialCommittedBytes, finalCommittedBytes, + "committedBytes should remain unchanged after failed write (decremented then restored)"); + assertEquals(initialReservedSpace, finalReservedSpace, + "spaceReservedForWrites should be back to initial value after failed write"); + } finally { + chunkManagerField.set(setup.handler, originalChunkManager); + } + } + private static ContainerCommandRequestProto createContainerRequest( String datanodeId, long containerID) { return ContainerCommandRequestProto.newBuilder() @@ -938,4 +1056,84 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException { return kvHandler; } + + /** + * Helper class to hold the setup components for space tracking tests. + */ + private static class SpaceTrackingTestSetup { + final KeyValueHandler handler; + final ContainerSet containerSet; + final HddsVolume volume; + final String datanodeId; + final String clusterId; + + SpaceTrackingTestSetup(KeyValueHandler handler, ContainerSet containerSet, + HddsVolume volume, String datanodeId, String clusterId) { + this.handler = handler; + this.containerSet = containerSet; + this.volume = volume; + this.datanodeId = datanodeId; + this.clusterId = clusterId; + } + } + + /** + * Helper method to set up a KeyValueHandler with real volume for space tracking tests. + */ + private SpaceTrackingTestSetup setupSpaceTrackingTest() throws IOException { + final String testDir = tempDir.toString(); + final String clusterId = UUID.randomUUID().toString(); + final String datanodeId = UUID.randomUUID().toString(); + OzoneConfiguration testConf = new OzoneConfiguration(); + final ContainerSet containerSet = newContainerSet(); + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + + HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf) + .clusterID(clusterId).datanodeUuid(datanodeId) + .volumeSet(volumeSet) + .build(); + hddsVolume.format(clusterId); + hddsVolume.createWorkingDir(clusterId, null); + hddsVolume.createTmpDirs(clusterId); + + when(volumeSet.getVolumesList()) + .thenReturn(Collections.singletonList(hddsVolume)); + + final ContainerMetrics metrics = ContainerMetrics.create(testConf); + final KeyValueHandler kvHandler = new KeyValueHandler(testConf, + datanodeId, containerSet, volumeSet, metrics, + c -> { }, new ContainerChecksumTreeManager(testConf)); + kvHandler.setClusterID(clusterId); + + return new SpaceTrackingTestSetup(kvHandler, containerSet, hddsVolume, datanodeId, clusterId); + } + + /** + * Helper method to create a WriteChunk request for testing. + */ + private ContainerCommandRequestProto createWriteChunkRequest( + String datanodeId, long chunkSize) { + final long containerID = 1L; + final long localID = 1L; + ByteString data = ByteString.copyFrom(new byte[(int) chunkSize]); + + ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo.newBuilder() + .setChunkName(localID + "_chunk_1") + .setOffset(0) + .setLen(data.size()) + .setChecksumData(Checksum.getNoChecksumDataProto()) + .build(); + + WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto.newBuilder() + .setBlockID(new BlockID(containerID, localID).getDatanodeBlockIDProtobuf()) + .setChunkData(chunk) + .setData(data); + + return ContainerCommandRequestProto.newBuilder() + .setContainerID(containerID) + .setCmdType(ContainerProtos.Type.WriteChunk) + .setDatanodeUuid(datanodeId) + .setWriteChunk(writeChunkRequest) + .build(); + } } From 4f3ab029ae8ae5eb7c0c33beff253dd0021e7166 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 28 Jan 2026 11:22:11 +0530 Subject: [PATCH 4/6] Fixed checkstyle issues --- .../keyvalue/TestKeyValueHandler.java | 149 +++++++++--------- 1 file changed, 71 insertions(+), 78 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index f770e2103a22..960008c3609d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -914,29 +914,51 @@ public void testICRsOnContainerClose(ContainerLayoutVersion containerLayoutVersi @Test public void testWriteChunkSpaceTrackingSuccess() throws Exception { final long containerID = 1L; - SpaceTrackingTestSetup setup = setupSpaceTrackingTest(); + final String testDir = tempDir.toString(); + final String clusterId = UUID.randomUUID().toString(); + final String datanodeId = UUID.randomUUID().toString(); + OzoneConfiguration testConf = new OzoneConfiguration(); + final ContainerSet containerSet = newContainerSet(); + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + + HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf) + .clusterID(clusterId).datanodeUuid(datanodeId) + .volumeSet(volumeSet) + .build(); + hddsVolume.format(clusterId); + hddsVolume.createWorkingDir(clusterId, null); + hddsVolume.createTmpDirs(clusterId); + + when(volumeSet.getVolumesList()) + .thenReturn(Collections.singletonList(hddsVolume)); + + final ContainerMetrics metrics = ContainerMetrics.create(testConf); + final KeyValueHandler kvHandler = new KeyValueHandler(testConf, + datanodeId, containerSet, volumeSet, metrics, + c -> { }, new ContainerChecksumTreeManager(testConf)); + kvHandler.setClusterID(clusterId); final ContainerCommandRequestProto createContainer = - createContainerRequest(setup.datanodeId, containerID); - setup.handler.handleCreateContainer(createContainer, null); + createContainerRequest(datanodeId, containerID); + kvHandler.handleCreateContainer(createContainer, null); - KeyValueContainer container = (KeyValueContainer) setup.containerSet.getContainer(containerID); + KeyValueContainer container = (KeyValueContainer) containerSet.getContainer(containerID); assertNotNull(container); - long initialUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); - long initialCommittedBytes = setup.volume.getCommittedBytes(); - long initialReservedSpace = setup.volume.getSpaceReservedForWrites(); + long initialUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); + long initialCommittedBytes = hddsVolume.getCommittedBytes(); + long initialReservedSpace = hddsVolume.getSpaceReservedForWrites(); long chunkSize = 1024 * 1024; // 1MB ContainerCommandRequestProto writeRequest = - createWriteChunkRequest(setup.datanodeId, chunkSize); + createWriteChunkRequest(datanodeId, chunkSize); ContainerProtos.ContainerCommandResponseProto response = - setup.handler.handleWriteChunk(writeRequest, container, null); + kvHandler.handleWriteChunk(writeRequest, container, null); assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); - long finalUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); - long finalCommittedBytes = setup.volume.getCommittedBytes(); - long finalReservedSpace = setup.volume.getSpaceReservedForWrites(); + long finalUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); + long finalCommittedBytes = hddsVolume.getCommittedBytes(); + long finalReservedSpace = hddsVolume.getSpaceReservedForWrites(); assertEquals(initialUsedSpace + chunkSize, finalUsedSpace, "usedSpace should be incremented by chunk size after successful write"); @@ -957,24 +979,46 @@ public void testWriteChunkSpaceTrackingSuccess() throws Exception { @Test public void testWriteChunkSpaceTrackingFailure() throws Exception { final long containerID = 1L; - SpaceTrackingTestSetup setup = setupSpaceTrackingTest(); + final String testDir = tempDir.toString(); + final String clusterId = UUID.randomUUID().toString(); + final String datanodeId = UUID.randomUUID().toString(); + OzoneConfiguration testConf = new OzoneConfiguration(); + final ContainerSet containerSet = newContainerSet(); + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + + HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf) + .clusterID(clusterId).datanodeUuid(datanodeId) + .volumeSet(volumeSet) + .build(); + hddsVolume.format(clusterId); + hddsVolume.createWorkingDir(clusterId, null); + hddsVolume.createTmpDirs(clusterId); + + when(volumeSet.getVolumesList()) + .thenReturn(Collections.singletonList(hddsVolume)); + + final ContainerMetrics metrics = ContainerMetrics.create(testConf); + final KeyValueHandler kvHandler = new KeyValueHandler(testConf, + datanodeId, containerSet, volumeSet, metrics, + c -> { }, new ContainerChecksumTreeManager(testConf)); + kvHandler.setClusterID(clusterId); final ContainerCommandRequestProto createContainer = - createContainerRequest(setup.datanodeId, containerID); - setup.handler.handleCreateContainer(createContainer, null); + createContainerRequest(datanodeId, containerID); + kvHandler.handleCreateContainer(createContainer, null); - KeyValueContainer container = (KeyValueContainer) setup.containerSet.getContainer(containerID); + KeyValueContainer container = (KeyValueContainer) containerSet.getContainer(containerID); assertNotNull(container); - long initialUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); - long initialCommittedBytes = setup.volume.getCommittedBytes(); - long initialReservedSpace = setup.volume.getSpaceReservedForWrites(); + long initialUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); + long initialCommittedBytes = hddsVolume.getCommittedBytes(); + long initialReservedSpace = hddsVolume.getSpaceReservedForWrites(); // Use reflection to replace the chunkManager in the handler with a spy, // so we can inject a failure during writeChunk() Field chunkManagerField = KeyValueHandler.class.getDeclaredField("chunkManager"); chunkManagerField.setAccessible(true); - ChunkManager originalChunkManager = (ChunkManager) chunkManagerField.get(setup.handler); + ChunkManager originalChunkManager = (ChunkManager) chunkManagerField.get(kvHandler); ChunkManager spyChunkManager = spy(originalChunkManager); // Configure the spy to throw an IOException on writeChunk call @@ -987,23 +1031,23 @@ public void testWriteChunkSpaceTrackingFailure() throws Exception { any(ChunkBuffer.class), any(DispatcherContext.class)); - chunkManagerField.set(setup.handler, spyChunkManager); + chunkManagerField.set(kvHandler, spyChunkManager); try { // Attempt to write a chunk - should fail during chunkManager.writeChunk() // but AFTER space has been reserved long chunkSize = 1024 * 1024; // 1MB ContainerCommandRequestProto writeRequest = - createWriteChunkRequest(setup.datanodeId, chunkSize); + createWriteChunkRequest(datanodeId, chunkSize); ContainerProtos.ContainerCommandResponseProto response = - setup.handler.handleWriteChunk(writeRequest, container, null); + kvHandler.handleWriteChunk(writeRequest, container, null); assertNotEquals(ContainerProtos.Result.SUCCESS, response.getResult(), "Write should fail due to injected IOException"); - long finalUsedSpace = setup.volume.getCurrentUsage().getUsedSpace(); - long finalCommittedBytes = setup.volume.getCommittedBytes(); - long finalReservedSpace = setup.volume.getSpaceReservedForWrites(); + long finalUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); + long finalCommittedBytes = hddsVolume.getCommittedBytes(); + long finalReservedSpace = hddsVolume.getSpaceReservedForWrites(); assertEquals(initialUsedSpace, finalUsedSpace, "usedSpace should remain unchanged after failed write"); @@ -1012,7 +1056,7 @@ public void testWriteChunkSpaceTrackingFailure() throws Exception { assertEquals(initialReservedSpace, finalReservedSpace, "spaceReservedForWrites should be back to initial value after failed write"); } finally { - chunkManagerField.set(setup.handler, originalChunkManager); + chunkManagerField.set(kvHandler, originalChunkManager); } } @@ -1057,57 +1101,6 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException { return kvHandler; } - /** - * Helper class to hold the setup components for space tracking tests. - */ - private static class SpaceTrackingTestSetup { - final KeyValueHandler handler; - final ContainerSet containerSet; - final HddsVolume volume; - final String datanodeId; - final String clusterId; - - SpaceTrackingTestSetup(KeyValueHandler handler, ContainerSet containerSet, - HddsVolume volume, String datanodeId, String clusterId) { - this.handler = handler; - this.containerSet = containerSet; - this.volume = volume; - this.datanodeId = datanodeId; - this.clusterId = clusterId; - } - } - - /** - * Helper method to set up a KeyValueHandler with real volume for space tracking tests. - */ - private SpaceTrackingTestSetup setupSpaceTrackingTest() throws IOException { - final String testDir = tempDir.toString(); - final String clusterId = UUID.randomUUID().toString(); - final String datanodeId = UUID.randomUUID().toString(); - OzoneConfiguration testConf = new OzoneConfiguration(); - final ContainerSet containerSet = newContainerSet(); - final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); - - HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf) - .clusterID(clusterId).datanodeUuid(datanodeId) - .volumeSet(volumeSet) - .build(); - hddsVolume.format(clusterId); - hddsVolume.createWorkingDir(clusterId, null); - hddsVolume.createTmpDirs(clusterId); - - when(volumeSet.getVolumesList()) - .thenReturn(Collections.singletonList(hddsVolume)); - - final ContainerMetrics metrics = ContainerMetrics.create(testConf); - final KeyValueHandler kvHandler = new KeyValueHandler(testConf, - datanodeId, containerSet, volumeSet, metrics, - c -> { }, new ContainerChecksumTreeManager(testConf)); - kvHandler.setClusterID(clusterId); - - return new SpaceTrackingTestSetup(kvHandler, containerSet, hddsVolume, datanodeId, clusterId); - } - /** * Helper method to create a WriteChunk request for testing. */ From 72882b77fe58af4e91fa5bf96d0e92c63a8e2598 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Wed, 25 Feb 2026 22:05:18 +0530 Subject: [PATCH 5/6] Check if file length increased during overwrite and increment usedSpace --- .../container/common/impl/ContainerData.java | 44 ++--- .../container/common/volume/HddsVolume.java | 28 --- .../container/keyvalue/KeyValueHandler.java | 41 ---- .../keyvalue/impl/FilePerBlockStrategy.java | 2 +- .../keyvalue/TestKeyValueHandler.java | 187 ------------------ .../impl/TestFilePerBlockStrategy.java | 51 +++++ 6 files changed, 67 insertions(+), 286 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index e4462e8829aa..3d089b69c108 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -398,10 +398,17 @@ public Statistics getStatistics() { } /** - * Calculate how much committedBytes should be decremented for write. - * This is used both for decrementing on write and restoring on failure. + * Increase the number of bytes write into the container. + * Also decrement committed bytes against the bytes written. + * @param bytes the number of bytes write into the container. */ - private long getCommittedBytesDecrement(long bytes) { + private void incrWriteBytes(long bytes) { + /* + Increase the cached Used Space in VolumeInfo as it + maybe not updated, DU or DedicatedDiskSpaceUsage runs + periodically to update the Used Space in VolumeInfo. + */ + this.getVolume().incrementUsedSpace(bytes); // Calculate bytes used before this write operation. // Note that getBytesUsed() already includes the 'bytes' from the current write. long bytesUsedBeforeWrite = getBytesUsed() - bytes; @@ -410,31 +417,8 @@ private long getCommittedBytesDecrement(long bytes) { if (committedSpace && availableSpaceBeforeWrite > 0) { // Decrement committed space only by the portion of the write that fits within the originally committed space, // up to maxSize - return Math.min(bytes, availableSpaceBeforeWrite); - } - return 0; - } - - /** - * Increase the number of bytes write into the container. - * Also decrement committed bytes against the bytes written. - * @param bytes the number of bytes write into the container. - */ - private void incrWriteBytes(long bytes) { - long committedBytesDecrement = getCommittedBytesDecrement(bytes); - if (committedBytesDecrement > 0) { - this.getVolume().incCommittedBytes(-committedBytesDecrement); - } - } - - /** - * Restore committedBytes when a write operation fails after writeChunk succeeded. - * This undoes the committedBytes decrement done in incrWriteBytes(). - */ - public void restoreCommittedBytesOnWriteFailure(long bytes) { - long committedBytesDecrement = getCommittedBytesDecrement(bytes); - if (committedBytesDecrement > 0) { - this.getVolume().incCommittedBytes(committedBytesDecrement); + long decrement = Math.min(bytes, availableSpaceBeforeWrite); + this.getVolume().incCommittedBytes(-decrement); } } @@ -587,7 +571,9 @@ public boolean needsDataChecksum() { public void updateWriteStats(long bytesWritten, boolean overwrite) { getStatistics().updateWrite(bytesWritten, overwrite); - incrWriteBytes(bytesWritten); + if (!overwrite) { + incrWriteBytes(bytesWritten); + } } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 80bcc9d831a5..f331db7defc3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -93,7 +93,6 @@ public class HddsVolume extends StorageVolume { private ContainerController controller; private final AtomicLong committedBytes = new AtomicLong(); // till Open containers become full - private final AtomicLong spaceReservedForWrites = new AtomicLong(); // for in-flight writes private Function gatherContainerUsages = (K) -> 0L; private final ConcurrentSkipListSet containerIds = new ConcurrentSkipListSet<>(); @@ -411,33 +410,6 @@ public long getCommittedBytes() { return committedBytes.get(); } - /** - * Reserve space for an in-flight write operation. - * - * @param bytes bytes to reserve - */ - public void reserveSpaceForWrite(long bytes) { - spaceReservedForWrites.addAndGet(bytes); - } - - /** - * Release space reserved for write when write completes or fails. - * - * @param bytes bytes to release - */ - public void releaseReservedSpaceForWrite(long bytes) { - spaceReservedForWrites.addAndGet(-bytes); - } - - /** - * Get the space reserved for in-flight writes. - * - * @return bytes reserved for in-flight writes - */ - public long getSpaceReservedForWrites() { - return spaceReservedForWrites.get(); - } - public long getFreeSpaceToSpare(long volumeCapacity) { return getDatanodeConfig().getMinFreeSpace(volumeCapacity); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index bee2a1e95073..ef598f3c0cb2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -1037,11 +1037,6 @@ ContainerCommandResponseProto handleWriteChunk( } ContainerProtos.BlockData blockDataProto = null; - HddsVolume volume = kvContainer.getContainerData().getVolume(); - long bytesToWrite = 0; - boolean spaceReserved = false; - boolean writeChunkSucceeded = false; - try { checkContainerOpen(kvContainer); @@ -1062,17 +1057,9 @@ ContainerCommandResponseProto handleWriteChunk( ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList()); // TODO: Can improve checksum validation here. Make this one-shot after protocol change. validateChunkChecksumData(data, chunkInfo); - bytesToWrite = chunkInfo.getLen(); - - // Reserve space before writing - if (volume != null && bytesToWrite > 0) { - volume.reserveSpaceForWrite(bytesToWrite); - spaceReserved = true; - } } chunkManager .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); - writeChunkSucceeded = true; final boolean isCommit = dispatcherContext.getStage().isCommit(); if (isCommit && writeChunk.hasBlock()) { @@ -1099,19 +1086,14 @@ ContainerCommandResponseProto handleWriteChunk( metrics.incContainerOpsLatencies(Type.PutBlock, Time.monotonicNowNanos() - startTime); } - if (spaceReserved) { - commitSpaceReservedForWrite(volume, bytesToWrite); - } // We should increment stats after writeChunk if (isWrite) { metrics.incContainerBytesStats(Type.WriteChunk, writeChunk .getChunkData().getLen()); } } catch (StorageContainerException ex) { - releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer); return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { - releaseSpaceReservedForWrite(volume, spaceReserved, writeChunkSucceeded, bytesToWrite, kvContainer); return ContainerUtils.logAndReturnError(LOG, new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION), request); @@ -1120,29 +1102,6 @@ ContainerCommandResponseProto handleWriteChunk( return getWriteChunkResponseSuccess(request, blockDataProto); } - /** - * Commit space reserved for write to usedSpace when write operation succeeds. - */ - private void commitSpaceReservedForWrite(HddsVolume volume, long bytes) { - volume.incrementUsedSpace(bytes); - volume.releaseReservedSpaceForWrite(bytes); - } - - /** - * Release space reserved for write when write operation fails. - * Also restores committedBytes if it was decremented during write. - */ - private void releaseSpaceReservedForWrite(HddsVolume volume, boolean spaceReserved, - boolean writeChunkSucceeded, long bytes, KeyValueContainer kvContainer) { - if (spaceReserved) { - volume.releaseReservedSpaceForWrite(bytes); - // Only restore committedBytes if write chunk succeeded - if (writeChunkSucceeded) { - kvContainer.getContainerData().restoreCommittedBytesOnWriteFailure(bytes); - } - } - } - /** * Handle Write Chunk operation for closed container. Calls ChunkManager to process the request. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index 36ebdc5aa823..3e095ba6e896 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -188,7 +188,7 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info, if (overwrite) { long fileLengthAfterWrite = offset + chunkLength; if (fileLengthAfterWrite > fileLengthBeforeWrite) { - containerData.getStatistics().updateWrite(fileLengthAfterWrite - fileLengthBeforeWrite, false); + containerData.updateWriteStats(fileLengthAfterWrite - fileLengthBeforeWrite, false); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 1a578b954650..0385564ebf75 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -56,7 +56,6 @@ import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; -import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; @@ -82,7 +81,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -90,7 +88,6 @@ import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.hdds.utils.io.RandomAccessFileChannel; import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChunkBuffer; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; @@ -116,14 +113,12 @@ import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration; import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.GenericTestUtils.LogCapturer; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -917,159 +912,6 @@ public void testICRsOnContainerClose(ContainerLayoutVersion containerLayoutVersi } } - /** - * Test that space tracking (usedSpace and committedBytes) is correctly - * managed during successful write operations. - */ - @Test - public void testWriteChunkSpaceTrackingSuccess() throws Exception { - final long containerID = 1L; - final String testDir = tempDir.toString(); - final String clusterId = UUID.randomUUID().toString(); - final String datanodeId = UUID.randomUUID().toString(); - OzoneConfiguration testConf = new OzoneConfiguration(); - final ContainerSet containerSet = newContainerSet(); - final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); - - HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf) - .clusterID(clusterId).datanodeUuid(datanodeId) - .volumeSet(volumeSet) - .build(); - hddsVolume.format(clusterId); - hddsVolume.createWorkingDir(clusterId, null); - hddsVolume.createTmpDirs(clusterId); - - when(volumeSet.getVolumesList()) - .thenReturn(Collections.singletonList(hddsVolume)); - - final ContainerMetrics metrics = ContainerMetrics.create(testConf); - final KeyValueHandler kvHandler = new KeyValueHandler(testConf, - datanodeId, containerSet, volumeSet, metrics, - c -> { }, new ContainerChecksumTreeManager(testConf)); - kvHandler.setClusterID(clusterId); - - final ContainerCommandRequestProto createContainer = - createContainerRequest(datanodeId, containerID); - kvHandler.handleCreateContainer(createContainer, null); - - KeyValueContainer container = (KeyValueContainer) containerSet.getContainer(containerID); - assertNotNull(container); - - long initialUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); - long initialCommittedBytes = hddsVolume.getCommittedBytes(); - long initialReservedSpace = hddsVolume.getSpaceReservedForWrites(); - - long chunkSize = 1024 * 1024; // 1MB - ContainerCommandRequestProto writeRequest = - createWriteChunkRequest(datanodeId, chunkSize); - ContainerProtos.ContainerCommandResponseProto response = - kvHandler.handleWriteChunk(writeRequest, container, null); - assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); - - long finalUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); - long finalCommittedBytes = hddsVolume.getCommittedBytes(); - long finalReservedSpace = hddsVolume.getSpaceReservedForWrites(); - - assertEquals(initialUsedSpace + chunkSize, finalUsedSpace, - "usedSpace should be incremented by chunk size after successful write"); - assertTrue(finalCommittedBytes < initialCommittedBytes, - "committedBytes should be decremented after successful write"); - assertEquals(initialReservedSpace, finalReservedSpace, - "spaceReservedForWrites should be back to initial value after successful write"); - } - - /** - * Test that space tracking is correctly rolled back when write operation fails. - * This test uses reflection to mock the ChunkManager and inject a failure during - * writeChunk(), which happens AFTER space is reserved. This verifies that: - * 1. usedSpace remains unchanged (never incremented on failure) - * 2. spaceReservedForWrites is released (incremented then decremented back) - * 3. committedBytes is restored (decremented by writeChunk, then incremented back on rollback) - */ - @Test - public void testWriteChunkSpaceTrackingFailure() throws Exception { - final long containerID = 1L; - final String testDir = tempDir.toString(); - final String clusterId = UUID.randomUUID().toString(); - final String datanodeId = UUID.randomUUID().toString(); - OzoneConfiguration testConf = new OzoneConfiguration(); - final ContainerSet containerSet = newContainerSet(); - final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); - - HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(testConf) - .clusterID(clusterId).datanodeUuid(datanodeId) - .volumeSet(volumeSet) - .build(); - hddsVolume.format(clusterId); - hddsVolume.createWorkingDir(clusterId, null); - hddsVolume.createTmpDirs(clusterId); - - when(volumeSet.getVolumesList()) - .thenReturn(Collections.singletonList(hddsVolume)); - - final ContainerMetrics metrics = ContainerMetrics.create(testConf); - final KeyValueHandler kvHandler = new KeyValueHandler(testConf, - datanodeId, containerSet, volumeSet, metrics, - c -> { }, new ContainerChecksumTreeManager(testConf)); - kvHandler.setClusterID(clusterId); - - final ContainerCommandRequestProto createContainer = - createContainerRequest(datanodeId, containerID); - kvHandler.handleCreateContainer(createContainer, null); - - KeyValueContainer container = (KeyValueContainer) containerSet.getContainer(containerID); - assertNotNull(container); - - long initialUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); - long initialCommittedBytes = hddsVolume.getCommittedBytes(); - long initialReservedSpace = hddsVolume.getSpaceReservedForWrites(); - - // Use reflection to replace the chunkManager in the handler with a spy, - // so we can inject a failure during writeChunk() - Field chunkManagerField = KeyValueHandler.class.getDeclaredField("chunkManager"); - chunkManagerField.setAccessible(true); - ChunkManager originalChunkManager = (ChunkManager) chunkManagerField.get(kvHandler); - ChunkManager spyChunkManager = spy(originalChunkManager); - - // Configure the spy to throw an IOException on writeChunk call - doAnswer(invocation -> { - throw new IOException("Simulated disk write failure"); - }).when(spyChunkManager).writeChunk( - any(Container.class), - any(BlockID.class), - any(ChunkInfo.class), - any(ChunkBuffer.class), - any(DispatcherContext.class)); - - chunkManagerField.set(kvHandler, spyChunkManager); - - try { - // Attempt to write a chunk - should fail during chunkManager.writeChunk() - // but AFTER space has been reserved - long chunkSize = 1024 * 1024; // 1MB - ContainerCommandRequestProto writeRequest = - createWriteChunkRequest(datanodeId, chunkSize); - ContainerProtos.ContainerCommandResponseProto response = - kvHandler.handleWriteChunk(writeRequest, container, null); - - assertNotEquals(ContainerProtos.Result.SUCCESS, response.getResult(), - "Write should fail due to injected IOException"); - - long finalUsedSpace = hddsVolume.getCurrentUsage().getUsedSpace(); - long finalCommittedBytes = hddsVolume.getCommittedBytes(); - long finalReservedSpace = hddsVolume.getSpaceReservedForWrites(); - - assertEquals(initialUsedSpace, finalUsedSpace, - "usedSpace should remain unchanged after failed write"); - assertEquals(initialCommittedBytes, finalCommittedBytes, - "committedBytes should remain unchanged after failed write (decremented then restored)"); - assertEquals(initialReservedSpace, finalReservedSpace, - "spaceReservedForWrites should be back to initial value after failed write"); - } finally { - chunkManagerField.set(kvHandler, originalChunkManager); - } - } - private static ContainerCommandRequestProto createContainerRequest( String datanodeId, long containerID) { return ContainerCommandRequestProto.newBuilder() @@ -1244,33 +1086,4 @@ public void onCompleted() { ContainerMetrics.remove(); } } - - /** - * Helper method to create a WriteChunk request for testing. - */ - private ContainerCommandRequestProto createWriteChunkRequest( - String datanodeId, long chunkSize) { - final long containerID = 1L; - final long localID = 1L; - ByteString data = ByteString.copyFrom(new byte[(int) chunkSize]); - - ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo.newBuilder() - .setChunkName(localID + "_chunk_1") - .setOffset(0) - .setLen(data.size()) - .setChecksumData(Checksum.getNoChecksumDataProto()) - .build(); - - WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto.newBuilder() - .setBlockID(new BlockID(containerID, localID).getDatanodeBlockIDProtobuf()) - .setChunkData(chunk) - .setData(data); - - return ContainerCommandRequestProto.newBuilder() - .setContainerID(containerID) - .setCmdType(ContainerProtos.Type.WriteChunk) - .setDatanodeUuid(datanodeId) - .setWriteChunk(writeChunkRequest) - .build(); - } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java index 95475651d014..aa846ca12c1a 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java @@ -229,6 +229,57 @@ public void testWriteChunkForClosedContainer() Assertions.assertEquals(containerData.getBytesUsed(), writeChunkData.remaining() + newWriteChunkData.remaining()); } + /** + * Test that overwrite operations that extend the file correctly update usedSpace by the delta. + */ + @Test + public void testOverwriteFileExtensionUpdatesByDelta() throws Exception { + KeyValueContainer kvContainer = getKeyValueContainer(); + KeyValueContainerData containerData = kvContainer.getContainerData(); + ChunkManager chunkManager = createTestSubject(); + + // Initial write: 4 bytes at offset 0 + byte[] initialData = "test".getBytes(UTF_8); + ChunkInfo initialChunk = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 0), + 0, // offset + initialData.length); + ChunkBuffer initialBuffer = ChunkBuffer.allocate(initialData.length).put(initialData); + initialBuffer.rewind(); + setDataChecksum(initialChunk, initialBuffer); + + long initialUsedSpace = containerData.getVolume().getCurrentUsage().getUsedSpace(); + long initialBlockBytes = containerData.getBytesUsed(); + chunkManager.writeChunk(kvContainer, getBlockID(), initialChunk, initialBuffer, WRITE_STAGE); + long afterFirstWriteUsedSpace = containerData.getVolume().getCurrentUsage().getUsedSpace(); + long afterFirstWriteBlockBytes = containerData.getBytesUsed(); + + assertEquals(initialUsedSpace + initialData.length, afterFirstWriteUsedSpace); + assertEquals(initialBlockBytes + initialData.length, afterFirstWriteBlockBytes); + + // Overwrite that extends file: write 6 bytes at offset 2 (extends file from 4 to 8 bytes) + // File before: [t][e][s][t] + // File after: [t][e][e][x][t][e][n][d] + // File length delta: 8 - 4 = 4 bytes + byte[] overwriteData = "extend".getBytes(UTF_8); + ChunkInfo overwriteChunk = new ChunkInfo(String.format("%d.data.%d", getBlockID().getLocalID(), 0), + 2, // offset - starts at position 2 + overwriteData.length); + ChunkBuffer overwriteBuffer = ChunkBuffer.allocate(overwriteData.length).put(overwriteData); + overwriteBuffer.rewind(); + setDataChecksum(overwriteChunk, overwriteBuffer); + + chunkManager.writeChunk(kvContainer, getBlockID(), overwriteChunk, overwriteBuffer, WRITE_STAGE); + long afterOverwriteUsedSpace = containerData.getVolume().getCurrentUsage().getUsedSpace(); + long afterOverwriteBlockBytes = containerData.getBytesUsed(); + + long expectedDelta = (2 + overwriteData.length) - initialData.length; // 8 - 4 = 4 + long expectedWriteBytes = initialData.length + overwriteData.length + expectedDelta; + + assertEquals(afterFirstWriteUsedSpace + expectedDelta, afterOverwriteUsedSpace); + assertEquals(afterFirstWriteBlockBytes + expectedDelta, afterOverwriteBlockBytes); + assertEquals(expectedWriteBytes, containerData.getStatistics().getWriteBytes()); + } + @Test public void testPutBlockForClosedContainer() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); From 7c8c8637473dabf27d82937396f0c508c6f8afe7 Mon Sep 17 00:00:00 2001 From: sarvekshayr Date: Mon, 2 Mar 2026 17:10:44 +0530 Subject: [PATCH 6/6] Update values correctly on overwrite --- .../ozone/container/common/impl/ContainerData.java | 10 +++++++++- .../container/keyvalue/impl/FilePerBlockStrategy.java | 9 +++++---- .../keyvalue/impl/TestFilePerBlockStrategy.java | 2 +- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index 3d089b69c108..ef082aa9e0eb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -402,7 +402,7 @@ public Statistics getStatistics() { * Also decrement committed bytes against the bytes written. * @param bytes the number of bytes write into the container. */ - private void incrWriteBytes(long bytes) { + public void incrWriteBytes(long bytes) { /* Increase the cached Used Space in VolumeInfo as it maybe not updated, DU or DedicatedDiskSpaceUsage runs @@ -674,6 +674,14 @@ public synchronized void updateWrite(long length, boolean overwrite) { writeBytes += length; } + /** + * Increment blockBytes by the given delta. + * This is used for overwrite operations that extend the file. + */ + public synchronized void incrementBlockBytes(long delta) { + blockBytes += delta; + } + public synchronized void decDeletion(long deletedBytes, long processedBytes, long deletedBlockCount, long processedBlockCount) { blockBytes -= deletedBytes; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index 3e095ba6e896..33f5bc0bdb4a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -182,13 +182,14 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info, ChunkUtils.writeData(channel, chunkFile.getName(), data, offset, chunkLength, volume); - // When overwriting, update the bytes used if the new length is greater than the old length - // This is to ensure that the bytes used is updated correctly when overwriting a smaller chunk - // with a larger chunk at the end of the block. + // When overwriting, if the file extended beyond its previous length, + // we need to account for the delta in blockBytes, usedSpace and committedBytes. if (overwrite) { long fileLengthAfterWrite = offset + chunkLength; if (fileLengthAfterWrite > fileLengthBeforeWrite) { - containerData.updateWriteStats(fileLengthAfterWrite - fileLengthBeforeWrite, false); + long delta = fileLengthAfterWrite - fileLengthBeforeWrite; + containerData.getStatistics().incrementBlockBytes(delta); + containerData.incrWriteBytes(delta); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java index aa846ca12c1a..364ddad2cfd3 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java @@ -273,7 +273,7 @@ public void testOverwriteFileExtensionUpdatesByDelta() throws Exception { long afterOverwriteBlockBytes = containerData.getBytesUsed(); long expectedDelta = (2 + overwriteData.length) - initialData.length; // 8 - 4 = 4 - long expectedWriteBytes = initialData.length + overwriteData.length + expectedDelta; + long expectedWriteBytes = initialData.length + overwriteData.length; // 4 + 6 = 10 assertEquals(afterFirstWriteUsedSpace + expectedDelta, afterOverwriteUsedSpace); assertEquals(afterFirstWriteBlockBytes + expectedDelta, afterOverwriteBlockBytes);